

Azure Event Hubs から Stream Analytics で BLOB に出力する話
こんにちは、ソリューション開発部の柴崎です。
前回は、Event Hubs にメッセージの送信を試みました。今回はメッセージの受信について調査しましたのでご紹介します。
Event Hubs だけではデータを保持し続けることができません。受信し必要に応じて加工して他に保持する必要があります。今回は Pepper から送信したログを Storage BLOB に出力するだけのシンプルなシナリオですので、Stream Analytics を使い Event Hubs から Storage BLOB に出力してみます。Event Hubs には JSON 形式で送信し、Storage BLOB には CSV 形式で保存することとします。
Stream Analytics ジョブ作成
作成といっても非常に簡単で、Azure Portal で設定するだけで済みます。
入力
画面の通りですので、設定で特に難しいことは無いと思います。Event Hubs に送信したフォーマットと一致している必要があることに注意してください。以下のようなエラーメッセージが出た場合は、Event Hubs に送ったメッセージを Stream Analytics の入力として利用できません。
今回の例では JSON を選びますが、例えば CSV の場合、Event Hubs に送信するメッセージに CSV のヘッダ行が必要です。ヘッダ行が含まれない場合は上のエラーとなり連携されません。
変換
今回は特に変換をかけずにそのまま BLOB に出力させます。
SELECT [種別]
, [日時]
, [メッセージ]
, [EventProcessedUtcTime]
, [PartitionId]
, [EventEnqueuedUtcTime]
INTO [OUTPUT_ALIAS]
FROM [INPUT_ALIAS]
「*」を指定した場合、受信したメッセージのカラムが一致しないと、別の BLOB パスに出力されるようです。カラムの有無に差がある場合で、同じ BLOB に出力したい場合は、明示的にカラムを指定する必要があります。存在しないカラムを指定してもエラーにはなりません。
出力
ログ出力の場合は、ログローテートできると運用がしやすくなります。パス パターンに「{date}」および「{time}」を含めることで、年月日時の単位で BLOB を分けることができます。
また、CSV で出力する際にエスケープの問題が気になりますが Stream Analytics では自動的にエスケープされます。エスケープが必要な場合のみ「”」(二重引用符) で囲まれ、「”」のエスケープ表現が「””」となる RFC4180 形式です。
メッセージを送りつける Python コード
Stream Analytics の設定が完了したので、Event Hubs にメッセージを送信し、Storage BLOB に出力されることを確認しましょう。
SAS Token の作成からメッセージの送信まで通しで行う処理が以下の Python コードです。わざわざ PowerShell で SAS Token を作成していた前回の手順を省きました。今回も パブリッシャー ポリシー が無い Basic プランでも動作するコードになります。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import urllib2
import base64
import hmac,hashlib
import time
import datetime
data = '''{{
"種別": "{}",
"日時": "{:%Y/%m/%d %H:%M:%S}",
"メッセージ": "{}"
}}'''.format(
'Foo',
datetime.datetime.now(),
'ほ,げ\\"ふ\'が'
)
servicebusNamespace = 'SERVICEBUS_NAMESPACE'
eventHubPath = 'EVENT_HUB_PATH'
policyName = 'POLICY_NAME'
sharedAccessKey = 'SHARED_ACCESS_KEY'
resource = 'https://{}.servicebus.windows.net/{}/messages'.format(servicebusNamespace, eventHubPath)
sr = urllib2.quote(resource, safe = '')
expiry = int(time.time()) + 60
stringToSign = '{}\n{}'.format(sr, expiry)
signature = base64.encodestring(hmac.new(sharedAccessKey.encode('UTF-8'), stringToSign, hashlib.sha256).digest())
signature = signature.rstrip("\n")
signature = urllib2.quote(signature)
token = 'SharedAccessSignature sr={}&sig={}&se={}&skn={}'.format(sr, signature, expiry, policyName)
req = urllib2.Request(resource, data)
req.add_header('Authorization', token)
res = urllib2.urlopen(req)
print res.code, res.msg
このコードを実行すると、以下の内容が見事に BLOB に出力されました。
種別,日時,メッセージ,eventprocessedutctime,partitionid,eventenqueuedutctime
Foo,2016/07/04 21:59:51,"ほ,げ""ふ'が",2016-07-04T12:59:52.1983941Z,0,2016-07-04T12:59:51.9880000Z
まとめ
なんと、一切コードを書かずに Event Hubs から Storage BLOB に保存することができました。今回はただ保存するだけでしたが、Event Hubs と Stream Analytics の組み合わせにより Power BI と連携させるといった使い方も可能です。今後、他の候補の調査内容も追ってご紹介できればと思います。