Azure Event Hubs から Stream Analytics で BLOB に出力する話

Azure Event Hubs,Azure Stream Analytics,Microsoft Azure,Pepper,Python,プログラミング

こんにちは、ソリューション開発部の柴崎です。

前回は、Event Hubs にメッセージの送信を試みました。今回はメッセージの受信について調査しましたのでご紹介します。

Event Hubs だけではデータを保持し続けることができません。受信し必要に応じて加工して他に保持する必要があります。今回は Pepper から送信したログを Storage BLOB に出力するだけのシンプルなシナリオですので、Stream Analytics を使い Event Hubs から Storage BLOB に出力してみます。Event Hubs には JSON 形式で送信し、Storage BLOB には CSV 形式で保存することとします。

Stream Analytics ジョブ作成

作成といっても非常に簡単で、Azure Portal で設定するだけで済みます。

入力

画面の通りですので、設定で特に難しいことは無いと思います。Event Hubs に送信したフォーマットと一致している必要があることに注意してください。以下のようなエラーメッセージが出た場合は、Event Hubs に送ったメッセージを Stream Analytics の入力として利用できません。

入力イベントを Json としてシリアル化解除できません。考えられる原因: 1) イベントの形式が間違っている 2) 入力ソースが正しくないシリアル化形式で構成されている

今回の例では JSON を選びますが、例えば CSV の場合、Event Hubs に送信するメッセージに CSV のヘッダ行が必要です。ヘッダ行が含まれない場合は上のエラーとなり連携されません。

変換

今回は特に変換をかけずにそのまま BLOB に出力させます。

SELECT [種別]
     , [日時]
     , [メッセージ]
     , [EventProcessedUtcTime]
     , [PartitionId]
     , [EventEnqueuedUtcTime]
  INTO [OUTPUT_ALIAS]
  FROM [INPUT_ALIAS]

「*」を指定した場合、受信したメッセージのカラムが一致しないと、別の BLOB パスに出力されるようです。カラムの有無に差がある場合で、同じ BLOB に出力したい場合は、明示的にカラムを指定する必要があります。存在しないカラムを指定してもエラーにはなりません。

出力

ログ出力の場合は、ログローテートできると運用がしやすくなります。パス パターンに「{date}」および「{time}」を含めることで、年月日時の単位で BLOB を分けることができます。

また、CSV で出力する際にエスケープの問題が気になりますが Stream Analytics では自動的にエスケープされます。エスケープが必要な場合のみ「”」(二重引用符) で囲まれ、「”」のエスケープ表現が「””」となる RFC4180 形式です。

メッセージを送りつける Python コード

Stream Analytics の設定が完了したので、Event Hubs にメッセージを送信し、Storage BLOB に出力されることを確認しましょう。

SAS Token の作成からメッセージの送信まで通しで行う処理が以下の Python コードです。わざわざ PowerShell で SAS Token を作成していた前回の手順を省きました。今回も パブリッシャー ポリシー が無い Basic プランでも動作するコードになります。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import urllib2
import base64
import hmac,hashlib
import time
import datetime

data = '''{{
  "種別": "{}",
  "日時": "{:%Y/%m/%d %H:%M:%S}",
  "メッセージ": "{}"
}}'''.format(
  'Foo',
  datetime.datetime.now(),
  'ほ,げ\\"ふ\'が'
)

servicebusNamespace = 'SERVICEBUS_NAMESPACE'
eventHubPath = 'EVENT_HUB_PATH'
policyName = 'POLICY_NAME'
sharedAccessKey = 'SHARED_ACCESS_KEY'

resource = 'https://{}.servicebus.windows.net/{}/messages'.format(servicebusNamespace, eventHubPath)
sr = urllib2.quote(resource, safe = '')
expiry = int(time.time()) + 60

stringToSign = '{}\n{}'.format(sr, expiry)
signature = base64.encodestring(hmac.new(sharedAccessKey.encode('UTF-8'), stringToSign, hashlib.sha256).digest())
signature = signature.rstrip("\n")
signature = urllib2.quote(signature)

token = 'SharedAccessSignature sr={}&sig={}&se={}&skn={}'.format(sr, signature, expiry, policyName)

req = urllib2.Request(resource, data)
req.add_header('Authorization', token)

res = urllib2.urlopen(req)
print res.code, res.msg

このコードを実行すると、以下の内容が見事に BLOB に出力されました。

種別,日時,メッセージ,eventprocessedutctime,partitionid,eventenqueuedutctime
Foo,2016/07/04 21:59:51,"ほ,げ""ふ'が",2016-07-04T12:59:52.1983941Z,0,2016-07-04T12:59:51.9880000Z

まとめ

なんと、一切コードを書かずに Event Hubs から Storage BLOB に保存することができました。今回はただ保存するだけでしたが、Event Hubs と Stream Analytics の組み合わせにより Power BI と連携させるといった使い方も可能です。今後、他の候補の調査内容も追ってご紹介できればと思います。

  • Zabbix Enterprise Appliance
  • 低コスト・短納期で提供するまるごとおまかせZabbix