This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 248380af25b Created using Colab (#34713)
248380af25b is described below
commit 248380af25bfef9cdd54b67c88977d00796655fc
Author: claudevdm <[email protected]>
AuthorDate: Tue Apr 22 19:53:50 2025 -0400
Created using Colab (#34713)
---
.../alloydb_product_catalog_embeddings.ipynb | 669 ++++++++++++---------
1 file changed, 393 insertions(+), 276 deletions(-)
diff --git
a/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
b/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
index 77e811ab572..9df6929c5d9 100644
--- a/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
+++ b/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
@@ -106,8 +106,8 @@
"# Setup and Prerequisites\n",
"\n",
"This example requires:\n",
- "1. An AlloyDB instance with pgvector extension enabled\n",
- "2. Apache Beam 2.63.0 or later"
+ "1. An AlloyDB instance with pgvector extension and PUBLIC IP
enabled\n",
+ "2. Apache Beam 2.64.0 or later"
]
},
{
@@ -130,7 +130,7 @@
"outputs": [],
"source": [
"# Apache Beam with GCP support\n",
- "!pip install apache_beam[gcp]>=v2.63.0 --quiet\n",
+ "!pip install apache_beam[gcp]>=v2.64.0 --quiet\n",
"# Huggingface sentence-transformers for embedding models\n",
"!pip install sentence-transformers --quiet"
]
@@ -141,7 +141,7 @@
"id": "4aqYZ_pG1oYb"
},
"source": [
- "Next, let's install psycopg2-binary to help set up our test database."
+ "Next, let's install google-cloud-alloydb-connector to help set up our
test database."
]
},
{
@@ -152,7 +152,7 @@
},
"outputs": [],
"source": [
- "!pip install psycopg2-binary --quiet"
+ "!pip install \"google-cloud-alloydb-connector[pg8000]\" sqlalchemy"
]
},
{
@@ -164,9 +164,10 @@
"## Database Setup\n",
"\n",
"To connect to AlloyDB, you'll need:\n",
- "1. The JDBC connection URL\n",
- "2. Database credentials\n",
- "3. The pgvector extension enabled in your database\n",
+ "1. GCP project ID where the AlloyDB instance is located\n",
+ "2. The AlloyDB instance URI\n",
+ "3. Database credentials\n",
+ "4. The pgvector extension enabled in your database\n",
"\n",
"Replace these placeholder values with your actual AlloyDB connection
details:"
]
@@ -179,37 +180,39 @@
},
"outputs": [],
"source": [
- "ALLOYDB_HOST = \"\" # @param {type:'string'}\n",
+ "PROJECT_ID = \"\" # @param {type:'string'}\n",
+ "\n",
+ "INSTANCE_URI = \"\" # @param {type:'string'}\n",
+ "\n",
"DB_NAME = \"postgres\" # @param {type:'string'}\n",
- "JDBC_URL = f\"jdbc:postgresql://{ALLOYDB_HOST}:5432/{DB_NAME}\"\n",
+ "\n",
"DB_USER = \"postgres\" # @param {type:'string'}\n",
+ "\n",
"DB_PASSWORD = \"\" # @param {type:'string'}"
]
},
{
"cell_type": "markdown",
- "metadata": {
- "id": "VoURfADof7mO"
- },
"source": [
- "### To connect from Colab to AlloyDB, you'll need one of the
following:\n",
- "1. Public IP: Enable public IP on your AlloyDB instance and add your
Colab IP to the authorized networks\n",
- "2. Auth Proxy: Use the Cloud SQL Auth Proxy to establish a secure
connection\n",
- "3. VPC: Run this notebook on a Compute Engine VM in the same VPC as
your AlloyDB instance\n",
+ "## Authenticate to Google Cloud\n",
"\n",
- "Your current IP address (for configuring AlloyDB authorized networks
if using public IP):\n"
- ]
+ "To connect to the AlloyDB instance via the language connector, we
authenticate with Google Cloud."
+ ],
+ "metadata": {
+ "id": "doK840yZZNdl"
+ }
},
{
"cell_type": "code",
- "execution_count": null,
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user(project_id=PROJECT_ID)"
+ ],
"metadata": {
- "id": "qIdPWsOOsn9b"
+ "id": "CLM12rbiZHTN"
},
- "outputs": [],
- "source": [
- "!curl ifconfig.me"
- ]
+ "execution_count": null,
+ "outputs": []
},
{
"cell_type": "code",
@@ -220,163 +223,207 @@
},
"outputs": [],
"source": [
- "#@title Postgres helpers for creating tables and verifying data\n",
- "import psycopg2\n",
- "from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\n",
+ "# @title SQLAlchemy + AlloyDB Connector helpers for creating tables
and verifying data\n",
+ "\n",
+ "import sqlalchemy\n",
+ "from sqlalchemy import text # Import text construct explicitly\n",
+ "from sqlalchemy.exc import SQLAlchemyError # Import specific
exception type\n",
+ "from google.cloud.alloydb.connector import Connector\n",
+ "\n",
+ "def get_alloydb_engine(instance_uri: str, user: str, password: str,
db: str, **connect_kwargs) -> sqlalchemy.engine.Engine:\n",
+ " \"\"\"Creates a SQLAlchemy engine configured for
AlloyDB.\"\"\"\n",
+ " connector = Connector()\n",
+ " connect_kwargs.setdefault('ip_type', 'PUBLIC')\n",
+ " def get_conn() -> sqlalchemy.engine.base.Connection:\n",
+ " conn = connector.connect(\n",
+ " instance_uri,\n",
+ " \"pg8000\",\n",
+ " user=user,\n",
+ " password=password,\n",
+ " db=db,\n",
+ " **connect_kwargs # Pass additional options like
ip_type='PUBLIC' if needed\n",
+ " )\n",
+ " return conn\n",
"\n",
- "def setup_alloydb_table(host: str,\n",
- " database: str,\n",
- " table_name: str,\n",
- " user: str,\n",
- " password: str,\n",
- " port: int = 5432):\n",
- " \"\"\"Set up AlloyDB table with vector extension and proper
schema.\n",
+ " # Create the SQLAlchemy engine using the connection function\n",
+ " engine = sqlalchemy.create_engine(\n",
+ " \"postgresql+pg8000://\",\n",
+ " creator=get_conn,\n",
+ " )\n",
+ " engine.pool.dispose = lambda: connector.close()\n",
+ "\n",
+ " return engine\n",
+ "\n",
+ "\n",
+ "def setup_alloydb_table_sqlalchemy(instance_uri: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " table_schema: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " **connect_kwargs):\n",
+ " \"\"\"Set up AlloyDB table with vector extension and proper
schema using SQLAlchemy.\n",
"\n",
" Args:\n",
- " host: AlloyDB instance host\n",
+ " instance_uri: AlloyDB instance URI (e.g.,
projects/.../locations/.../clusters/.../instances/...)\n",
" database: Database name\n",
+ " table_name: Name of the table to create.\n",
+ " table_schema: SQL string defining the table columns (e.g.,
\"id SERIAL PRIMARY KEY, embedding VECTOR(768)\")\n",
" user: Database user\n",
" password: Database password\n",
- " port: Database port (default: 5432)\n",
+ " connect_kwargs: Additional keyword arguments passed to
connector.connect() (e.g., ip_type=\"PUBLIC\")\n",
" \"\"\"\n",
- " # Connection string\n",
- " conn_string = f\"host={host} dbname={database} user={user}
password={password} port={port}\"\n",
- "\n",
+ " engine = None\n",
" try:\n",
- " # Connect to the database\n",
- " conn = psycopg2.connect(conn_string)\n",
- " conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
- " cur = conn.cursor()\n",
- "\n",
- " print(\"Connected to AlloyDB successfully!\")\n",
- "\n",
- " # Create pgvector extension if it doesn't exist\n",
- " print(\"Creating pgvector extension...\")\n",
- " cur.execute(\"CREATE EXTENSION IF NOT EXISTS vector;\")\n",
- "\n",
- " # Create the product_embeddings table\n",
- " print(\"Creating table...\")\n",
- " cur.execute(f\"\"\"\n",
- " DROP TABLE IF EXISTS {table_name};\n",
- " \"\"\")\n",
- " cur.execute(f\"\"\"\n",
- " CREATE TABLE IF NOT EXISTS {table_name} (\n",
- " {table_schema}\n",
- " );\n",
- " \"\"\")\n",
- "\n",
- " print(\"Setup completed successfully!\")\n",
- "\n",
+ " engine = get_alloydb_engine(instance_uri, user, password,
database, **connect_kwargs)\n",
+ "\n",
+ " # Use a connection from the pool\n",
+ " with engine.connect() as connection:\n",
+ " # Use execution options for autocommit for DDL
statements\n",
+ " # Alternatively, execute outside an explicit transaction
block (begin())\n",
+ " with
connection.execution_options(isolation_level=\"AUTOCOMMIT\"):\n",
+ " print(\"Connected to AlloyDB successfully via
SQLAlchemy!\")\n",
+ "\n",
+ " # Create pgvector extension if it doesn't exist\n",
+ " print(\"Creating pgvector extension...\")\n",
+ " connection.execute(text(\"CREATE EXTENSION IF NOT
EXISTS vector;\"))\n",
+ "\n",
+ " # Drop the table if it exists\n",
+ " print(f\"Dropping table {table_name} if
exists...\")\n",
+ " # Use f-string for table name (generally okay for DDL
if source is trusted)\n",
+ " connection.execute(text(f\"DROP TABLE IF EXISTS
{table_name};\"))\n",
+ "\n",
+ " # Create the table\n",
+ " print(f\"Creating table {table_name}...\")\n",
+ " # Use f-string for table name and schema (validate
input if necessary)\n",
+ " create_sql = f\"\"\"\n",
+ " CREATE TABLE IF NOT EXISTS {table_name} (\n",
+ " {table_schema}\n",
+ " );\n",
+ " \"\"\"\n",
+ " connection.execute(text(create_sql))\n",
+ "\n",
+ " # Optional: Commit if not using autocommit (SQLAlchemy >=
2.0 often commits implicitly)\n",
+ " # connection.commit() # Usually not needed with
autocommit or implicit commit behavior\n",
+ "\n",
+ " print(\"Setup completed successfully using SQLAlchemy!\")\n",
+ "\n",
+ " except SQLAlchemyError as e:\n",
+ " print(f\"An SQLAlchemy error occurred during setup: {e}\")\n",
" except Exception as e:\n",
- " print(f\"An error occurred: {e}\")\n",
+ " print(f\"An unexpected error occurred during setup: {e}\")\n",
" finally:\n",
- " if 'cur' in locals():\n",
- " cur.close()\n",
- " if 'conn' in locals():\n",
- " conn.close()\n",
- "\n",
- "# Example usage (user will need to provide their actual connection
details)\n",
- "\"\"\"\n",
- "To set up your AlloyDB table, run the following with your connection
details:\n",
- "\n",
- "setup_alloydb_table(\n",
- " host=\"your-alloydb-host\",\n",
- " database=\"your-database\",\n",
- " user=\"your-username\",\n",
- " password=\"your-password\"\n",
- ")\n",
- "\"\"\"\n",
- "\n",
- "\"\"\"### Test the Connection and Table Setup\n",
- "\n",
- "You can verify the setup with a simple query:\n",
- "\"\"\"\n",
- "\n",
- "def test_alloydb_connection(host: str,\n",
- " database: str,\n",
- " table_name: str,\n",
- " user: str,\n",
- " password: str,\n",
- " port: int = 5432):\n",
- " \"\"\"Test the AlloyDB connection and verify table creation.\n",
- "\n",
- " Args:\n",
- " host: AlloyDB instance host\n",
+ " if engine:\n",
+ " engine.dispose() # Close connection pool and connector\n",
+ "\n",
+ "def test_alloydb_connection_sqlalchemy(instance_uri: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " **connect_kwargs):\n",
+ " \"\"\"Test the AlloyDB connection and verify table/extension
using SQLAlchemy.\n",
+ "\n",
+ " Args:\n",
+ " instance_uri: AlloyDB instance URI\n",
" database: Database name\n",
+ " table_name: Name of the table to check.\n",
" user: Database user\n",
" password: Database password\n",
- " port: Database port (default: 5432)\n",
+ " connect_kwargs: Additional keyword arguments passed to
connector.connect()\n",
" \"\"\"\n",
- " conn_string = f\"host={host} dbname={database} user={user}
password={password} port={port}\"\n",
- "\n",
+ " engine = None\n",
" try:\n",
- " conn = psycopg2.connect(conn_string)\n",
- " cur = conn.cursor()\n",
- "\n",
- " # Check if table exists\n",
- " cur.execute(f\"\"\"\n",
- " SELECT EXISTS (\n",
- " SELECT FROM information_schema.tables\n",
- " WHERE table_name = '{table_name}'\n",
- " );\n",
- " \"\"\")\n",
- " table_exists = cur.fetchone()[0]\n",
- "\n",
- " if table_exists:\n",
- " print(f\"✓ {table_name} table exists\")\n",
- "\n",
- " # Check if vector extension is installed\n",
- " cur.execute(\"SELECT * FROM pg_extension WHERE extname =
'vector';\")\n",
- " if cur.fetchone():\n",
- " print(\"✓ pgvector extension is installed\")\n",
+ " engine = get_alloydb_engine(instance_uri, user, password,
database, **connect_kwargs)\n",
+ "\n",
+ " with engine.connect() as connection:\n",
+ " print(\"Testing connection...\")\n",
+ " # Simple query to confirm connection\n",
+ " connection.execute(text(\"SELECT 1\"))\n",
+ " print(\"✓ Connection successful\")\n",
+ "\n",
+ " # Check if table exists using information_schema\n",
+ " # Use bind parameters (:tname) for safety, even though
it's a table name here\n",
+ " table_exists_query = text(\"\"\"\n",
+ " SELECT EXISTS (\n",
+ " SELECT FROM information_schema.tables\n",
+ " WHERE table_schema = 'public' AND table_name =
:tname\n",
+ " );\n",
+ " \"\"\")\n",
+ " # .scalar() fetches the first column of the first row\n",
+ " table_exists = connection.execute(table_exists_query,
{\"tname\": table_name}).scalar()\n",
+ "\n",
+ " if table_exists:\n",
+ " print(f\"✓ '{table_name}' table exists\")\n",
+ "\n",
+ " # Check if vector extension is installed\n",
+ " ext_exists_query = text(\"\"\"\n",
+ " SELECT EXISTS (\n",
+ " SELECT FROM pg_extension WHERE extname =
'vector'\n",
+ " );\n",
+ " \"\"\")\n",
+ " vector_installed =
connection.execute(ext_exists_query).scalar()\n",
+ "\n",
+ " if vector_installed:\n",
+ " print(\"✓ pgvector extension is installed\")\n",
+ " else:\n",
+ " print(\"✗ pgvector extension is NOT
installed\")\n",
" else:\n",
- " print(\"✗ pgvector extension is not installed\")\n",
- " else:\n",
- " print(f\"✗ {table_name} table does not exist\")\n",
+ " print(f\"✗ '{table_name}' table does NOT exist\")\n",
"\n",
+ " except SQLAlchemyError as e:\n",
+ " print(f\"Connection test failed (SQLAlchemy error): {e}\")\n",
" except Exception as e:\n",
- " print(f\"Connection test failed: {e}\")\n",
+ " print(f\"Connection test failed (Unexpected error): {e}\")\n",
" finally:\n",
- " if 'cur' in locals():\n",
- " cur.close()\n",
- " if 'conn' in locals():\n",
- " conn.close()\n",
- "\n",
- "def verify_embeddings(host: str,\n",
- " database: str,\n",
- " table_name: str,\n",
- " user: str,\n",
- " password: str,\n",
- " port: int = 5432):\n",
- " \"\"\"Connect to AlloyDB and print all written products.\"\"\"\n",
- " conn = psycopg2.connect(\n",
- " host=host,\n",
- " database=database,\n",
- " user=user,\n",
- " password=password,\n",
- " port=port\n",
- " )\n",
- "\n",
+ " if engine:\n",
+ " engine.dispose()\n",
+ "\n",
+ "def verify_embeddings_sqlalchemy(instance_uri: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " **connect_kwargs):\n",
+ " \"\"\"Connect to AlloyDB using SQLAlchemy and print all rows from
the table.\"\"\"\n",
+ " engine = None\n",
" try:\n",
- " with conn.cursor() as cur:\n",
- " # Simple SELECT * query\n",
- " cur.execute(f\"SELECT * FROM {table_name};\")\n",
- " rows = cur.fetchall()\n",
+ " engine = get_alloydb_engine(instance_uri, user, password,
database, **connect_kwargs)\n",
+ "\n",
+ " with engine.connect() as connection:\n",
+ " # Use f-string for table name in SELECT (ensure
table_name is controlled)\n",
+ " select_query = text(f\"SELECT * FROM {table_name};\")\n",
+ " result = connection.execute(select_query)\n",
"\n",
- " # Get column names from cursor description\n",
- " columns = [desc[0] for desc in cur.description]\n",
+ " # Get column names from the result keys\n",
+ " columns = result.keys()\n",
+ " # Fetch all rows as mapping objects (dict-like)\n",
+ " rows = result.mappings().all()\n",
"\n",
- " print(f\"\\nFound {len(rows)} products:\")\n",
+ " print(f\"\\nFound {len(rows)} products in
'{table_name}':\")\n",
" print(\"-\" * 80)\n",
"\n",
- " # Print each row with column names\n",
- " for row in rows:\n",
- " for col, val in zip(columns, row):\n",
- " print(f\"{col}: {val}\")\n",
+ " if not rows:\n",
+ " print(\"Table is empty.\")\n",
" print(\"-\" * 80)\n",
- "\n",
+ " else:\n",
+ " # Print each row\n",
+ " for row in rows:\n",
+ " for col in columns:\n",
+ " print(f\"{col}: {row[col]}\")\n",
+ " print(\"-\" * 80)\n",
+ "\n",
+ " except SQLAlchemyError as e:\n",
+ " print(f\"Failed to verify embeddings (SQLAlchemy error):
{e}\")\n",
+ " # You might want to check specifically for ProgrammingError
if the table doesn't exist\n",
+ " # from sqlalchemy.exc import ProgrammingError\n",
+ " # except ProgrammingError as pe:\n",
+ " # print(f\"Failed to query table '{table_name}'. Does it
exist? Error: {pe}\")\n",
+ " except Exception as e:\n",
+ " print(f\"Failed to verify embeddings (Unexpected error):
{e}\")\n",
" finally:\n",
- " conn.close()"
+ " if engine:\n",
+ " engine.dispose()"
]
},
{
@@ -489,7 +536,8 @@
"We import the following for configuring our embedding ingestion
pipeline:\n",
"- `Chunk`, the structured input for generating and ingesting
embeddings\n",
"- `AlloyDBConnectionConfig` for configuring database connection
information\n",
- "- `AlloyDBVectorWriterConfig` for configuring write behavior like
schema mapping and conflict resolution"
+ "- `AlloyDBVectorWriterConfig` for configuring write behavior like
schema mapping and conflict resolution\n",
+ "- `AlloyDBLanguageConnectorConfig` to connect using the [AlloyDB
language
connector](https://cloud.google.com/alloydb/docs/connect-language-connectors)"
]
},
{
@@ -503,7 +551,8 @@
"# Embedding-specific imports\n",
"from apache_beam.ml.rag.ingestion.alloydb import (\n",
" AlloyDBVectorWriterConfig,\n",
- " AlloyDBConnectionConfig\n",
+ " AlloyDBConnectionConfig,\n",
+ " AlloyDBLanguageConnectorConfig\n",
")\n",
"from apache_beam.ml.rag.ingestion.base import
VectorDatabaseWriteTransform\n",
"from apache_beam.ml.rag.types import Chunk, Content\n",
@@ -602,8 +651,8 @@
" content text,\n",
" metadata JSONB\n",
"\"\"\"\n",
- "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
- "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME,
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
DB_USER, DB_PASSWORD)"
]
},
{
@@ -723,8 +772,18 @@
},
"outputs": [],
"source": [
+ "# Configure the language connector so we can connect securely\n",
+ "language_connector_config = AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME, instance_name=INSTANCE_URI,
ip_type=\"PUBLIC\"\n",
+ ")\n",
+ "# Configure the AlloyDBConnectionConfig with language connector\n",
+ "connection_config =
AlloyDBConnectionConfig.with_language_connector(\n",
+ " connector_options=language_connector_config,\n",
+ " username=DB_USER,\n",
+ " password=DB_PASSWORD\n",
+ ")\n",
"alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
- " connection_config = AlloyDBConnectionConfig(JDBC_URL, DB_USER,
DB_PASSWORD),\n",
+ " connection_config=connection_config,\n",
" table_name=table_name\n",
")"
]
@@ -786,7 +845,7 @@
},
"outputs": [],
"source": [
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
]
},
{
@@ -848,8 +907,8 @@
" content text,\n",
" metadata JSONB\n",
"\"\"\"\n",
- "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
- "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME,
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
DB_USER, DB_PASSWORD)"
]
},
{
@@ -882,7 +941,7 @@
"from apache_beam.ml.transforms.base import MLTransform\n",
"from apache_beam.ml.rag.types import Chunk, Content\n",
"from apache_beam.ml.rag.ingestion.base import
VectorDatabaseWriteTransform\n",
- "from apache_beam.ml.rag.ingestion.alloydb import
AlloyDBVectorWriterConfig, AlloyDBConnectionConfig\n",
+ "from apache_beam.ml.rag.ingestion.alloydb import
AlloyDBVectorWriterConfig, AlloyDBConnectionConfig,
AlloyDBLanguageConnectorConfig\n",
"from apache_beam.ml.rag.embeddings.huggingface import
HuggingfaceTextEmbeddings\n",
"from apache_beam.options.pipeline_options import SetupOptions\n",
"\n",
@@ -914,9 +973,9 @@
"def run(argv=None):\n",
" parser = argparse.ArgumentParser()\n",
" parser.add_argument(\n",
- " '--alloydb_host',\n",
+ " '--instance_uri',\n",
" required=True,\n",
- " help='AlloyDB host'\n",
+ " help='AlloyDB instance uri'\n",
" )\n",
" parser.add_argument(\n",
" '--alloydb_database',\n",
@@ -961,8 +1020,10 @@
" )\n",
" | 'Write to AlloyDB' >>
VectorDatabaseWriteTransform(\n",
" AlloyDBVectorWriterConfig(\n",
- " connection_config=AlloyDBConnectionConfig(\n",
- "
jdbc_url=f'jdbc:postgresql://{known_args.alloydb_host}/{known_args.alloydb_database}',\n",
+ "
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+ " AlloyDBLanguageConnectorConfig(\n",
+ "
database_name=known_args.alloydb_database,
instance_name=known_args.instance_uri\n",
+ " ),\n",
" username=known_args.alloydb_username,\n",
" password=known_args.alloydb_password\n",
" ),\n",
@@ -1047,19 +1108,19 @@
},
"outputs": [],
"source": [
+ "import os\n",
"BUCKET_NAME = '' # @param {type:'string'}\n",
"REGION = 'us-central1' # @param {type:'string'}\n",
"os.environ['BUCKET_NAME'] = BUCKET_NAME\n",
"os.environ['REGION'] = REGION\n",
"\n",
"# Save AlloyDB credentials to environment variables\n",
- "PRIVATE_ALLOYDB_HOST = '' # @param {type:'string'}\n",
- "os.environ['PRIVATE_ALLOYDB_HOST'] = PRIVATE_ALLOYDB_HOST\n",
+ "os.environ['INSTANCE_URI'] = INSTANCE_URI\n",
"os.environ['DATABASE_NAME'] = DB_NAME\n",
"os.environ['ALLOYDB_USER'] = DB_USER\n",
"os.environ['ALLOYDB_PASSWORD'] = DB_PASSWORD\n",
"\n",
- "NETWORK = '' # @param {type:'string'}\n",
+ "NETWORK = 'default' # @param {type:'string'}\n",
"SUBNETWORK = '' # @param {type:'string'}\n",
"os.environ['NETWORK'] = NETWORK\n",
"os.environ['SUBNETWORK'] = SUBNETWORK"
@@ -1126,7 +1187,7 @@
"!python ./basic_ingestion_pipeline.py \\\n",
" --project=$PROJECT_ID \\\n",
" --alloydb_username=$ALLOYDB_USER \\\n",
- " --alloydb_host=${PRIVATE_ALLOYDB_HOST}:5432 \\\n",
+ " --instance_uri=$INSTANCE_URI \\\n",
" --alloydb_password=$ALLOYDB_PASSWORD \\\n",
" --alloydb_table=default_dataflow_product_embeddings \\\n",
" --alloydb_database=$DATABASE_NAME \\\n",
@@ -1159,7 +1220,7 @@
},
"outputs": [],
"source": [
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name='default_dataflow_product_embeddings',
user=DB_USER, password=DB_PASSWORD)\n"
]
},
{
@@ -1286,8 +1347,8 @@
" model_name VARCHAR,\n",
" created_at TIMESTAMP\n",
"\"\"\"\n",
- "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
- "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME,
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
DB_USER, DB_PASSWORD)"
]
},
{
@@ -1432,7 +1493,15 @@
"
.with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n",
" | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(\n",
" AlloyDBVectorWriterConfig(\n",
- " connection_config =
AlloyDBConnectionConfig(JDBC_URL, DB_USER, DB_PASSWORD),\n",
+ "
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+ "
connector_options=AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME,\n",
+ " instance_name=INSTANCE_URI,\n",
+ " ip_type=\"PUBLIC\"\n",
+ " ),\n",
+ " username=DB_USER,\n",
+ " password=DB_PASSWORD\n",
+ " ),\n",
" table_name=table_name,\n",
" column_specs=column_specs\n",
" )\n",
@@ -1459,7 +1528,7 @@
},
"outputs": [],
"source": [
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
]
},
{
@@ -1500,8 +1569,8 @@
" metadata JSONB,\n",
" created_at TIMESTAMP NOT NULL DEFAULT NOW()\n",
"\"\"\"\n",
- "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
- "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME,
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
DB_USER, DB_PASSWORD)"
]
},
{
@@ -1588,7 +1657,15 @@
"\n",
"# Create writer config with conflict resolution\n",
"alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
- " connection_config=AlloyDBConnectionConfig(JDBC_URL, DB_USER,
DB_PASSWORD),\n",
+ "
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+ " connector_options=AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME,\n",
+ " instance_name=INSTANCE_URI,\n",
+ " ip_type=\"PUBLIC\"\n",
+ " ),\n",
+ " username=DB_USER,\n",
+ " password=DB_PASSWORD\n",
+ " ),\n",
" table_name=table_name,\n",
" conflict_resolution=conflict_resolution,\n",
")"
@@ -1665,7 +1742,7 @@
"outputs": [],
"source": [
"print(\"\\nAfter Day 1 ingestion:\")\n",
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
]
},
{
@@ -1726,7 +1803,7 @@
"outputs": [],
"source": [
"print(\"\\nAfter Day 2 ingestion:\")\n",
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
]
},
{
@@ -1793,67 +1870,98 @@
"outputs": [],
"source": [
"#@title Postgres helpers for inserting initial records\n",
- "def setup_initial_data(host: str,\n",
- " database: str,\n",
- " table_name: str,\n",
- " user: str,\n",
- " password: str,\n",
- " port: int = 5432):\n",
- " \"\"\"Set up table and insert sample product data.\"\"\"\n",
- " conn_string = f\"host={host} dbname={database} user={user}
password={password} port={port}\"\n",
+ "import sqlalchemy\n",
+ "from sqlalchemy import text\n",
+ "from sqlalchemy.exc import SQLAlchemyError\n",
+ "from google.cloud.alloydb.connector import Connector\n",
+ "\n",
+ "def setup_initial_data_sqlalchemy(instance_uri: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " table_schema: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " **connect_kwargs):\n",
+ " \"\"\"Set up table and insert sample product data using
SQLAlchemy.\n",
+ "\n",
+ " (Revised to handle potential connection closing issue after
DDL)\n",
"\n",
+ " Args:\n",
+ " instance_uri: AlloyDB instance URI\n",
+ " database: Database name\n",
+ " table_name: Name of the table to create and populate.\n",
+ " table_schema: SQL string defining the table columns.\n",
+ " user: Database user\n",
+ " password: Database password\n",
+ " connect_kwargs: Additional keyword arguments for
connector.connect().\n",
+ " \"\"\"\n",
+ " engine = None\n",
" try:\n",
- " conn = psycopg2.connect(conn_string)\n",
- " conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
- " cur = conn.cursor()\n",
- "\n",
- " # Create pgvector extension\n",
- " cur.execute(\"CREATE EXTENSION IF NOT EXISTS vector;\")\n",
- "\n",
- " # Create products table\n",
- " cur.execute(f\"DROP TABLE IF EXISTS {table_name};\")\n",
- " cur.execute(f\"CREATE TABLE {table_name}
({table_schema});\")\n",
- "\n",
- " # Insert sample data\n",
- " sample_products = [\n",
- " (\n",
- " \"lamp-001\",\n",
- " \"Artisan Table Lamp\",\n",
- " \"Hand-crafted ceramic table lamp with linen shade.
\"\n",
- " \"Features dimmable LED bulb and touch-sensitive
base.\",\n",
- " 129.99\n",
- " ),\n",
- " (\n",
- " \"mirror-001\",\n",
- " \"Floating Wall Mirror\",\n",
- " \"Modern circular mirror with hidden mounting system.
\"\n",
- " \"Perfect for entryways and contemporary
spaces.\",\n",
- " 199.99\n",
- " ),\n",
- " (\n",
- " \"vase-001\",\n",
- " \"Contemporary Ceramic Vase\",\n",
- " \"Minimalist vase with matte finish. \"\n",
- " \"Ideal for dried or fresh flower arrangements.\",\n",
- " 79.99\n",
- " )\n",
- " ]\n",
- "\n",
- " cur.executemany(\n",
- " f\"\"\"\n",
- " INSERT INTO {table_name} (id, title, description,
price)\n",
- " VALUES (%s, %s, %s, %s)\n",
- " \"\"\",\n",
- " sample_products\n",
- " )\n",
- "\n",
- " print(\"✓ Sample products inserted successfully\")\n",
- "\n",
+ " engine = get_alloydb_engine(instance_uri, user, password,
database, **connect_kwargs)\n",
+ "\n",
+ " # Use a single connection for both DDL and DML\n",
+ " with engine.connect() as connection:\n",
+ " print(\"Connected to AlloyDB successfully via
SQLAlchemy!\")\n",
+ "\n",
+ " # === DDL Operations (Relying on implicit autocommit for
DDL) ===\n",
+ " # Execute DDL directly on the connection outside an
explicit transaction.\n",
+ " # SQLAlchemy + Postgres drivers usually handle this
correctly.\n",
+ "\n",
+ " print(\"Ensuring pgvector extension exists...\")\n",
+ " connection.execute(text(\"CREATE EXTENSION IF NOT EXISTS
vector;\"))\n",
+ "\n",
+ " print(f\"Dropping table {table_name} if exists...\")\n",
+ " connection.execute(text(f\"DROP TABLE IF EXISTS
{table_name};\"))\n",
+ "\n",
+ " print(f\"Creating table {table_name}...\")\n",
+ " create_sql = f\"CREATE TABLE {table_name}
({table_schema});\"\n",
+ " connection.execute(text(create_sql))\n",
+ " print(f\"Table {table_name} created.\")\n",
+ "\n",
+ " # === DML Operations (Runs in default transaction started
by connect()) ===\n",
+ " sample_products_dicts = [\n",
+ " # (Sample data dictionaries as defined in the
previous version)\n",
+ " {\n",
+ " \"id\": \"lamp-001\", \"title\": \"Artisan Table
Lamp\",\n",
+ " \"description\": \"Hand-crafted ceramic...\",
\"price\": 129.99\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"mirror-001\", \"title\": \"Floating
Wall Mirror\",\n",
+ " \"description\": \"Modern circular mirror...\",
\"price\": 199.99\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"vase-001\", \"title\": \"Contemporary
Ceramic Vase\",\n",
+ " \"description\": \"Minimalist vase...\",
\"price\": 79.99\n",
+ " }\n",
+ " # Add embedding data if needed\n",
+ " ]\n",
+ "\n",
+ " insert_sql = text(f\"\"\"\n",
+ " INSERT INTO {table_name} (id, title, description,
price)\n",
+ " VALUES (:id, :title, :description, :price)\n",
+ " \"\"\") # Add other columns if needed\n",
+ "\n",
+ " print(f\"Inserting sample data into {table_name}...\")\n",
+ " # Execute DML within the connection's transaction\n",
+ " connection.execute(insert_sql, sample_products_dicts)\n",
+ "\n",
+ " # Commit the transaction containing the INSERTs\n",
+ " print(\"Committing transaction...\")\n",
+ " connection.commit()\n",
+ " print(\"✓ Sample products inserted successfully\")\n",
+ "\n",
+ " print(\"Initial data setup completed successfully using
SQLAlchemy!\")\n",
+ "\n",
+ " except SQLAlchemyError as e:\n",
+ " print(f\"An SQLAlchemy error occurred during initial data
setup: {e}\")\n",
+ " # Note: If an error occurs *before* commit, the transaction
is usually\n",
+ " # rolled back automatically when the 'with engine.connect()'
block exits.\n",
+ " except Exception as e:\n",
+ " print(f\"An unexpected error occurred during initial data
setup: {e}\")\n",
" finally:\n",
- " if 'cur' in locals():\n",
- " cur.close()\n",
- " if 'conn' in locals():\n",
- " conn.close()"
+ " if engine:\n",
+ " print(\"Disposing engine pool...\")\n",
+ " engine.dispose()"
]
},
{
@@ -1864,7 +1972,7 @@
},
"outputs": [],
"source": [
- "setup_initial_data(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_initial_data_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
table_schema, DB_USER, DB_PASSWORD)"
]
},
{
@@ -1892,7 +2000,15 @@
"\n",
"# Configure database writer\n",
"alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
- " connection_config=AlloyDBConnectionConfig(JDBC_URL, DB_USER,
DB_PASSWORD),\n",
+ "
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+ " connector_options=AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME,\n",
+ " instance_name=INSTANCE_URI,\n",
+ " ip_type=\"PUBLIC\"\n",
+ " ),\n",
+ " username=DB_USER,\n",
+ " password=DB_PASSWORD\n",
+ " ),\n",
" table_name=table_name,\n",
" column_specs=(\n",
" ColumnSpecsBuilder()\n",
@@ -1922,10 +2038,18 @@
" | \"Read Products\" >> ReadFromJdbc(\n",
" table_name=table_name,\n",
" driver_class_name=\"org.postgresql.Driver\",\n",
- " jdbc_url=JDBC_URL,\n",
+ " jdbc_url=AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME,\n",
+ " instance_name=INSTANCE_URI,\n",
+ " ip_type=\"PUBLIC\"\n",
+ " ).to_jdbc_url(),\n",
" username=DB_USER,\n",
" password=DB_PASSWORD,\n",
- " query=f\"SELECT id, title, description FROM
{table_name}\"\n",
+ " query=f\"SELECT id, title, description FROM
{table_name}\",\n",
+ " classpath=[\n",
+ " \"org.postgresql:postgresql:42.2.16\",\n",
+ " \"com.google.cloud:alloydb-jdbc-connector:1.2.0\"\n",
+ " ]\n",
" )\n",
" )\n",
"\n",
@@ -1965,7 +2089,7 @@
"outputs": [],
"source": [
"print(\"\\nAfter embedding generation:\")\n",
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
]
},
{
@@ -2063,8 +2187,8 @@
" content text,\n",
" metadata JSONB\n",
"\"\"\"\n",
- "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
- "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME,
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
DB_USER, DB_PASSWORD)"
]
},
{
@@ -2088,7 +2212,7 @@
"source": [
"from apache_beam.ml.rag.embeddings.vertex_ai import
VertexAITextEmbeddings\n",
"\n",
- "vertexai_embedder =
VertexAITextEmbeddings(model_name=\"textembedding-gecko@latest\")"
+ "vertexai_embedder =
VertexAITextEmbeddings(model_name=\"text-embedding-005\")"
]
},
{
@@ -2129,8 +2253,12 @@
" )\n",
" | 'Write to AlloyDB' >>
VectorDatabaseWriteTransform(\n",
" AlloyDBVectorWriterConfig(\n",
- " connection_config=AlloyDBConnectionConfig(\n",
- " jdbc_url=JDBC_URL,\n",
+ "
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+ "
connector_options=AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME,\n",
+ " instance_name=INSTANCE_URI,\n",
+ " ip_type=\"PUBLIC\"\n",
+ " ),\n",
" username=DB_USER,\n",
" password=DB_PASSWORD\n",
" ),\n",
@@ -2158,7 +2286,7 @@
"outputs": [],
"source": [
"print(\"\\nAfter embedding generation:\")\n",
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
]
},
{
@@ -2210,9 +2338,8 @@
"from google.api_core.exceptions import AlreadyExists\n",
"import json\n",
"\n",
- "# Define topic and subscription names\n",
+ "# Define pubsub topic\n",
"TOPIC = \"product-updates\" # @param {type:'string'}\n",
- "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
"\n",
"# Create publisher client and topic\n",
"publisher = pubsub_v1.PublisherClient()\n",
@@ -2221,18 +2348,7 @@
" topic = publisher.create_topic(request={\"name\": topic_path})\n",
" print(f\"Created topic: {topic.name}\")\n",
"except AlreadyExists:\n",
- " print(f\"Topic {topic_path} already exists.\")\n",
- "\n",
- "# Create subscriber client and subscription\n",
- "subscriber = pubsub_v1.SubscriberClient()\n",
- "subscription_path = subscriber.subscription_path(PROJECT_ID,
SUBSCRIPTION)\n",
- "try:\n",
- " subscription = subscriber.create_subscription(\n",
- " request={\"name\": subscription_path, \"topic\":
topic_path}\n",
- " )\n",
- " print(f\"Created subscription: {subscription.name}\")\n",
- "except AlreadyExists:\n",
- " print(f\"Subscription {subscription_path} already exists.\")"
+ " print(f\"Topic {topic_path} already exists.\")"
],
"metadata": {
"id": "nqMe0Brlt7Bk"
@@ -2260,7 +2376,7 @@
" embedding VECTOR(384) NOT NULL,\n",
" content text,\n",
" metadata JSONB,\n",
- " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+ " created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
"\"\"\""
],
"metadata": {
@@ -2272,8 +2388,8 @@
{
"cell_type": "code",
"source": [
- "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
- "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME,
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,
DB_USER, DB_PASSWORD)"
],
"metadata": {
"id": "8HPhUfAuorBP"
@@ -2301,8 +2417,6 @@
"source": [
"from apache_beam.options.pipeline_options import PipelineOptions,
StandardOptions, SetupOptions, GoogleCloudOptions, WorkerOptions\n",
"\n",
- "PRIVATE_ALLOYDB_HOST = '' # @param {type:'string'}\n",
- "\n",
"options = PipelineOptions()\n",
"options.view_as(StandardOptions).streaming = True\n",
"\n",
@@ -2315,7 +2429,7 @@
"\n",
"# The VPC network to run your Dataflow job in.\n",
"# Should be the same as the AlloyDB network if using Private services
access.\n",
- "NETWORK = '' # @param {type:'string'}\n",
+ "NETWORK = 'default' # @param {type:'string'}\n",
"options.view_as(WorkerOptions).network = NETWORK\n",
"\n",
"# The VPC subnetwork to run your Dataflow job in.\n",
@@ -2423,7 +2537,7 @@
"_ = (\n",
" pipeline\n",
" | \"Read from PubSub\" >> beam.io.ReadFromPubSub(\n",
- "
subscription=f\"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION}\"\n",
+ " topic=f\"projects/{PROJECT_ID}/topics/{TOPIC}\"\n",
" )\n",
" | \"Window\" >> beam.WindowInto(FixedWindows(10))\n",
" | \"Parse Messages\" >> beam.Map(parse_message)\n",
@@ -2431,8 +2545,11 @@
"
.with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n",
" | \"Write to AlloyDB\" >> VectorDatabaseWriteTransform(\n",
" AlloyDBVectorWriterConfig(\n",
- " connection_config=AlloyDBConnectionConfig(\n",
- "
jdbc_url=f'jdbc:postgresql://{PRIVATE_ALLOYDB_HOST}/{DB_NAME}',\n",
+ "
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+ " connector_options=AlloyDBLanguageConnectorConfig(\n",
+ " database_name=DB_NAME,\n",
+ " instance_name=INSTANCE_URI\n",
+ " ),\n",
" username=DB_USER,\n",
" password=DB_PASSWORD\n",
" ),\n",
@@ -2624,7 +2741,7 @@
"source": [
"# Verify the results\n",
"print(\"\\nAfter embedding generation:\")\n",
- "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
],
"metadata": {
"id": "zSb1UoCSznkW"