■Cloud SQL Python Connector (Cloud SQL language Connector)
CloudSQL proxyでないやり方、簡単
Cloud SQL 言語コネクタの概要 | Cloud SQL for MySQL | Google Cloud
GitHub - GoogleCloudPlatform/cloud-sql-python-connector: A Python library for connecting securely to your Cloud SQL instances.
GitHub - GoogleCloudPlatform/cloud-sql-python-connector: A Python library for connecting securely to your Cloud SQL instances.
事前必要 pip install Flask mysql-connector-python
import mysql.connector
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
def items():
#データベースの返りをdictで取得
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT COUNT(*) AS total FROM item")
#単一カラムのとき
total_items = cursor.fetchone()['total']
cursor.execute("SELECT FROM item")
items = cursor.fetchall()
cursor.close()
connection.close()
↓
↓
コネクションプールを使うSQLAlchemy が良い?
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from google.cloud.sql.connector import Connector
# initialize Python Connector object
connector = Connector()
#Python Connector database connection function
def getconn():
conn = connector.connect(
"project region instance-name", # Cloud SQL Instance Connection Name
"pymysql", user="my-user", password="my-password", db="my-database",
ip_type="public" # "private" for private IP
)
return conn
app Flask(name)
#configure Flask-SQLAlchemy to use Python Connector
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://"
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {"creator": getconn}
# initialize the app with the extension
db = SQLAlchemy()
db.init_app(app)
下記のオプションも使える
connector = Connector(
ip_type="public", # can also be "private" or "psc"
enable_iam_auth=False,
timeout=30,
credentials=custom_creds, #google.auth credentials.Credentials
refresh_strategy="lazy", # can be "lazy" or "background"
)
■Cloud SQL MySQL設定
ロールは Cloud SQL 管理者 (roles/cloudsql.admin)、Cloud SQL インスタンスユーザー (roles/cloudsql.instance User)等のIAM?
【開発環境】mysql_dbso
Enterprise/Sandbox/AsiaNorthEast1 (Tokyo) / Single zone
MySQL ver 8.4
Shared core / 1cpu 0.6GB/HDD/10GB(auto increase)
PrivatelP/設定にvpcnwが必要/Enable private path
Auto daily backup 7days (1-5AM) / Enable point-in-time recovery
Week1 sun 0-1am/ Enable query insights
PW: x
【本番環境】
Enterprise plus? キャッシュ使う?
データベースフラグ (confが直接変更できなためフラグとしてパラメータを渡せる)
Cloud SQL studio (コンソールでMySQLが使える)
MySQLクライアントを使いたいならAuth proxyが必要
HA構成だとフェールオーバーやリードレプリカ等が使える
●Cloud SQLが内部IPだとサーパレスVPCコネクタ、or 外部IPならSQL+auth proxy
内部IPで良いのでVPCを作る、CloudSQLを内部IPで作る
サーバレスVPCコネクタを作る
ファイアウォールルールでポート (デフォルトで3306など)を開放
Cloud Run のNW設定で、サーバーレス VPC コネクタを選択、ルートオプションとしてすべてのトラフィックをVPC コネクタ経由で送信を選択
■対象アセットに対する付与可能なロールの一覧表示
■対象アセットに対する付与可能なロールの一覧表示
Full Resource Name(フルでのアセット名を探せる)
import google.auth
import googleapiclient.discovery
def view_grantable_roles(full_resource_name: str) -> None:
credentials.google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
service = googleapiclient.discovery.build('iam', 'v1', credentials credentials)
roles = (
service roles()
queryGrantableRoles (body=["fullResourceName": full_resource_name}).execute()
)
for role in roles["roles"]
if "title" in role:
print("Title: role["title"])
print("Name: role["name"])
if "description" in role:
print("Description:" + role["description"])
print("")
project_id = "prj"
#resource = f"//bigquery.googleapis.com/projects/prj/datasets/ds"
#resource + f"//bigquery googleapis.com/projects/prj/datasets/ds/tables/tbl"
resource = f"//cloudresourcemanager.googleapis.com/projects/{project_id}"
view_grantable_roles(resource)
■ロールの一覧表示
■ロールの一覧表示
https://cloud.google.com/iam/docs/roles-overview?hl=ja#role-types
1)事前定義ロールの場合は roles.get() を使用します。
2)プロジェクトレベルのカスタムロールの場合は、projects.roles.get() を使用します。
3)組織レベルのカスタムロールの場合は、organizations.roles.get() を使用します。
これら3種類で全てを網羅すると思われます
projectIDがsys-のものはGAS、lifecycleStateがACTIVE以外のものも含まれるので注意
■bqへの書き込み
■bqへの書き込み
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your-service-account-key.json"
pip install google-cloud-bigquery
from google.cloud import bigquery
client = bigquery Client()
#書き込み先のテーブル情報
table_ref = f"{project_id}.{dataset_id}.{table_id}"
#サンプルデータの生成
def generate_sample_data(num_rows)
data = [
{
"organization": f"org_(num_rows)",
"permission". "view",
}
for _ in range(num_rows)
]
return data
data_to_insert = generate_sample_data(5000)
errors = client.insert_rows_json(table_ref, data_to_insert)
if errors:
print("Errors occurred: {errors}")
else:
print("Data successfully written to BigQuery!")