こんにちは。
コンちゃんこと佐々木です。
ワケあってRedisから大量のデータを持ってきてファイルに書き出すpythonのプログラムを書きました。
が、サイズが大きすぎてRedisとのコネクションが切れるというエラーと、ファイル書き込み時のメモリエラーが起きました。
データを分割して取得することと、ファイル書き込みも分割して行うことによって解決しましたので、詳細を書き残そうと思います。
まず、こんなコードを書きました。
def write_file(): """ Redisから取得したデータをファイルに書き出す """ # Redisから値を取得 data = get_data(REDIS_KEY) # ファイルに書き込む file_path = 'hoge/fuga/piyo.txt' with open(file_path, 'W') as f: f.write('\n'.join(data)) def get_data(key): """ keyで指定したキーの要素を全部Redisから取得する """ # 当該keyの要素数を取得 length = redis_session.llen(key) # 先頭から要素数分の値を取得 data = redis_session.lrange(key, 0, length - 1) # 先頭から要素数分の値を削除 redis_session.ltrim(key, length, -1) return data
これを、要素数が数百万のRedisに対して実行すると、以下の例外が出ました。
$ (実行コマンド) Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 185, in _read_from_socket raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) OSError: Connection closed by server. During handling of the above exception, another exception occurred: Traceback (most recent call last): (中略) File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 1625, in lrange return self.execute_command('LRANGE', name, start, end) File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 775, in execute_command return self.parse_response(connection, command_name, **options) File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 789, in parse_response response = connection.read_response() File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 637, in read_response response = self._parser.read_response() File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 290, in read_response response = self._buffer.readline() File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 224, in readline self._read_from_socket() File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 198, in _read_from_socket raise ConnectionError("Error while reading from socket: %s" % redis.exceptions.ConnectionError: Error while reading from socket: ('Connection closed by server.',)
接続切れちゃってそう。
なぜ接続が切れたか探るため、redis-pyのコードを読む旅に出かけました。
で、ソケットからデータを読み込むところでエラーとなったようです。
付近のコードを読むに、変数 data
に値が入らなかったために起きていて、 data
は data = self._sock.recv(socket_read_size)
となっています。
pythonのrecv()のドキュメントを読むと、引数で指定した分を最大としてデータを取得するみたいで、今回の引数の socket_read_size
は socket_read_size=65536
との記述がコード中に見られます。
で、結論、今回は読み込むデータ量がかなり大きいので socket_read_size
をオーバーしてうまく読み込めなかったのでしょう。
これ以上の深追いはシステムコールのrecvにまで到達しそうなので、目途が付いたこの辺でストップ。
さて、おおかた原因予想がついたところで、以下のように10000個ずつ分割して読み込むことにしました。
def get_data(key): """ keyで指定したキーの要素を全部Redisから取得する """ # 処理中に追加データがpopされても良いように、処理開始時点の長さを取り、この長さ分だけ取得する remaining_length = redis_session.llen(key) offset = 10000 values = list() # offset単位で先頭から順番に取りだす while remaining_length > 0: # 取得する要素数を求める get_length = min(offset, remaining_length) # 先頭から要素数分の値を取得 value = redis_session.lrange(key, 0, get_length - 1) # 値を返却値リストに入れる values.extend(value) # 先頭から要素数分の値を削除 redis_session.ltrim(key, get_length, -1) # 残り要素数を更新 remaining_length -= get_length return values
これで大丈夫でしょう!と実行しました。
Traceback (most recent call last): (中略) f.write('\n'.join(data)) MemoryError
_人人人人人人人人_
> MemoryError <
 ̄Y^Y^Y^Y^Y^Y ̄
今度は、Redisからの読み込み後の、ファイル書き込み部分。
メモリ使用量を監視しつつ再実行すると、どんどんメモリを食べて、数GBまで上がっていきました。
対処方針としては、Redisから全部取ってきてからファイルに書き込むのではなく、Redisから取得次第書き込むようにしたいです。
ここで yield
句が役に立ちました。yield
を簡単に言うと、「ループで都度returnしてくれる」というものです。
Pythonのみならず、C#等にも yield return
があります。
Pythonでは、おおよそ以下の手順で使います。
- 何回もreturnしたいメソッドの
return
をyield
に書き換える - 呼び出し元メソッドで、ループ等で何回も呼び出してあげる
ループが回っている途中状態を裏で持ってくれていますので、ループ途中return的なことができます。yield
を使うように書き換えてみました。
def write_file(): """ Redisから取得したデータをファイルに書き出す """ file_path = 'hoge/fuga/piyo.txt' with gzip.open(file_path, 'at') as f: # Redisから値を取得しつつファイルに書き込む for data in get_data(REDISKEY): f.write('\n'.join(data)) f.write('\n') def get_data(key): """ keyで指定したキーの要素を全部Redisから取得する """ # 処理中に追加データがpopされても良いように、処理開始時点の長さを取り、この長さ分だけ取得する remaining_length = redis_session.llen(key) offset = 10000 # offset単位で先頭から順番に取りだす while remaining_length > 0: # 取得する要素数を求める get_length = min(offset, remaining_length) # 先頭から要素数分の値を取得 value = redis_session.lrange(key, 0, get_length - 1) # 先頭から要素数分の値を削除 redis_session.ltrim(key, get_length, -1) # 残り要素数を更新 remaining_length -= get_length # 値を返す yield value
forループで get_data()
が実行されています。get_data()
では yield
を含むwhileループがありますが、呼び出されるごとにまっさらな状態からスタートするわけではなく、前回のループの状態を生かしてくれます。
本例ですと、変数 remaining_length
の値がループの度に減少しますが、この変数の値を記憶してくれています。
これで、Redisから取得次第ファイルに追記するようになりましたので、メモリにサイズの大きい一時データを展開する必要がなくなり、メモリ使用量がぐんと抑えられました。
以上、戦いの記録でした。
実際はステージング環境での確認や、再現用にRedisに意図的に数百万のデータを入れるスクリプトの用意など、諸々やっていますが省略しました。
それでは。