KoNLPy를 좀 많이, 오래 돌려야 하는 일이 있어서 처음에 람다를 생각해보다가, 이 패키지가 래핑해놓은 형태소 분석기들이 자바 기반이라서 패키징하기가 까다로워서 패스를 했다. 아예 자바로 람다 함수를 만드는 방법도 있지만 링크 자바랑 친하지 않아서 다른 방법을 찾기로 했다. Pool 같은 것을 쓸까 하다가 조금 더 쉽게 갈 수 있는 방법이 있을 것 같아서 Celery를 써보기로 했다.
도커가 깔려있다고 가정하고 일단 레디스를 띄운다. 어차피 오래 쓸 것이 아니라서 데몬으로 띄우지 않았다.
docker run --name redis-for-celery -i --rm -p 6379:6379 redis
Celery랑 Flower를 설치하고, redis 백엔드를 사용할거니까 레디스 라이브러리도 깔아준다.
pip install celery flower redis
함수에 데코레이터를 붙이면 끝. 램을 아껴야 해서 결과를 레디스 대신 파일에 저장하고 (참고: File system result backend), 트위터 tokenizer는 변수 공유가 안되서 함수 안에서 정의했다.
# tokenizer.py
from celery import Celery
from konlpy.tag import Twitter
import json
app = Celery('tokenizer', broker="redis://localhost", backend="file:///./tmp")
@app.task
def tokenize(line):
t = Twitter()
tokens = Twitter().nouns(line)
return tokens
이제 여기에다가 일을 시킬 스크립트를 작성한다. 결과를 불러올 때 저렇게 순서대로 불러오는게 약간 걸리지만 (태스크가 enque된 순서대로 끝나는 것이 아니므로 앞에 태스크가 안 끝나면 다 블록 됨) 전체적인 작업 시간에는 큰 영향을 미치지 않으니 딱히 문제가 되지는 않는다. 물론 Task가 절대 Fail할 일이 없을 경우의 이야기.
# master.py
import json
from tokenizer import tokenize
from tqdm import tqdm
lines = [...]
tokens_async = [tokenize.delay(line) for line in tqdm(lines)] # Queue에 payload를 넣음. 그 결과 AsyncResult를 얻음
tokens = []
for t in tqdm(tokens_async, total=len(lines)):
token = t.wait() # 작업의 결과를 불러옴
t.forget() # 임시로 저장된 각 작업의 결과를 삭제
tokens.append(token)
워커를 띄우고 모니터링을 위해 Flower도 띄운다.
celery -A tokenizer worker
celery -A tokenizer flower --broker=redis://localhost
python master.py