こんにちは。
コンちゃんこと佐々木です。
ワケあってRedisから大量のデータを持ってきてファイルに書き出すpythonのプログラムを書きました。
が、サイズが大きすぎてRedisとのコネクションが切れるというエラーと、ファイル書き込み時のメモリエラーが起きました。
データを分割して取得することと、ファイル書き込みも分割して行うことによって解決しましたので、詳細を書き残そうと思います。
まず、こんなコードを書きました。
def write_file():
"""
Redisから取得したデータをファイルに書き出す
"""
# Redisから値を取得
data = get_data(REDIS_KEY)
# ファイルに書き込む
file_path = 'hoge/fuga/piyo.txt'
with open(file_path, 'W') as f:
f.write('\n'.join(data))
def get_data(key):
"""
keyで指定したキーの要素を全部Redisから取得する
"""
# 当該keyの要素数を取得
length = redis_session.llen(key)
# 先頭から要素数分の値を取得
data = redis_session.lrange(key, 0, length - 1)
# 先頭から要素数分の値を削除
redis_session.ltrim(key, length, -1)
return data
これを、要素数が数百万のRedisに対して実行すると、以下の例外が出ました。
$ (実行コマンド)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 185, in _read_from_socket
raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
OSError: Connection closed by server.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(中略)
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 1625, in lrange
return self.execute_command('LRANGE', name, start, end)
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 775, in execute_command
return self.parse_response(connection, command_name, **options)
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/client.py", line 789, in parse_response
response = connection.read_response()
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 637, in read_response
response = self._parser.read_response()
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 290, in read_response
response = self._buffer.readline()
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 224, in readline
self._read_from_socket()
File "/usr/local/lib/python3.8/site-packages/redis-3.2.1-py3.8.egg/redis/connection.py", line 198, in _read_from_socket
raise ConnectionError("Error while reading from socket: %s" %
redis.exceptions.ConnectionError: Error while reading from socket: ('Connection closed by server.',)
接続切れちゃってそう。
なぜ接続が切れたか探るため、redis-pyのコードを読む旅に出かけました。
で、ソケットからデータを読み込むところでエラーとなったようです。
付近のコードを読むに、変数 data に値が入らなかったために起きていて、 data は data = self._sock.recv(socket_read_size) となっています。
pythonのrecv()のドキュメントを読むと、引数で指定した分を最大としてデータを取得するみたいで、今回の引数の socket_read_size は socket_read_size=65536 との記述がコード中に見られます。
で、結論、今回は読み込むデータ量がかなり大きいので socket_read_size をオーバーしてうまく読み込めなかったのでしょう。
これ以上の深追いはシステムコールのrecvにまで到達しそうなので、目途が付いたこの辺でストップ。
さて、おおかた原因予想がついたところで、以下のように10000個ずつ分割して読み込むことにしました。
def get_data(key):
"""
keyで指定したキーの要素を全部Redisから取得する
"""
# 処理中に追加データがpopされても良いように、処理開始時点の長さを取り、この長さ分だけ取得する
remaining_length = redis_session.llen(key)
offset = 10000
values = list()
# offset単位で先頭から順番に取りだす
while remaining_length > 0:
# 取得する要素数を求める
get_length = min(offset, remaining_length)
# 先頭から要素数分の値を取得
value = redis_session.lrange(key, 0, get_length - 1)
# 値を返却値リストに入れる
values.extend(value)
# 先頭から要素数分の値を削除
redis_session.ltrim(key, get_length, -1)
# 残り要素数を更新
remaining_length -= get_length
return values
これで大丈夫でしょう!と実行しました。
Traceback (most recent call last):
(中略)
f.write('\n'.join(data))
MemoryError
_人人人人人人人人_
> MemoryError <
 ̄Y^Y^Y^Y^Y^Y ̄
今度は、Redisからの読み込み後の、ファイル書き込み部分。
メモリ使用量を監視しつつ再実行すると、どんどんメモリを食べて、数GBまで上がっていきました。
対処方針としては、Redisから全部取ってきてからファイルに書き込むのではなく、Redisから取得次第書き込むようにしたいです。
ここで yield 句が役に立ちました。yield を簡単に言うと、「ループで都度returnしてくれる」というものです。
Pythonのみならず、C#等にも yield return があります。
Pythonでは、おおよそ以下の手順で使います。
- 何回もreturnしたいメソッドの
returnをyieldに書き換える - 呼び出し元メソッドで、ループ等で何回も呼び出してあげる
ループが回っている途中状態を裏で持ってくれていますので、ループ途中return的なことができます。yield を使うように書き換えてみました。
def write_file():
"""
Redisから取得したデータをファイルに書き出す
"""
file_path = 'hoge/fuga/piyo.txt'
with gzip.open(file_path, 'at') as f:
# Redisから値を取得しつつファイルに書き込む
for data in get_data(REDISKEY):
f.write('\n'.join(data))
f.write('\n')
def get_data(key):
"""
keyで指定したキーの要素を全部Redisから取得する
"""
# 処理中に追加データがpopされても良いように、処理開始時点の長さを取り、この長さ分だけ取得する
remaining_length = redis_session.llen(key)
offset = 10000
# offset単位で先頭から順番に取りだす
while remaining_length > 0:
# 取得する要素数を求める
get_length = min(offset, remaining_length)
# 先頭から要素数分の値を取得
value = redis_session.lrange(key, 0, get_length - 1)
# 先頭から要素数分の値を削除
redis_session.ltrim(key, get_length, -1)
# 残り要素数を更新
remaining_length -= get_length
# 値を返す
yield value
forループで get_data() が実行されています。get_data() では yield を含むwhileループがありますが、呼び出されるごとにまっさらな状態からスタートするわけではなく、前回のループの状態を生かしてくれます。
本例ですと、変数 remaining_length の値がループの度に減少しますが、この変数の値を記憶してくれています。
これで、Redisから取得次第ファイルに追記するようになりましたので、メモリにサイズの大きい一時データを展開する必要がなくなり、メモリ使用量がぐんと抑えられました。
以上、戦いの記録でした。
実際はステージング環境での確認や、再現用にRedisに意図的に数百万のデータを入れるスクリプトの用意など、諸々やっていますが省略しました。
それでは。