Oisix ra daichi Creator's Blog(オイシックス・ラ・大地クリエイターズブログ)

オイシックス・ラ・大地株式会社のエンジニア・デザイナーが執筆している公式ブログです。

digdag + embulk でSaaSのレポートを検査する仕組みを作った話


こんにちは!SREセクションの林(@morihaya55)です。


本記事はOisix ra daichi Inc. Advent Calendar 2018の17日目の記事です。 昨日は@yymzkの社内でキーボードつくる会を企画した話でした。 同僚がキーボード沼に沈んでいくのを横から暖かく見守るのは気持ちが良いですね、どんどん沈んで欲しいです。 ちなみに私は無難に既製品のErgoDox EZを使っています。分割キーボードは肩が広がる感じが良いですね。



さて今回は digdag + embulk でSaaSのレポートを検査する仕組みを作った話 と題してつらつらと書きます。

やったこと

digdag + embulkでSaaSのレポートを検査しました。具体的には以下のような仕組みを実装しました。

f:id:oitech:20181217234110p:plain
構成概要図

Slackに以下の様な結果が通知されます。(ぼかしなのはご容赦)

f:id:oitech:20181218002455p:plain:w500
slack通知画像

目的

実装した目的は大きく以下の二つです。

  • 正常時のSaaSの処理結果について、送信件数の確認を"楽に"行いたい
  • 異常時の検知をしたい

正常時の件数確認を楽に行いたい

経緯を簡単に説明します。 詳しくはマーケターの最新動向が分かる! markezineさんに記事があるのでそちらを参照して頂くとして、 オイラ大地の定期宅配サービスはざっくり以下の流れで構成されています。

  1. 週次のペースでお客様にお届けする商品をご提案
  2. お客様が提案内容を確認・修正して確定
  3. 実際に商品をお届けする

その中で2番目の「お客様が提案内容を確認・修正して確定」という点について、お客様の確認忘れを少なくしたいという思いから定期的にお知らせを行なっています。 お知らせの方法は「メール、LINE、アプリ、SMS」の4つで、これらのお知らせをSaaSを利用して行なっています。

これまで「どのお知らせ方法で、何名のお客様にお知らせを行なったか」という点について、担当者は定期的にSaaSの管理画面へログインし、処理結果を確認する必要がありました。それは少なからず煩雑であり、担当者の増減でSaaSのアカウント管理も必要となることからより簡単に確認を行う方法を検討していました。

異常時の検知をしたい

今回の実装を行う大きなモチベーションになった理由のもう一つが異常検知です。運用期間を考えれば十分に発生頻度は少ないと言えるのですが、以下の問題が短期間で連続して発生したため新たな異常検知の仕組みを実装する事にしました。

  1. お知らせ対象リストを作成する弊社側システムの遅延で、SaaS側の自動実行のタイミングに間に合わず、空振りしてしまうケースがある
  2. SaaS側のごく稀な不具合でお知らせが行われていないケースがある

対策を検討した

対策の方法はいくつか考えましたが、既存システムに改修が不要で、サクッと実装できそうな仕組みを採用する事にしました。 SaaS側で処理結果をレポートファイルとして出力する機能と、その出力先にSFTPでアクセス可能なエンドポイントの提供がありましたので、そちらを利用することとしました。

ボツ案としては以下の様なものがあります。

  • ダミー用アカウントを作成して受信を確認する -> 既存システムに改修が必要かつ受信確認を誰がやるのか問題
  • Seleniumなどでブラウザを自動化してSaaSのレポート画面を定期チェックさせる -> Seleniumの開発大変そうだぞ問題

どうやったのか

SaaSからレポートファイルの取得をdigdag + embulkで行う処理を実装しました。

その後取得したレポートファイルをdigdag + Python(pandas)で検査し、Slackへ通知を行うようにしました。

digdag、embulkとは

初見の方向けに簡単な紹介をします。
digdagはTreasureData社が開発を行っているOSSで、ワークフローエンジンとして動作します。具体的にはさまざまなジョブを任意の順番、並列性で任意の時間に実行させる、といったことを実現します。

embulkも同じくTreasureData社製のOSSで、バルクデータローダとして動作します。具体的にはPostgreSQL、MySQLなどのRDB、S3、CSVファイル、さまざまな形式のデータストアやデータファイルからデータを一括で読み込み、別のデータストア、ファイルへ書き出すといったことを実現します。

一般的なdigdagの利用方法として、データ分析基盤へのデータマイグレーションをdigdagのワークフロー機能を使って行い、その中でembulkを用いるケースが多いと感じています。

そしてオイラ大地もその用途でdigdag + embulkを使用しており、スケーラビリティやユーザビリティに信頼を寄せています。今回もスケーラブルな既存のdigdag server環境があったことで、特に迷わずにdigdag + embulkを選択したと言う経緯があります。

ワークフローを実現するだけならJenkinsやRundeckといったその他のツールも多く存在しますが、私がdigdagを使うメリットとして感じている点は次の通りです。

  • digファイルという、YAML形式に似たシンプルな構文でジョブを定義できる
  • オペレータと呼ばれる、TreasureData、S3、BigQuery、PostgreSQLなどに接続する機能をデフォルトで持つ
  • 導入がシンプル
  • ジョブの実行と確認が可能なシンプルなGUIコンソールを持つ

(実はこの紹介部分はオイラ大地の有志で出版した技術書典5の本から引用していたりします。)

Python、pandasとは

どちらも大変著名ですしサラッとコメントします。

Pythonは有名で人気なプログラミング言語です。

pandasはPythonのツール群の一つで、データ分析などに適した機能を備えています。 当アドベンドカレンダーの12/13の記事でも平島さんの...データを検索する。この際にpandasを使ってみようと思います。...といったコメントがある様に、社内でも活用していきたいという熱が上がっています。

かく言う私も普段はインフラがメインなのでこの機を幸いと「CSVの処理?これはもうpandasさんを使っていき!」と言う軽いノリで利用させて頂きました。

embulkのプラグインの豊富さが凄い

実装するにあたり、SaaS側のレポートファイルの仕様を確認したところ、少々手強い物であることがわかりました。 お知らせの4つの方法のレポートファイルの形式が共通のフォーマットではなかったのです。

それぞれで出力方法を選択することはできるのですが、一律で共通のCSVで出力することはできませんでした。 結果として以下の形式でそれぞれのレポートファイルを作成し、後はembulkで処理を行う事にしました。

  • メール通知レポート: CSV
  • LINE通知レポート: CSV
  • SMS通知レポート: EXCEL
  • アプリ通知レポート: ZIP圧縮CSV

改めて図に起こすと以下の通りです。

f:id:oitech:20181218012330p:plain
概要図少し詳細

ここでembulkの役割としては、SaaSからSFTPでデータを取り込み、4つのそれぞれの形式を読み込み、Python+pandasのために"共通の書式"でCSV形式のデータを出力する必要があります。 そしてそれを簡単に実現するのがembulkのプラグインです。List of Plugins by Categoryにある通り、embulkには豊富なプラグインがあります。 今回は以下のプラグインを利用しました。

コードのご紹介

それでは実際のコードを紹介します。(注意:全てのコードはWeb用に多少改変がある場合があります)

親digファイル

まずはワークフローを定義するdigファイルです。digファイルは親digファイルと子digファイルに別れており、 親ファイルでは実行スケジュールの設定と、環境変数の設定を行なっています。 一番数が多いのはこの親digファイルで、スケジュール別、お知らせ別でdigファイルを分けています。

timezone: Asia/Tokyo

schedule:
  cron>: 35 08 * * MON,TUE,WED,THU

_export:
  workflow_name: "all_line_0830"
  path_prefix: "${env.root_dir}/digdag-check-smc/csv/"
  SFTP_SAAS_PATH_PREFIX: "/reports/"
  SFTP_SAAS_PATH_MATCH_PATTERN: all_line_0830
  activity_name: "LINE_当日_0830"

!include : include/slack.dig
!include : include/sftp_saas.dig

+call-sub1:
  call>: sub1-check-smc-sftp-line.dig

  _error:
    slack>: include/danger-template.yml

+slack:
  slack>: include/good-template.yml

子digファイル

続いて子digファイルです。タスク +get-file-from-saas でSaaSからSFTPでファイル取得を行うembulkを実行しています。 sh>: SFTP_SAAS_PASSWORD=${secret:sftp_salesforce.password} の箇所でdigdagのシークレットとして設定したパスワードの取り出しを行なっています。

その後タスク +check-karaburi でPython + pandasのCSVファイルの検査スクリプトを実行しています。今考えるとdigdagには py>: オペレータがありますが、この時は深く考えずに sh>: オペレータでPythonスクリプトを実行しています。 子digファイルは4つのお知らせ方法ごとに作成しています。それぞれのレポートファイルの書式の違いについてはembulkが対応するため、digファイル自体の違いは結果的に呼び出すyml.liquidファイルの名前の違いしかありません。

timezone: Asia/Tokyo

_export:
  workflow_name: "${env.workflow_name}-sub1-check-saas-sftp-line"

+get-file-from-saas:
  sh>: SFTP_SAAS_PASSWORD=${secret:sftp_salesforce.password} embulk ${env.embulk_options} run yml/sftp/check-saas-sftp-line.yml.liquid

+check-karaburi:
  sh>: python bin/check-karaburi.py ${path_prefix}${SFTP_SAAS_PATH_MATCH_PATTERN}000.00.csv "line" "${activity_name}"

embulkのyml.liquid: メール通知レポート: CSV

子digファイルから呼び出されるembulkのyaml.liquidファイルです。変数はdigdagから受け渡されます。 メール通知のレポートはシンプルなCSVのため、特に変わった処理はしていません。

in:
  type: sftp
  host: {{ env.SFTP_SAAS_HOST }}
  user: {{ env.SFTP_SAAS_USER }}
  password: {{ env.SFTP_SAAS_PASSWORD }}
  user_directory_is_root: {{ env.SFTP_SAAS_USER_DIRECTORY_IS_ROOT }}
  timeout: {{ env.SFTP_SAAS_TIMEOUT }}
  path_prefix: {{ env.SFTP_SAAS_PATH_PREFIX }}
  path_match_pattern: {{ env.SFTP_SAAS_PATH_MATCH_PATTERN }}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: ...
out:
  type: file
  path_prefix: {{ env.path_prefix }}/{{ env.SFTP_SAAS_PATH_MATCH_PATTERN }}
  file_ext: csv
  formatter:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: ...

embulkのyml.liquid: LINE通知レポート: CSV

LINE通知のレポートもメールと同様にシンプルなCSVのため、特に変わった処理はしていませんのでコードは省略します。 ポイントとしては、本記事では省略している columns: の部分です。 4つのレポートはそれぞれ書式が異なっているため、データの列名も同じではありません。 それは後続のPython + pandasで処理を行う際に望ましく無いため、embulkのoutのcolumnsで同じ列名になるように置き換えを行なっています。

embulkのyml.liquid: SMS通知レポート: EXCEL

SMS通知のレポートはEXCELであるため当初諦めかけたのですが、embulkのプラグインとしてあっさり用意されていました。 (なおembulkのEXCELパーサプラグインはroo-excelとpoi_execlの2つがあり迷ったのですが、DL数と機能の多さからpoi_excelを選択しています) またシート名が日本語の2バイト文字だったため動作するか危ぶんだのですが、全く問題なく動きました。

in:
  type: sftp
  host: {{ env.SFTP_SAAS_HOST }}
  user: {{ env.SFTP_SAAS_USER }}
  password: {{ env.SFTP_SAAS_PASSWORD }}
  user_directory_is_root: {{ env.SFTP_SAAS_USER_DIRECTORY_IS_ROOT }}
  timeout: {{ env.SFTP_SAAS_TIMEOUT }}
  path_prefix: {{ env.SFTP_SAAS_PATH_PREFIX }}
  path_match_pattern: {{ env.SFTP_SAAS_PATH_MATCH_PATTERN }}
  parser:
    type: poi_excel
    sheets: ["モバイル SMS メッセージのサマリーレポート"]
    skip_header_lines: 22
    columns:
    - {name: ...
out:
  type: file
  path_prefix: {{ env.path_prefix }}/{{ env.SFTP_SAAS_PATH_MATCH_PATTERN }}
  file_ext: csv
  formatter:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: ...

embulkのyml.liquid: アプリ通知レポート: ZIP圧縮CSV

アプリ通知に関してはCSVだが圧縮されていると言うことで、decodersの設定を追加しています。 それ以外は平CSVと同じですので特に迷うことはありませんでした。

in:
  type: sftp
  host: {{ env.SFTP_SAAS_HOST }}
  user: {{ env.SFTP_SAAS_USER }}
  password: {{ env.SFTP_SAAS_PASSWORD }}
  user_directory_is_root: {{ env.SFTP_SAAS_USER_DIRECTORY_IS_ROOT }}
  timeout: {{ env.SFTP_SAAS_TIMEOUT }}
  path_prefix: {{ env.SFTP_SAAS_PATH_PREFIX }}
  path_match_pattern: {{ env.SFTP_SAAS_PATH_MATCH_PATTERN }}
  decoders:
    - type: commons-compress
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: ...
out:
  type: file
  path_prefix: {{ env.path_prefix }}/{{ env.SFTP_SAAS_PATH_MATCH_PATTERN }}
  file_ext: csv
  formatter:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: ...

ここまでで、4つの通知方法のレポートファイルについて、Python + pandasで処理を行うためのCSVデータ化が完了します。

pandasの有能さが凄い

ここからの実装は行なっていて本当に楽しかった記憶があります。Pythonも普段から多く書くわけでは無いのですがpandasは本当に初めての利用でした。 意気込んで利用したpandasは、"射影"と"0より上か?"の簡単なクエリを行う程度でしたが、ドキュメントを読みながら多機能さにワクワクしました。 今見てみると勢いで作ったためコメントも日英入り乱れていたりしますが、それは今後の課題と考えています。(まずは動くものを作るのが大事><) (pep8も存在は知っているので徐々に学びます)


Pythonのコード自体は特に複雑なことをやっていません。 引数のチェックをした上で、CSVをpandasのDataFrameに読み込み、df.queryで0件では無いかの件数チェックを行います。 問題がなければ必要な列名を射影しつつstrで文字列変換し、Slackへ通知しています。

#!/usr/bin/env python

import os
import sys
import pandas as pd
import slackweb
import re

# 初期化。引数で”Activity_Name"が渡された時の判定用フラグの
filter_actiity_flg = False

# Set Slack variables
slackurl="xxxx"
slackchannel = "#watch-saas_data"
slackuser = "check-bot"
slackicon = ":digdag:"

# Define Push slack
def sendslack(channel):
  slack = slackweb.Slack(url=slackurl)
  slack.notify(text=slacktext, channel=channel, username=slackuser, icon_emoji=slackicon)

# Define Print Usage
def printusage():
  print("#ERRO: Usage: python " + args[0] + " <CSV File> <app|line|mail|sms> [Filter string on Activity_Name]")

# 引数を取得
args = sys.argv

# 引数の数によって処理を変更
if len(args)==4:
  # 引数が3個の場合、Activity_Nameをフィルタする(引数+1になる)
  filter_actiity_flg = True
  activity_name = args[3]
  print("#INFO: Activity_Name is set. Print only row include '" + activity_name + "' on Activity_Name.")
elif len(args)==3:
  # 引数が2個の場合、特にフィルタ処理は行わない
  print("#INFO: Activity_Name is not set. Print all rows'")
else:
  # それが無い場合、エラーとする
  printusage()
  sys.exit(1)

# CSVファイル名を取得
file = args[1]

# ファイル存在チェック
if os.path.exists(file):
  print("#INFO: " + file + " is exists.")
else:
  print("#ERRO: " + file + " is not exists.")
  sys.exit(1)

# Read CSV file by pandas
df = pd.read_csv(file)

# Check Activity Name for LINE Report
if filter_actiity_flg:
  df = df[df['Activity_Name'].str.contains(activity_name)]

# Check Datafram is empty
if df.empty:
  print("#ERRO: " + file + " has no lines. It is empty...")
  sys.exit(1)

# Check Send_Type
send_type = args[2]
if re.fullmatch("line|sms|app|mail",send_type):
  print("#INFO: Sent_Type is " + send_type)
else:
  printusage()
  sys.exit(1)

# Add Send_Type
df['Send_Type'] = ":" + send_type + ":"

# Check 0 row counts on "Total_Sent" colmun
df2 = df.query('Total_Sent <= 0')

# とりあえず件数をSlackへ報告
slacktext = str(df[['Send_Type','Journey_Name','Activity_Name','Total_Sent']].to_string(index=False))
sendslack(slackchannel)

# 0件チェック
if len(df2.index) == 0:
  print("#INFO: No 0 count. OK.")
else:
  print("#WARN: 0 count exsist.")
  slacktext = ":bangbang: <!here>通知処理で0件のレポートが発生しました。以下の処理を確認してください。:bangbang: \n" + str(df2[['Send_Type','Journey_Name','Activity_Name','Total_Sent']].to_string(index=False))
  sendslack("#sec-unyou")
  sendslack("#watch-saas_critical")

まとめ

長々と書いてきましたがまとめると以下の感想です!技術楽しんでいきましょう!!!

  • digdag + embulkのコンビは本当にユースフルですね。プラグイン万歳!
  • Python + pandas、分析用途と聞いて恐れずに、CSVを便利に扱うツールとして利用する程度でも威力を発揮するのでみんなチャレンジ!


明日は順当に行けば@kariad_uuの「WACATE参加レポート」です。何が飛び出すか私も楽しみです。

Oisix ra daichi Creator's Blogはオイシックス・ラ・大地株式会社のエンジニア・デザイナーが執筆している公式ブログです。

オイシックス・ラ・大地株式会社では一緒に働く仲間を募集しています