/// BANGBOO BLOG ///

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31


May 9, 2024 List
Pubsub on May 09, 2024 12:00 AM

May 9, 2024

Pubsub
■pubsub
Publisher app → |GCPの藹??| Topic(Schema) → Subscription 1や2 |GCPの藹??| → Subscriber app
 サブスクライバ繝?app縺?Pull/PushさせるPull/Pushのサブスクリプションをトピックに軆??づける設藹??をしてお縺?

【図解付き】Cloud Pub/Subに觸??要や使い方について繧?かりやすく解説 - KIYONO Engineer Blog (kiyono-co.jp)
Pub/Sub サービスの觸??要  |  Pub/Sub ドキュメント  |  Google Cloud
GCP縺?Cloud PubSubで考慮するこ縺? - Carpe Diem (hatenablog.com)
Pub/Sub の割り当てと臀??限  |  Pub/Sub ドキュメント  |  Google Cloud
アプリで簡単縺?Pubsubにパブリッシュや、サブスクもできるので、アプリ間の連携縺?Pubsubが使える
 窶? 非同期処理(画蜒?処理とか重めのも縺?
 窶? IDの種類 (message id, subscription id, topic id, ack id, project idあたりがアプリでは使繧?れるっぽい
  窶?ack id縺?pull時のみ縺?Pushのとき縺?httpステータスコードが200縺?ackとなる
GCP - Pub/Sub サービス觸??要 #GoogleCloud - Qiita
Pub/Sub メッセージの臀??成とレスポン繧?  |  Python 逕? App Engine フレキシブル環藹??に関するドキュメント  |  Google Cloud
トピック・??メッセージのパブリッシュ先)
 窶? スキーマ/外部アクセス許藹??/リテンショ繝?/GCS/バックアップの設定がある (Push/Pullの設定はない)
 窶? パブリッシュ側のベストプラクティ繧? (JWT) 
  Pub/Sub トピックにパブリッシュするためのベスト プラクティ繧?  |  Pub/Sub ドキュメント  |  Google Cloud
サブスクライバ縺?Push縺?Pull (Push縺?Endpointが必要、デフォルト縺?pull)
 GCP - Pub/Sub サービス觸??要 #GoogleCloud - Qiita
 窶? at-least-once (少な縺?とも1回) 配信を觸??供します
 窶? 同じ順蠎?指定キーを持ち、同じリージョンに藹??在している場合は、メッセージの順蠎?指定を有効にできます
 窶? サブスクライバーが31日間未使用、またはサブスクリプションが未更新の場合、サブスクリプションは期限切れ
 窶? メッセージ数が多い縺?pull向き サブスクリプショ繝? タイプを選択する  |  Pub/Sub ドキュメント  |  Google Cloud
push縺?httpsが必要?
 push サブスクリプションを作成する  |  Pub/Sub ドキュメント  |  Google Cloud
 窶? push エンドポイントのサーバーには、認証藹??が署名した有効縺? SSL証譏?書が必要縺?https
 窶? Cloud run 縺?Event Arcを設藹??するとサブスクが自動作成されrunのデフォルトhttps縺?URLが使繧?れるが、これ縺?PullよりPushで藹??定した
 窶? CronバッチならPullで藹??定するので縺??大驥?リクエスト縺?Pull向きとある(Pullは失敗処理込みの話かも知れん)
トピックのリテンショ繝?:デフォルトなし、最蟆?蛟?:10分、最大蛟?:31譌?
サブスクのリテンショ繝?:デフォルト蛟?:7日、最蟆?蛟?:10分、最大蛟?:7譌?
 サブスクリプショ繝? プロパテ繧?  |  Pub/Sub ドキュメント  |  Google Cloud
pubsub ack期限(Ack Deadline)
 •デフォルト60秒> 設藹??10分>ack延長で最螟?1時間まで伸ばせると思繧?れる
 リース管理で確認時間を延長する  |  Pub/Sub ドキュメント  |  Google Cloud
 窶?exactly onceを設藹??しなければ期限の延長は臀??証されない
 窶?ack期限を驕?縺?る、あるい縺?Nackを返す場合、メッセージは再配送される
 窶?ack応答期限の延長縺?99パーセンタイ繝?(上位1%の値よりも蟆?さい値のうち最大の蛟?)縺?
 modifyAckDeadlineを返し、延長してもMaxExtension (ack期限を延髟? する最大蛟?) 60minま縺??
  modifyAckDeadlineリクエストを定期的に発鐔??すればよいらしい
メッセージの再試鐔??を強制するに縺?
 窶?nack リクエストを送菫?
 •饅??レベルのクライアント ライブラリを使用していない場合は、ackDeadlineSeconds を0に設定し縺? modifyAckDeadline リクエストを送信する
exactly once
1 回限りの配菫?  |  Pub/Sub ドキュメント  |  Google Cloud
 窶?pullなら設藹??できる。他には、Cloud Dataflowを組み合繧?せる(プログラムコード縺?Dataflowを使う感じかり、あるい縺?messageについているunique idを利用して、KVS を用いたステート管理をして自前で重複を觸??除する
 •再配信は、メッセージに対してクライアントによる否藹??確鐔??応答が行繧?れた場合、または確認応答期限が切れる前にクライアントが確鐔??応答期限を延長し縺? かった場合のいずれかか藹??因で発生することがある。
  窶?exactly onceはエラーでも再配信縺?Pubsubパニックしないようにしたいために使うものではない?
pubsubはトピック縺?PublishされたメッセージをDataflowに藹??き継げる
 Dataflow (Apache Beam) を大驥?のメッセージをバッチ処理する場合に使える
  Pub/Sub→Dataflow→処理
 窶?Apache Beamのウィンドウ処理とセッション分析とコネクタのエコシスエムがある
 •メッセージ重複の削除ができる
 窶?pubsub>dataflow>BQやGCS: この觸??れでログ軆??をストーリミングで入れ込める
BQサブスクリプショ繝? (PubSub縺?BigQuery Storage Write API を使用してデータを BigQueryテーブルに送信、GCSサブスクもある)
 Langganan BigQuery  |  Dokumentasi Pub/Sub  |  Google Cloud
 BigQuery サブスクリプションの臀??成  |  Pub/Sub ドキュメント  |  Google Cloud
サブスクライバ繝?App側のコードでのフロー制御によりちょっと藹??てよのトラフィック急藹??対藹??
 フロー制御を使用して臀??時的な急藹??を処理する  |  Pub/Sub ドキュメント  |  Google Cloud
デッドレタートピッ繧? (配信試行回数が見れる)やエラーでの再配菫?
 メッセー繧? エラーの処理  |  Pub/Sub ドキュメント  |  Google Cloud
  窶? Pub/Subサブスクリプションにデッドレタートピックを設藹??してお縺?と、一定の回数再送信が失敗したメッセージの藹??先がデッドレタートピックに藹??更され貯められる
メッセージのフィルタ、同時実行制御により多いメッセージに対応
 サブスクリプションからのメッセージをフィルタする  |  Pub/Sub ドキュメント  |  Google Cloud
Pubsubをローカルでエミュレートする
 エミュレータを使用したローカルでのアプリのテスト  |  Pub/Sub ドキュメント  |  Google Cloud
pubsubのスナップショットやリテンショ繝?
クイックスタート: スナップショットまたはタイムスタンプまでシークし縺? Pub/Sub でメッセージを再生する  |  Pub/Sub ドキュメント  |  Google Cloud
トピックにリテンションを設藹??しスナップショット作成> 驕?去のサブスクしたメッセは鐔??えなさそう
サブスクにリテンションを設藹??しスナップショット作成> 驕?去縺?Ackしたメッセは鐔??えなさそう
スナップショットでどう使うのか?
 cloud pubsubで配信觸??みのメッセージを再送する #PubSub - Qiita
 キューがたまっているときに撮るものと思繧?れる。またシーク時間のポイントを設藹??する諢?味がある
 スナップショットとシークを使いこなして特藹??期間の再実行を行う機閭?
  スナップショットで再実行する
  シークは指定時間か最後のスナップショット以降のサブスク再実行(実際push縺?runが再実行された)
Pubsubにどんなメッセージが入ってきているか確鐔??する方觸??
 pull形藹??ならAckしなけれ縺?pullボタンで拾い見れる (トピックでパブリッシュしてサブスク縺?Pull し見る)
 トラブルシュートはログを見るかデッドレタートピックかGCSバックアップを見る?
デッドレターキュ繝?(ドロップしたものの確認と救済?)
 サブスク縺?DLQ縺?ONしデッドレタートピックを設藹??し転送する>GCSにもバックアップできる
 DLTでメッセー繧?(実行済縺?OR未藹??行)の再生
データ形蠑?:スキーマを使うか、スキーマなしならdataで藹??得できる
 トピックのスキーマを作成する  |  Pub/Sub ドキュメント  |  Google Cloud
 Cloud Pub/Subの觸??要縺?Pythonでの藹??霍? - case-kの備忘骭?
from google cloud import pubsub_v1
from avro.io import DatumReader, BinaryDecoder
from avro schema import Parse
project_id="your-project-id"
subscription id="your-subscription-id"
subscriber pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
avro_schema = Parse("""
{
"type": "record",
"name": "Avro".
"fields": [
{
"name": "ProductName",
"type": "string",
"default":""
},
{
"name": "SKU",
"type": "int",
"default": 0
}
}
def callback(message):
print(f"Received message: {message}")
reader = DatumReader(avro_schema)
decoder = Binary Decoder (message.data)
avro_record = reader.read(decoder)
message_id=message.message id
message.ack()
print("Message ID: (message_id}")
product_name = avro_record['ProductName']
sku= avro_record['SKU']
print("Product Name: (product_name}")
print("SKU: (sku}")
subscriber.subscribe(subscription_path, callback=callback)

def callback(message):
print("Received message: (message)")
data message data
message_id=message.message_id
message.ack()
print("Date (data)")
print("Message ID: (message_id)")

Pub/Sub縺?StreamingPull APIを使用してメッセージをリアルタイムで処理する - G-gen Tech Blog
StreamingPull API を使用するとアプリとの間で永続的な藹??方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐ縺? pullされる。1 つ縺? pull リクエスト縺? 1 つ縺? pull レスポンスが返る通常縺? 単項 Pull と觸??較すると、高スループット・臀??レイテンシ。必要なメッセージを残す処理をしたりも?GCP側の問題であっても通信が切れた場合は別サーバに軆??縺?なおすためmodifyAckDeadlineも切れ再配信されるバグがある


+++
メッセージ縺?TTL (Time-To-Live) はメッセージ臀??持期間(message retention duration) に臀??存
メッセージが TTLを超えると、自動的に削除され、Subscriberが藹??信できな縺?なる
ackDeadlineSeconds (デフォルト縺?10秒、最螟?600秒) を超えたACKのメッセージは再配信されますが、TTL期限を超えた場合は觸??える
#TTLを最螟?7日間に設定
gcloud pubsub subscriptions update my-subscription message-retention-duration=604800s

DLQ (Dead Letter Queue)
Subscriberが指定回謨?(最螟?100回) メッセージ縺?ACKを行繧?なかった場合に、メッセージを隔離する仕組縺?
DLQもサブスクなので期間やTTL設藹??方觸??は同じ

#DLQ topic 作成
gcloud pubsub topics create my-dlq-topic

#5回失敗したらDLQ縺?
gcloud pubsub subscriptions update my-subscription dead-letter-topic=projects/my-project/topics/my-diq-topic max-delivery-attempts=5

#DLQ subsc作成
gcloud pubsub subscriptions create my-diq-subscription--topic-my-diq-topic

#サブスクの詳細確認
gcloud pubsub subscriptions describe my-diq-subscription

#DLQメッセージの確認、-auto-ackも付けられるが、
gcloud pubsub subscriptions pull my-dlq-subscription -limit=10
[{
"ackld": "Y3g49NfY...=",
"message": {
"data": "SGVsbG8gd29ybGQ=", #Base64 エンコードされたデー繧?
"messageld": "1234567890",
"publish Time": "2024-02-18T12:34:56.789Z"
}
}]

#base64のでコードが必要
echo "SGVsbG8gd29ybGQ=" | base64-decode

#ack-idによりackを返しDLQメッセージを削髯?
gcloud pubsub subscriptions acknowledge my-diq-subscription--ack-ids=Y3g49NfFY

モニタリン繧? > アラートポリシーから新しいアラートを作成し
pubsub.subscription.outstanding_messages を監鐔??対象に選択し、閾値を設藹??するとよい

#DLQ メッセージの再処理をfunctionsに設定 (トピックに入れなおす)
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

def republish_message(message):
    future = publisher.publish(topic_path, message.data)
    print(f"Republished message ID: {future.result()}")

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-dlq-subscription")

def callback(message):
    print(f"Received message: {message.data}")
    republish_message(message)
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

/// BANGBOO BLOG /// - GCP runs off functions pubsub on scheduler

Posted by funa : 12:00 AM | Web | Comment (0) | Trackback (0)