■pubsub
Publisher app → |GCPの壁| Topic(Schema) → Subscription 1や2 |GCPの壁| → Subscriber app
サブスクライバーappにPull/PushさせるPull/Pushのサブスクリプションをトピックに紐づける設定をしておく
【図解付き】Cloud Pub/Subに概要や使い方についてわかりやすく解説 - KIYONO Engineer Blog (kiyono-co.jp)
サブスクライバーappにPull/PushさせるPull/Pushのサブスクリプションをトピックに紐づける設定をしておく
【図解付き】Cloud Pub/Subに概要や使い方についてわかりやすく解説 - KIYONO Engineer Blog (kiyono-co.jp)
アプリで簡単にPubsubにパブリッシュや、サブスクもできるので、アプリ間の連携にPubsubが使える
• 非同期処理(画像処理とか重めのもの
• IDの種類 (message id, subscription id, topic id, ack id, project idあたりがアプリでは使われるっぽい
※ack idはpull時のみでPushのときはhttpステータスコードが200でackとなる
トピック(メッセージのパブリッシュ先)
• スキーマ/外部アクセス許可/リテンション/GCS/バックアップの設定がある (Push/Pullの設定はない)
• スキーマ/外部アクセス許可/リテンション/GCS/バックアップの設定がある (Push/Pullの設定はない)
• パブリッシュ側のベストプラクティス (JWT)
サブスクライバのPushとPull (PushはEndpointが必要、デフォルトはpull)
• at-least-once (少なくとも1回) 配信を提供します
• 同じ順序指定キーを持ち、同じリージョンに存在している場合は、メッセージの順序指定を有効にできます
• サブスクライバーが31日間未使用、またはサブスクリプションが未更新の場合、サブスクリプションは期限切れ
• メッセージ数が多いとpull向き サブスクリプション タイプを選択する | Pub/Sub ドキュメント | Google Cloud
pushはhttpsが必要?
• push エンドポイントのサーバーには、認証局が署名した有効な SSL証明書が必要でhttps
• Cloud run でEvent Arcを設定するとサブスクが自動作成されrunのデフォルトhttpsのURLが使われるが、これはPullよりPushで安定した
• CronバッチならPullで安定するのでは?大量リクエストはPull向きとある(Pullは失敗処理込みの話かも知れん)
トピックのリテンション:デフォルトなし、最小値:10分、最大値:31日
サブスクのリテンション:デフォルト値:7日、最小值:10分、最大値:7日
pubsub ack期限(Ack Deadline)
•デフォルト60秒> 設定10分>ack延長で最大1時間まで伸ばせると思われる
•デフォルト60秒> 設定10分>ack延長で最大1時間まで伸ばせると思われる
•exactly onceを設定しなければ期限の延長は保証されない
•ack期限を過ぎる、あるいはNackを返す場合、メッセージは再配送される
•ack応答期限の延長は99パーセンタイル(上位1%の値よりも小さい値のうち最大の値)で
modifyAckDeadlineを返し、延長してもMaxExtension (ack期限を延長 する最大値) 60minまで?
modifyAckDeadlineリクエストを定期的に発行すればよいらしい
modifyAckDeadlineリクエストを定期的に発行すればよいらしい
メッセージの再試行を強制するには
•nack リクエストを送信
•nack リクエストを送信
•高レベルのクライアント ライブラリを使用していない場合は、ackDeadlineSeconds を0に設定して modifyAckDeadline リクエストを送信する
exactly once
1 回限りの配信 | Pub/Sub ドキュメント | Google Cloud
1 回限りの配信 | Pub/Sub ドキュメント | Google Cloud
•pullなら設定できる。他には、Cloud Dataflowを組み合わせる(プログラムコードでDataflowを使う感じかり、あるいはmessageについているunique idを利用して、KVS を用いたステート管理をして自前で重複を排除する
•再配信は、メッセージに対してクライアントによる否定確認応答が行われた場合、または確認応答期限が切れる前にクライアントが確認応答期限を延長しな かった場合のいずれかか原因で発生することがある。
※exactly onceはエラーでも再配信でPubsubパニックしないようにしたいために使うものではない?
pubsubはトピックにPublishされたメッセージをDataflowに引き継げる
•Apache Beamのウィンドウ処理とセッション分析とコネクタのエコシスエムがある
•メッセージ重複の削除ができる
•pubsub>dataflow>BQやGCS: この流れでログ等をストーリミングで入れ込める
BQサブスクリプション (PubSubはBigQuery Storage Write API を使用してデータを BigQueryテーブルに送信、GCSサブスクもある)
サブスクライバーApp側のコードでのフロー制御によりちょっと待てよのトラフィック急増対応
デッドレタートピック (配信試行回数が見れる)やエラーでの再配信
• Pub/Subサブスクリプションにデッドレタートピックを設定しておくと、一定の回数再送信が失敗したメッセージの宛先がデッドレタートピックに変更され貯められる
メッセージのフィルタ、同時実行制御により多いメッセージに対応
Pubsubをローカルでエミュレートする
pubsubのスナップショットやリテンション
トピックにリテンションを設定しスナップショット作成> 過去のサブスクしたメッセは見えなさそう
サブスクにリテンションを設定しスナップショット作成> 過去のAckしたメッセは見えなさそう
スナップショットでどう使うのか?
キューがたまっているときに撮るものと思われる。またシーク時間のポイントを設定する意味がある
スナップショットとシークを使いこなして特定期間の再実行を行う機能
スナップショットで再実行する
シークは指定時間か最後のスナップショット以降のサブスク再実行(実際pushでrunが再実行された)
Pubsubにどんなメッセージが入ってきているか確認する方法
pull形式ならAckしなければpullボタンで拾い見れる (トピックでパブリッシュしてサブスクでPull し見る)
トラブルシュートはログを見るかデッドレタートピックかGCSバックアップを見る?
デッドレターキュー(ドロップしたものの確認と救済?)
サブスクでDLQのONしデッドレタートピックを設定し転送する>GCSにもバックアップできる
DLTでメッセージ(実行済みOR未実行)の再生
データ形式:スキーマを使うか、スキーマなしならdataで取得できる
from google cloud import pubsub_v1
from avro.io import DatumReader, BinaryDecoder
from avro schema import Parse
project_id="your-project-id"
subscription id="your-subscription-id"
subscriber pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
avro_schema = Parse("""
{
"type": "record",
"name": "Avro".
"fields": [
{
"name": "ProductName",
"type": "string",
"default":""
},
{
"name": "SKU",
"type": "int",
"default": 0
}
}
def callback(message):
print(f"Received message: {message}")
reader = DatumReader(avro_schema)
decoder = Binary Decoder (message.data)
avro_record = reader.read(decoder)
message_id=message.message id
message.ack()
print("Message ID: (message_id}")
product_name = avro_record['ProductName']
sku= avro_record['SKU']
print("Product Name: (product_name}")
print("SKU: (sku}")
subscriber.subscribe(subscription_path, callback=callback)
def callback(message):
print("Received message: (message)")
data message data
message_id=message.message_id
message.ack()
print("Date (data)")
print("Message ID: (message_id)")
Pub/SubでStreamingPull APIを使用してメッセージをリアルタイムで処理する - G-gen Tech Blog
StreamingPull API を使用するとアプリとの間で永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐに pullされる。1 つの pull リクエストで 1 つの pull レスポンスが返る通常の 単項 Pull と比較すると、高スループット・低レイテンシ。必要なメッセージを残す処理をしたりも?GCP側の問題であっても通信が切れた場合は別サーバに繋ぎなおすためmodifyAckDeadlineも切れ再配信されるバグがある