SMARTCAMP Engineer Blog

スマートキャンプ株式会社(SMARTCAMP Co., Ltd.)のエンジニアブログです。業務で取り入れた新しい技術や試行錯誤を知見として共有していきます。

BERTとSageMakerによる検索アルゴリズムの実装とデプロイ例の紹介

概要

スマートキャンプでエンジニアをしている佐々木です。

本記事では、自然言語処理モデルを用いて新規サービスを作れないか試行錯誤した話をしようと思います。

今回は精度の良い検索はうまく実装できませんでしたが、機械学習モデルをインフラで動かす流れは学ぶことができました。

実際に実装したコード例ともに紹介します。

経緯

スマートキャンプでは毎年プロダクトチーム合宿をしています。(前回の様子

その機会を活用し、普段の業務ではあまり触れない技術の探索をしました。

その中で、自然言語処理モデルを用いた検索システムの開発を試みた話をします。

なぜ、このシステム開発をすることにしたのか、理由は2つあります。

  1. 最新の自然言語処理(NLP)の動向にキャッチアップするきっかけが欲しかった
  2. 個人で実験するには、学習にお金がかかるので試行錯誤しにくい

1について、まず、個人的に学習したい分野であったことが大きいです。また、実務でも利用できる段階にあるという話は聞いていたので、会社としてもNLP周りの知見があれば使い所はありそうだと感じました。

2について、MLを行なううえでネックになるのが大体計算リソースとデータ量ですが、その片方のコストがこの機会なら浮くと思ったことが理由です(膨大なコストがかかるのはNGですが、ある程度はOKです)。

検索の仕組み

通常、検索エンジンを構築するには次のような手順を踏む必要があります。

  1. インデックスを作成する: Webサイトから文章を抽出し、検索用のインデックスを作成する
  2. クローラを作成する: インターネット上のさまざまなWebサイトを巡回し、新しいコンテンツを収集するプログラムを作成する
  3. ランキングアルゴリズムを実装する: インデックスを元に、検索クエリに対して適切な結果を返すためのアルゴリズムを実装する

今回は、1の検索用のインデックスに関しては、slackなどのチャットツールやKibelaなどの社内ドキュメントツールから作成しました。

また、2のクローラは作成せず、定期的にAPIを介して定期的に検索対象の文章を取得するバッチ処理を作成しました。

今回、主に話したいのは3のランキングアルゴリズムについてです。

検索クエリと検索対象の文章の類似度を計算することで、検索クエリに対する適切な結果を含む文章を探し出すことができます。

そして、その2文章間の類似度計算の前段階としてSentenceBERTの日本語事前学習モデルを用いました。

ここについて次から詳しく説明します。

MLモデルのトレンド

自然言語処理(NLP)の分野では、BERTやGPTといったTransformerモデルがここ数年のトレンドとなっています。

先々月にはOpenAIがChatGPTを発表し、弊社でもよくこういう会話をしてみたという投稿が盛んに行われていました。(私は現在でもこちらを活用しています)

こちらもTransformerモデルであるGPTを改良したものであることが知られています。

このようなモデルを1から作成するには、膨大なデータの用意と豊富な計算資源と運用費が必要になるため、資金力に余裕がある一握りの企業や大学でしか行なうことができません。

しかし、それらの組織が作成し公開してくれた事前学習モデルをfine-tuningすることによって、個別のタスクに応用し、資源の少ない組織でも扱うことができます。

採用した文章の類似度計算のアルゴリズム

その中でもSentenceBERTを用いることが文章の類似度を計算するのに適しています。

SentenceBERTは、文章を入力として受け取り、その文章を特徴量ベクトルに変換するTransformerモデルです。これは先のBERTと呼ばれる大規模な言語モデルを拡張して作られており、文章のsemantics(意味的な情報)を正確に捉えることができます。

このように文章を「意味的な情報を含んだ特徴量ベクトル」に変換するというのが肝で、このベクトルがどのくらい似ているか調べることにより文章の類似度計算ができます。それ以外にも文章のタグ付けや、文章の分類なども行えます。

よく使われる2つのベクトル間の類似度計算にコサイン類似度というものがあります。

2つのベクトルが完全一致する場合、1になり、完全に異なる場合に-1になります。

具体的には、次の式を使います。

 similarity = \frac{v_1 \cdot v_2}{\left|v_1\right| \cdot \left|v_2\right|}

ここで、 v_1v_2は2つのベクトルを表します。\cdotは内積を表す記号です。\left|v_1\right|は、ベクトルv_1の大きさを表します。\left|v_2\right|も同様です。

すなわち類似度計算の流れは次のようになります。

  1. 検索対象の文章を「意味的な情報を含んだ特徴量ベクトル」に変換する
  2. 検索クエリを「意味的な情報を含んだ特徴量ベクトル」に変換する
  3. 2つのベクトルの類似度を計算する
  4. それらを類似度が高いもの順に並べる

こちらを元に、検索におけるランキングアルゴリズムを実装し、デプロイしてみました。

次にデプロイについて話します。

類似度計算モデルのデプロイ

デプロイには、Amazon SageMakerを用いました。Amazon SageMakerはAmazon Web Services(AWS)が提供する機械学習を手軽に行えるサービスです。機械学習では統計的な分析や予測モデルの構築、訓練、デプロイなどさまざまな作業が必要になりますが、それらをすべて管理できます。

Amazon SageMakerにはユースケースに応じていくつかに機能が分かれています。例えば、ビジネスアナリスト用のSageMaker Canvasというノーコードインターフェースや、機械学習エンジニア用のSageMaker MLOpsといった機能があります。その中で、今回はAmazon SageMaker Studioというデータサイエンティスト向けのIDEを活用しました。

こちらを用いることで、データの前処理からGBDT系のモデルやPyTorchやTensorFlowを用いたディープラーニング系のモデルの開発からデプロイまで一気通貫して行えます。

今回はその中でも次の機能を利用しました。

  1. Notebookインスタンス
    • Jupyter NotebookやJupyter Labが活用できます。
  2. エンドポイント
    • エンドポイント用のMLインスタンスです。作成したMLモデルをこちらへデプロイしAPIとして推論結果を取得できます。

1について、データ前処理や、モデルの保存や、推論用のインスタンスを作成するのに用いました。こちらからAmazon S3バケットを作成したり、作成したMLエンドポイントへのアクセスのデモを行なうこともできます。

2について、今回は、推論結果として、文章の類似度ランキング情報を取得できるようにしました。(※あとで詳しく紹介しますが、検索として、MLモデルの中で類似度計算をするのは速度の面で好ましくありません。)

また、エンドポイント機能には、サーバーレスモードと常駐モードがあります。サーバーレスモードでは、エンドポイントへのアクセスに応じて推論用インスタンスが立てられるため、料金を安く抑えることができます。そのため、開発段階では、サーバーレスモードを活用し、デモとして行なう際には常駐インスタンスを用いました。

このように、SageMakerではデータの前処理、可視化などを行なうインスタンスと、学習インスタンス、そして、推論をするインスタンスの3つを必要に応じて扱えるようになっています。

すなわち、「学習の際は、GPUを積んだ高性能なインスタンスを短時間使用し、推論の際は、サーバーレス、もしくは低予算のインスタンスを起動させ、開発用のノートブックインスタンスは必要なときのみ起動する」のように使い分けられます。

用いた構成例

実際のコードの紹介

モデルの取得とデプロイ

実際には、学習を挟む必要がありますが、デプロイを行なうだけなら次のようにできます。

必要なライブラリのimport

# import libraries
import boto3, re, sys, math, json, os, sagemaker, urllib.request
from sagemaker import get_execution_role
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import Image
from IPython.display import display
from tqdm import tqdm
!pip install transformers[torch] sentencepiece
!pip install fugashi ipadic
from transformers import BertJapaneseTokenizer, BertModel
import torch

事前学習済みモデルの取得

MODEL_NAME = "sonoisa/sentence-bert-base-ja-mean-tokens-v2"

今回使用させていただいたモデルはこちらで詳しく紹介されています。

tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)
model = BertModel.from_pretrained(MODEL_NAME)

モデルの保存とS3へのアップロード

SAVED_MODEL_DIR = 'transformer'
os.makedirs(SAVED_MODEL_DIR, exist_ok=True)
tokenizer.save_pretrained(SAVED_MODEL_DIR)
model.save_pretrained(SAVED_MODEL_DIR)
#zip the model in tar.gz format
!cd transformer && tar czvf ../model.tar.gz *
sagemaker_session = sagemaker.Session()
role = get_execution_role()
#Upload the model to S3
inputs = sagemaker_session.upload_data(path='model.tar.gz', key_prefix='model')
inputs

※ このinputsの値にはS3上のモデルへのパスが入っています[1]。

エンドポイントへのモデルのデプロイ

from sagemaker.pytorch import PyTorch, PyTorchModel
from sagemaker.predictor import Predictor
class StringPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super(StringPredictor, self).__init__(endpoint_name, sagemaker_session, content_type='text/plain')

こちらは、エンドポイントへの入力を単一の文字列にしたかったため作成しました。デフォルトではエンドポイントへ渡すデータは、特徴量が入った配列です。

pytorch_model = PyTorchModel(
    model_data = 's3://xxxxx/model/model.tar.gz',
    role=role,
    entry_point ='inference.py',
    source_dir = './code',
    py_version = 'py3',
    framework_version = '1.7.1',
    predictor_cls=StringPredictor
)

model_dataへは[1]の値を入れてください。 ※ inference.pyはデプロイされたエンドポイントへのアクセス時に実行される関数が集まったファイルです。のちに紹介します[2]。

通常のインスタンスの場合

インスタンスタイプはこちらから適宜必要なものを選択してください。 アクセスが多いと自動でスケールしてくれるようです。

predictor = pytorch_model.deploy(
    instance_type='xxx',
    initial_instance_count=1,
    endpoint_name = 'ml-endpoint',
)

注意点として、デプロイに応じて最低1時間分の料金が請求されるので注意してください。(高額インスタンスで無闇矢鱈にデプロイして削除するを繰り返すと、実際の稼働時間が少なくてもたくさんの請求がきます。)

なので、開発時は次のサーバーレスでのデプロイが特におすすめです。

サーバーレスの場合
from sagemaker.serverless import ServerlessInferenceConfig
serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=5120,
    max_concurrency=5
)
predictor = pytorch_model.deploy(
    serverless_inference_config=serverless_config,
    initial_instance_count=1,
    endpoint_name = 'ml-endpoint'
)

エンドポイントの動作確認

実際にはLambdaからエンドポイントを叩いてレスポンスを取得していますが、ノートブックインスタンスから同様にエンドポイントを叩く際のコードを紹介します。

import json
import boto3
client = boto3.client('sagemaker-runtime')
ENDPOINT_NAME = 'ml-endpoint'

payload = "検索Query"
response = client.invoke_endpoint(
    EndpointName=ENDPOINT_NAME,
    ContentType='text/plain',
    Accept='application/json',
    Body=payload
)

result = json.loads(response['Body'].read().decode())
print(result)
[{'articles': [{'score': 0.5378279958997847,
    'note_id': 'xxxxxxxxxxx',
    'title': '記事のタイトル',
    'text': '記事本文',
    'timestamp': '2022-04-15T09:45:35.633Z'},...],
  'score_sum': 1.6076672689827232,
  'profile': {'username': '執筆者の名前',
   'dm_url': 'slackのDMのURL',
   'role': '役職',
   'division': '部署'},
  'slack_id': 'xxxxxxxxx',
  'score_ave': 0.5358890896609078},...
]

※ 実は今回は単純な検索ではなくて、関連する記事をたくさん書いている人をレコメンドするシステムにしているので、1レコードに対して複数の記事が紐づくような構成になっています。

エンドポイントでの推論処理

エンドポイントへのモデルのデプロイの[2]で触れたinference.py について説明します。

こちらのファイルでは少なくとも次の関数が必要です。

  • model_fn(model_dir)—モデルを Amazon SageMaker PyTorch モデルサーバーに読み込みます。
  • input_fn(request_body,request_content_type)—データを推論のオブジェクトに逆シリアル化します。
  • predict_fn(input_object,model)—逆シリアル化されたデータは、読み込まれたモデルに対して推論を実行します。
  • output_fn(prediction,content_type)—応答コンテンツタイプに従って推論をシリアル化します。

引用元: Amazon SageMaker を使用した fastai モデルの構築、トレーニング、およびデプロイ

実装例を紹介します。

ライブラリのimport

import argparse
import logging
import sagemaker_containers
import requests
import boto3


import os
import io
import time
from shutil import unpack_archive
from collections import defaultdict
import simplejson as json

from scipy.spatial import distance
import pandas as pd
from tqdm import tqdm
import torch
from transformers import BertJapaneseTokenizer, BertModel

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

model_fn

def model_fn(model_dir):
    logger.info('START model_fn')

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = BertJapaneseTokenizer.from_pretrained(model_dir)
    nlp_model = BertModel.from_pretrained(model_dir)
    nlp_model.eval()
    nlp_model.to(device)
    model = {'model': nlp_model, 'tokenizer': tokenizer}
    logger.info('END   model_fn')
    return model

input_fn

def input_fn(request_body, content_type='text/plain'):
    logger.info('START input_fn')

    try:
        data = [request_body.decode('utf-8')]
        return data
    except:
        raise Exception(
            'Requested unsupported ContentType in content_type: {}'.format(content_type))

predict_fn

こちらのpredict_fn内でやっていることだけざっくり紹介すると、下記の流れになります。

  1. S3から検索対象の文章の意味ベクトルをダウンロード・解凍
  2. 検索クエリとの意味ベクトルのコサイン類似度を計算
  3. ランキング形式に整形

また、この部分に関しては実際に用いたものではなく、単純に関連度順の記事が10件得られるように改変してます。

def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[0]  # last_hidden_state
    input_mask_expanded = attention_mask.unsqueeze(
        -1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask

def embed_tformer(model, tokenizer, sentences):
    encoded_input = tokenizer(
        sentences, padding="max_length", truncation=True, max_length=256, return_tensors='pt')
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    encoded_input.to(device)

    with torch.no_grad():
        model_output = model(**encoded_input)

    sentence_embeddings = mean_pooling(
        model_output, encoded_input['attention_mask'])
    return sentence_embeddings

def get_result(df, query_embedding, sentence_embeddings_path):
    cols = [
        df.note_id,
        df.text,
    ]
    result = search_documents(query_embedding, cols,
                              sentence_embeddings_path, batch_size=4)
    return result

def search_documents(query_embedding, cols, sentence_embeddings_path, batch_size=4):
        note_id, sentences = cols
    
    # メモリに負荷がかかるので、バッチごとに意味ベクトルを読み込み、
    # 類似度計算した結果だけを配列に持っておきます
    s_scores = []
    for idx, batch_path in tqdm(enumerate(sentence_embeddings_path)):
        sentence_embeddings = torch.load(batch_path).detach().numpy()

        #  検索クエリとそれぞれの文書に対するcos類似度を一度に計算
        cos_sim = 1 - distance.cdist([query_embedding],
                                     sentence_embeddings, metric="cosine")[0]
                # ここの実装はあまり良くありません(参考程度に)
        s_scores.extend(zip(sentences[idx*batch_size:(idx+1)*batch_size],
                        cos_sim, range(idx*batch_size, (idx+1)*batch_size)))

    sorted_score = sorted(s_scores, key=lambda x: x[1], reverse=True)
    n_docs = note_id.shape[0]
    upper_10 = n_docs//10

    # 例としてスコアの高いもの10件を返すことにします
    ret_val = []
    for s, score, i in sorted_score[:upper_10]:
        ret_dict = {
            'score': score,
            'note_id': note_id[i],
        }
        ret_val.append(ret_dict)
    return ret_val
 
def read_csv(s3_client, bucket, path):
    s3_object = s3_client.get_object(Bucket=bucket, Key=path)
    csv_data = io.BytesIO(s3_object['Body'].read())
    csv_data.seek(0)
    df = pd.read_csv(csv_data, encoding='utf8')
    return
 
def predict_fn(input_object, model):
    logger.info('START predict_fn')

    start_time = time.time()
    sentence_embeddings = embed_tformer(model['model'], model['tokenizer'], input_object)

    # 文章に紐づいた情報が入ったテーブルの取得
    s3_client = boto3.client('s3')
    data_bucket = 'bucket-name'
    supplementary_table_path = 'sagemaker/data/supplementary-table.csv'
    supplementary_table = read_csv(s3_client, data_bucket, supplementary_table_path)
    
    # S3から圧縮された検索対象の意味ベクトルが入ったフォルダを取得(後述しますが、これはアンチパターンだと思います)
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(data_bucket)
    semantic_tensor_gz_path = 'sagemaker/data/semantic_tensor.tar.gz'
    bucket.download_file(semantic_tensor_gz_path, '/tmp/semantic_tensor.tar.gz')
    unpack_archive(filename='/tmp/semantic_tensor.tar.gz',
                   extract_dir='/tmp/semantic_tensor', format='gztar')

    # tmp以下に配置した検索対象文章の意味ベクトルファイルのパス
    sentence_embeddings_path = sorted(map(
          lambda x: os.path.join('/tmp/semantic_tensor/tensor', x),
          os.listdir('/tmp/semantic_tensor/tensor')))


    responce = get_result(supplementary_table, sentence_embeddings[0].tolist(), sentence_embeddings_path)

    print("--- Inference time: %s seconds ---" % (time.time() - start_time))
    return response

output_fn

def output_fn(prediction, accept='application/json'):
    logger.info('START output_fn')
    logger.info(f"accept: {accept}")
    if accept == 'application/json':
        output = json.dumps(prediction, ignore_nan=True)
        return output
    raise Exception(
        'Requested unsupported ContentType in Accept: {}'.format(accept))

学び

SageMaker(ML)に何でも押し付けない

これは、そもそも検索エンジンの仕組みを全くわかっていなかったというお恥ずかしい話でもあるのですが、今回のこの類似度検索の手法はbi-encoderといい、文章の意味ベクトルを推論結果として返し、その類似度の比較は別で行なうことによって、高速に検索ができるようになっています。

しかしながら、実装の時間も限られており、Amazon ElasticSearch KNN indexなどの別のアーキテクチャを導入し、類似度計算を他で行なう実装ができなかったため、推論内で類似度検索を無理くり実装するという愚行を犯してしまいました。

また、検索対象の文章のデータの持ち方も問題がありました。最初は、1文1文の意味ベクトルを1つずつファイルにして、S3にあげようとしていましたが、それだと取得に時間もかかるしお金もかかるということで、すべての検索対象をtar.gzでまとめ、推論インスタンスの中で解凍することで動かしています。

料金

推論に関しては、サーバーレスなエンドポイントがあって、試すにはちょうどいいなと感じました。

ただ、やはり学習にはそれなりにかかるので、オンプレGPU環境があれば、それを使ってモデルだけ引っ張ってこれるようにしてみるのも実験段階ではアリかもしれません。

最後に

今回のタスクは、私が合宿という機会にチームに提案して受け入れられたものですが、スマートキャンプではテックカンパニー化が現在推進されており、通常業務と並行して新しい技術の開発やビジネスモデルの創出に力を入れています。

気になった方は、ぜひ一度カジュアル面談にお越しください!

smartcamp.co.jp