redis-pyのコードを読んだりyieldを使ったりした時の話

こんにちは。
コンちゃんこと佐々木です。
ワケあってRedisから大量のデータを持ってきてファイルに書き出すpythonのプログラムを書きました。
が、サイズが大きすぎてRedisとのコネクションが切れるというエラーと、ファイル書き込み時のメモリエラーが起きました。
データを分割して取得することと、ファイル書き込みも分割して行うことによって解決しましたので、詳細を書き残そうと思います。


まず、こんなコードを書きました。

def write_file():
    """
    Redisから取得したデータをファイルに書き出す
    """
    # Redisから値を取得
    data = get_data(REDIS_KEY)
    # ファイルに書き込む
    file_path = 'hoge/fuga/piyo.txt'
    with open(file_path, 'W') as f:
        f.write('\n'.join(data))

def get_data(key):
    """
    keyで指定したキーの要素を全部Redisから取得する
    """
    # 当該keyの要素数を取得
    length = redis_session.llen(key)
    # 先頭から要素数分の値を取得
    data = redis_session.lrange(key, 0, length - 1)
    # 先頭から要素数分の値を削除
    redis_session.ltrim(key, length, -1)
    return data

これを、要素数が数百万のRedisに対して実行すると、以下の例外が出ました。

$ (実行コマンド)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 185, in _read_from_socket
    raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
OSError: Connection closed by server.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(中略)
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 1625, in lrange
    return self.execute_command('LRANGE', name, start, end)
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 775, in execute_command
    return self.parse_response(connection, command_name, **options)
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 789, in parse_response
    response = connection.read_response()
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 637, in read_response
    response = self._parser.read_response()
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 290, in read_response
    response = self._buffer.readline()
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 224, in readline
    self._read_from_socket()
  File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 198, in _read_from_socket
    raise ConnectionError("Error while reading from socket: %s" %
redis.exceptions.ConnectionError: Error while reading from socket: ('Connection closed by server.',)

接続切れちゃってそう。
なぜ接続が切れたか探るため、redis-pyのコードを読む旅に出かけました。

で、ソケットからデータを読み込むところでエラーとなったようです。
付近のコードを読むに、変数 data に値が入らなかったために起きていて、 data は data = self._sock.recv(socket_read_size) となっています。
pythonのrecv()のドキュメントを読むと、引数で指定した分を最大としてデータを取得するみたいで、今回の引数の socket_read_size は socket_read_size=65536 との記述がコード中に見られます。
で、結論、今回は読み込むデータ量がかなり大きいので socket_read_size をオーバーしてうまく読み込めなかったのでしょう。
これ以上の深追いはシステムコールのrecvにまで到達しそうなので、目途が付いたこの辺でストップ。

さて、おおかた原因予想がついたところで、以下のように10000個ずつ分割して読み込むことにしました。

def get_data(key):
    """
    keyで指定したキーの要素を全部Redisから取得する
    """
    # 処理中に追加データがpopされても良いように、処理開始時点の長さを取り、この長さ分だけ取得する
    remaining_length = redis_session.llen(key)
    offset = 10000
    values = list()

    # offset単位で先頭から順番に取りだす
    while remaining_length > 0:
        # 取得する要素数を求める
        get_length = min(offset, remaining_length)
        # 先頭から要素数分の値を取得
        value = redis_session.lrange(key, 0, get_length - 1)
        # 値を返却値リストに入れる
        values.extend(value)
        # 先頭から要素数分の値を削除
        redis_session.ltrim(key, get_length, -1)
        # 残り要素数を更新
        remaining_length -= get_length

    return values

これで大丈夫でしょう!と実行しました。

Traceback (most recent call last):
  (中略)
    f.write('\n'.join(data))
MemoryError

_人人人人人人人人_
> MemoryError <
 ̄Y^Y^Y^Y^Y^Y ̄

今度は、Redisからの読み込み後の、ファイル書き込み部分。
メモリ使用量を監視しつつ再実行すると、どんどんメモリを食べて、数GBまで上がっていきました。
対処方針としては、Redisから全部取ってきてからファイルに書き込むのではなく、Redisから取得次第書き込むようにしたいです。

ここで yield 句が役に立ちました。
yield を簡単に言うと、「ループで都度returnしてくれる」というものです。
Pythonのみならず、C#等にも yield return があります。
Pythonでは、おおよそ以下の手順で使います。

  1. 何回もreturnしたいメソッドの return を yield に書き換える
  2. 呼び出し元メソッドで、ループ等で何回も呼び出してあげる

ループが回っている途中状態を裏で持ってくれていますので、ループ途中return的なことができます。
yield を使うように書き換えてみました。

def write_file():
    """
    Redisから取得したデータをファイルに書き出す
    """
    file_path = 'hoge/fuga/piyo.txt'
    with gzip.open(file_path, 'at') as f:
        # Redisから値を取得しつつファイルに書き込む
        for data in get_data(REDISKEY):
            f.write('\n'.join(data))
            f.write('\n')

def get_data(key):
    """
    keyで指定したキーの要素を全部Redisから取得する
    """
    # 処理中に追加データがpopされても良いように、処理開始時点の長さを取り、この長さ分だけ取得する
    remaining_length = redis_session.llen(key)
    offset = 10000

    # offset単位で先頭から順番に取りだす
    while remaining_length > 0:
        # 取得する要素数を求める
        get_length = min(offset, remaining_length)
        # 先頭から要素数分の値を取得
        value = redis_session.lrange(key, 0, get_length - 1)
        # 先頭から要素数分の値を削除
        redis_session.ltrim(key, get_length, -1)
        # 残り要素数を更新
        remaining_length -= get_length
        # 値を返す
        yield value

forループで get_data() が実行されています。
get_data() では yield を含むwhileループがありますが、呼び出されるごとにまっさらな状態からスタートするわけではなく、前回のループの状態を生かしてくれます。
本例ですと、変数 remaining_length の値がループの度に減少しますが、この変数の値を記憶してくれています。

これで、Redisから取得次第ファイルに追記するようになりましたので、メモリにサイズの大きい一時データを展開する必要がなくなり、メモリ使用量がぐんと抑えられました。

以上、戦いの記録でした。
実際はステージング環境での確認や、再現用にRedisに意図的に数百万のデータを入れるスクリプトの用意など、諸々やっていますが省略しました。
それでは。

コメントを残す

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)