This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 248380af25b Created using Colab (#34713)
248380af25b is described below

commit 248380af25bfef9cdd54b67c88977d00796655fc
Author: claudevdm <[email protected]>
AuthorDate: Tue Apr 22 19:53:50 2025 -0400

    Created using Colab (#34713)
---
 .../alloydb_product_catalog_embeddings.ipynb       | 669 ++++++++++++---------
 1 file changed, 393 insertions(+), 276 deletions(-)

diff --git 
a/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb 
b/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
index 77e811ab572..9df6929c5d9 100644
--- a/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
+++ b/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
@@ -106,8 +106,8 @@
         "# Setup and Prerequisites\n",
         "\n",
         "This example requires:\n",
-        "1. An AlloyDB instance with pgvector extension enabled\n",
-        "2. Apache Beam 2.63.0 or later"
+        "1. An AlloyDB instance with pgvector extension and PUBLIC IP 
enabled\n",
+        "2. Apache Beam 2.64.0 or later"
       ]
     },
     {
@@ -130,7 +130,7 @@
       "outputs": [],
       "source": [
         "# Apache Beam with GCP support\n",
-        "!pip install apache_beam[gcp]>=v2.63.0 --quiet\n",
+        "!pip install apache_beam[gcp]>=v2.64.0 --quiet\n",
         "# Huggingface sentence-transformers for embedding models\n",
         "!pip install sentence-transformers --quiet"
       ]
@@ -141,7 +141,7 @@
         "id": "4aqYZ_pG1oYb"
       },
       "source": [
-        "Next, let's install psycopg2-binary to help set up our test database."
+        "Next, let's install google-cloud-alloydb-connector to help set up our 
test database."
       ]
     },
     {
@@ -152,7 +152,7 @@
       },
       "outputs": [],
       "source": [
-        "!pip install psycopg2-binary --quiet"
+        "!pip install \"google-cloud-alloydb-connector[pg8000]\" sqlalchemy"
       ]
     },
     {
@@ -164,9 +164,10 @@
         "## Database Setup\n",
         "\n",
         "To connect to AlloyDB, you'll need:\n",
-        "1. The JDBC connection URL\n",
-        "2. Database credentials\n",
-        "3. The pgvector extension enabled in your database\n",
+        "1. GCP project ID where the AlloyDB instance is located\n",
+        "2. The AlloyDB instance URI\n",
+        "3. Database credentials\n",
+        "4. The pgvector extension enabled in your database\n",
         "\n",
         "Replace these placeholder values with your actual AlloyDB connection 
details:"
       ]
@@ -179,37 +180,39 @@
       },
       "outputs": [],
       "source": [
-        "ALLOYDB_HOST = \"\" # @param {type:'string'}\n",
+        "PROJECT_ID = \"\" # @param {type:'string'}\n",
+        "\n",
+        "INSTANCE_URI = \"\" # @param {type:'string'}\n",
+        "\n",
         "DB_NAME = \"postgres\" #  @param {type:'string'}\n",
-        "JDBC_URL = f\"jdbc:postgresql://{ALLOYDB_HOST}:5432/{DB_NAME}\"\n",
+        "\n",
         "DB_USER = \"postgres\" # @param {type:'string'}\n",
+        "\n",
         "DB_PASSWORD = \"\" # @param {type:'string'}"
       ]
     },
     {
       "cell_type": "markdown",
-      "metadata": {
-        "id": "VoURfADof7mO"
-      },
       "source": [
-        "### To connect from Colab to AlloyDB, you'll need one of the 
following:\n",
-        "1. Public IP: Enable public IP on your AlloyDB instance and add your 
Colab IP to the authorized networks\n",
-        "2. Auth Proxy: Use the Cloud SQL Auth Proxy to establish a secure 
connection\n",
-        "3. VPC: Run this notebook on a Compute Engine VM in the same VPC as 
your AlloyDB instance\n",
+        "## Authenticate to Google Cloud\n",
         "\n",
-        "Your current IP address (for configuring AlloyDB authorized networks 
if using public IP):\n"
-      ]
+        "To connect to the AlloyDB instance via the language conenctor, we 
authenticate with Google Cloud."
+      ],
+      "metadata": {
+        "id": "doK840yZZNdl"
+      }
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "source": [
+        "from google.colab import auth\n",
+        "auth.authenticate_user(project_id=PROJECT_ID)"
+      ],
       "metadata": {
-        "id": "qIdPWsOOsn9b"
+        "id": "CLM12rbiZHTN"
       },
-      "outputs": [],
-      "source": [
-        "!curl ifconfig.me"
-      ]
+      "execution_count": null,
+      "outputs": []
     },
     {
       "cell_type": "code",
@@ -220,163 +223,207 @@
       },
       "outputs": [],
       "source": [
-        "#@title Postgres helpers for creating tables and verifying data\n",
-        "import psycopg2\n",
-        "from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\n",
+        "# @title SQLAlchemy + AlloyDB Connector helpers for creating tables 
and verifying data\n",
+        "\n",
+        "import sqlalchemy\n",
+        "from sqlalchemy import text # Import text construct explicitly\n",
+        "from sqlalchemy.exc import SQLAlchemyError # Import specific 
exception type\n",
+        "from google.cloud.alloydb.connector import Connector\n",
+        "\n",
+        "def get_alloydb_engine(instance_uri: str, user: str, password: str, 
db: str, **connect_kwargs) -> sqlalchemy.engine.Engine:\n",
+        "    \"\"\"Creates a SQLAlchemy engine configured for 
AlloyDB.\"\"\"\n",
+        "    connector = Connector()\n",
+        "    connect_kwargs.setdefault('ip_type', 'PUBLIC')\n",
+        "    def get_conn() -> sqlalchemy.engine.base.Connection:\n",
+        "        conn = connector.connect(\n",
+        "            instance_uri,\n",
+        "            \"pg8000\",\n",
+        "            user=user,\n",
+        "            password=password,\n",
+        "            db=db,\n",
+        "            **connect_kwargs # Pass additional options like 
ip_type='PUBLIC' if needed\n",
+        "        )\n",
+        "        return conn\n",
         "\n",
-        "def setup_alloydb_table(host: str,\n",
-        "                       database: str,\n",
-        "                      table_name: str,\n",
-        "                       user: str,\n",
-        "                       password: str,\n",
-        "                       port: int = 5432):\n",
-        "    \"\"\"Set up AlloyDB table with vector extension and proper 
schema.\n",
+        "    # Create the SQLAlchemy engine using the connection function\n",
+        "    engine = sqlalchemy.create_engine(\n",
+        "        \"postgresql+pg8000://\",\n",
+        "        creator=get_conn,\n",
+        "    )\n",
+        "    engine.pool.dispose = lambda: connector.close()\n",
+        "\n",
+        "    return engine\n",
+        "\n",
+        "\n",
+        "def setup_alloydb_table_sqlalchemy(instance_uri: str,\n",
+        "                                   database: str,\n",
+        "                                   table_name: str,\n",
+        "                                   table_schema: str,\n",
+        "                                   user: str,\n",
+        "                                   password: str,\n",
+        "                                   **connect_kwargs):\n",
+        "    \"\"\"Set up AlloyDB table with vector extension and proper 
schema using SQLAlchemy.\n",
         "\n",
         "    Args:\n",
-        "        host: AlloyDB instance host\n",
+        "        instance_uri: AlloyDB instance URI (e.g., 
projects/.../locations/.../clusters/.../instances/...)\n",
         "        database: Database name\n",
+        "        table_name: Name of the table to create.\n",
+        "        table_schema: SQL string defining the table columns (e.g., 
\"id SERIAL PRIMARY KEY, embedding VECTOR(768)\")\n",
         "        user: Database user\n",
         "        password: Database password\n",
-        "        port: Database port (default: 5432)\n",
+        "        connect_kwargs: Additional keyword arguments passed to 
connector.connect() (e.g., ip_type=\"PUBLIC\")\n",
         "    \"\"\"\n",
-        "    # Connection string\n",
-        "    conn_string = f\"host={host} dbname={database} user={user} 
password={password} port={port}\"\n",
-        "\n",
+        "    engine = None\n",
         "    try:\n",
-        "        # Connect to the database\n",
-        "        conn = psycopg2.connect(conn_string)\n",
-        "        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
-        "        cur = conn.cursor()\n",
-        "\n",
-        "        print(\"Connected to AlloyDB successfully!\")\n",
-        "\n",
-        "        # Create pgvector extension if it doesn't exist\n",
-        "        print(\"Creating pgvector extension...\")\n",
-        "        cur.execute(\"CREATE EXTENSION IF NOT EXISTS vector;\")\n",
-        "\n",
-        "        # Create the product_embeddings table\n",
-        "        print(\"Creating table...\")\n",
-        "        cur.execute(f\"\"\"\n",
-        "          DROP TABLE IF EXISTS {table_name};\n",
-        "        \"\"\")\n",
-        "        cur.execute(f\"\"\"\n",
-        "          CREATE TABLE IF NOT EXISTS {table_name} (\n",
-        "              {table_schema}\n",
-        "          );\n",
-        "        \"\"\")\n",
-        "\n",
-        "        print(\"Setup completed successfully!\")\n",
-        "\n",
+        "        engine = get_alloydb_engine(instance_uri, user, password, 
database, **connect_kwargs)\n",
+        "\n",
+        "        # Use a connection from the pool\n",
+        "        with engine.connect() as connection:\n",
+        "            # Use execution options for autocommit for DDL 
statements\n",
+        "            # Alternatively, execute outside an explicit transaction 
block (begin())\n",
+        "            with 
connection.execution_options(isolation_level=\"AUTOCOMMIT\"):\n",
+        "                print(\"Connected to AlloyDB successfully via 
SQLAlchemy!\")\n",
+        "\n",
+        "                # Create pgvector extension if it doesn't exist\n",
+        "                print(\"Creating pgvector extension...\")\n",
+        "                connection.execute(text(\"CREATE EXTENSION IF NOT 
EXISTS vector;\"))\n",
+        "\n",
+        "                # Drop the table if it exists\n",
+        "                print(f\"Dropping table {table_name} if 
exists...\")\n",
+        "                # Use f-string for table name (generally okay for DDL 
if source is trusted)\n",
+        "                connection.execute(text(f\"DROP TABLE IF EXISTS 
{table_name};\"))\n",
+        "\n",
+        "                # Create the table\n",
+        "                print(f\"Creating table {table_name}...\")\n",
+        "                # Use f-string for table name and schema (validate 
input if necessary)\n",
+        "                create_sql = f\"\"\"\n",
+        "                CREATE TABLE IF NOT EXISTS {table_name} (\n",
+        "                    {table_schema}\n",
+        "                );\n",
+        "                \"\"\"\n",
+        "                connection.execute(text(create_sql))\n",
+        "\n",
+        "            # Optional: Commit if not using autocommit (SQLAlchemy >= 
2.0 often commits implicitly)\n",
+        "            # connection.commit() # Usually not needed with 
autocommit or implicit commit behavior\n",
+        "\n",
+        "        print(\"Setup completed successfully using SQLAlchemy!\")\n",
+        "\n",
+        "    except SQLAlchemyError as e:\n",
+        "        print(f\"An SQLAlchemy error occurred during setup: {e}\")\n",
         "    except Exception as e:\n",
-        "        print(f\"An error occurred: {e}\")\n",
+        "        print(f\"An unexpected error occurred during setup: {e}\")\n",
         "    finally:\n",
-        "        if 'cur' in locals():\n",
-        "            cur.close()\n",
-        "        if 'conn' in locals():\n",
-        "            conn.close()\n",
-        "\n",
-        "# Example usage (user will need to provide their actual connection 
details)\n",
-        "\"\"\"\n",
-        "To set up your AlloyDB table, run the following with your connection 
details:\n",
-        "\n",
-        "setup_alloydb_table(\n",
-        "    host=\"your-alloydb-host\",\n",
-        "    database=\"your-database\",\n",
-        "    user=\"your-username\",\n",
-        "    password=\"your-password\"\n",
-        ")\n",
-        "\"\"\"\n",
-        "\n",
-        "\"\"\"### Test the Connection and Table Setup\n",
-        "\n",
-        "You can verify the setup with a simple query:\n",
-        "\"\"\"\n",
-        "\n",
-        "def test_alloydb_connection(host: str,\n",
-        "                          database: str,\n",
-        "                          table_name: str,\n",
-        "                          user: str,\n",
-        "                          password: str,\n",
-        "                          port: int = 5432):\n",
-        "    \"\"\"Test the AlloyDB connection and verify table creation.\n",
-        "\n",
-        "    Args:\n",
-        "        host: AlloyDB instance host\n",
+        "        if engine:\n",
+        "            engine.dispose() # Close connection pool and connector\n",
+        "\n",
+        "def test_alloydb_connection_sqlalchemy(instance_uri: str,\n",
+        "                                       database: str,\n",
+        "                                       table_name: str,\n",
+        "                                       user: str,\n",
+        "                                       password: str,\n",
+        "                                       **connect_kwargs):\n",
+        "    \"\"\"Test the AlloyDB connection and verify table/extension 
using SQLAlchemy.\n",
+        "\n",
+        "     Args:\n",
+        "        instance_uri: AlloyDB instance URI\n",
         "        database: Database name\n",
+        "        table_name: Name of the table to check.\n",
         "        user: Database user\n",
         "        password: Database password\n",
-        "        port: Database port (default: 5432)\n",
+        "        connect_kwargs: Additional keyword arguments passed to 
connector.connect()\n",
         "    \"\"\"\n",
-        "    conn_string = f\"host={host} dbname={database} user={user} 
password={password} port={port}\"\n",
-        "\n",
+        "    engine = None\n",
         "    try:\n",
-        "        conn = psycopg2.connect(conn_string)\n",
-        "        cur = conn.cursor()\n",
-        "\n",
-        "        # Check if table exists\n",
-        "        cur.execute(f\"\"\"\n",
-        "            SELECT EXISTS (\n",
-        "                SELECT FROM information_schema.tables\n",
-        "                WHERE table_name = '{table_name}'\n",
-        "            );\n",
-        "        \"\"\")\n",
-        "        table_exists = cur.fetchone()[0]\n",
-        "\n",
-        "        if table_exists:\n",
-        "            print(f\"✓ {table_name} table exists\")\n",
-        "\n",
-        "            # Check if vector extension is installed\n",
-        "            cur.execute(\"SELECT * FROM pg_extension WHERE extname = 
'vector';\")\n",
-        "            if cur.fetchone():\n",
-        "                print(\"✓ pgvector extension is installed\")\n",
+        "        engine = get_alloydb_engine(instance_uri, user, password, 
database, **connect_kwargs)\n",
+        "\n",
+        "        with engine.connect() as connection:\n",
+        "            print(\"Testing connection...\")\n",
+        "            # Simple query to confirm connection\n",
+        "            connection.execute(text(\"SELECT 1\"))\n",
+        "            print(\"✓ Connection successful\")\n",
+        "\n",
+        "            # Check if table exists using information_schema\n",
+        "            # Use bind parameters (:tname) for safety, even though 
it's a table name here\n",
+        "            table_exists_query = text(\"\"\"\n",
+        "                SELECT EXISTS (\n",
+        "                    SELECT FROM information_schema.tables\n",
+        "                    WHERE table_schema = 'public' AND table_name = 
:tname\n",
+        "                );\n",
+        "            \"\"\")\n",
+        "            # .scalar() fetches the first column of the first row\n",
+        "            table_exists = connection.execute(table_exists_query, 
{\"tname\": table_name}).scalar()\n",
+        "\n",
+        "            if table_exists:\n",
+        "                print(f\"✓ '{table_name}' table exists\")\n",
+        "\n",
+        "                # Check if vector extension is installed\n",
+        "                ext_exists_query = text(\"\"\"\n",
+        "                    SELECT EXISTS (\n",
+        "                        SELECT FROM pg_extension WHERE extname = 
'vector'\n",
+        "                    );\n",
+        "                \"\"\")\n",
+        "                vector_installed = 
connection.execute(ext_exists_query).scalar()\n",
+        "\n",
+        "                if vector_installed:\n",
+        "                    print(\"✓ pgvector extension is installed\")\n",
+        "                else:\n",
+        "                    print(\"✗ pgvector extension is NOT 
installed\")\n",
         "            else:\n",
-        "                print(\"✗ pgvector extension is not installed\")\n",
-        "        else:\n",
-        "            print(f\"✗ {table_name} table does not exist\")\n",
+        "                print(f\"✗ '{table_name}' table does NOT exist\")\n",
         "\n",
+        "    except SQLAlchemyError as e:\n",
+        "        print(f\"Connection test failed (SQLAlchemy error): {e}\")\n",
         "    except Exception as e:\n",
-        "        print(f\"Connection test failed: {e}\")\n",
+        "        print(f\"Connection test failed (Unexpected error): {e}\")\n",
         "    finally:\n",
-        "        if 'cur' in locals():\n",
-        "            cur.close()\n",
-        "        if 'conn' in locals():\n",
-        "            conn.close()\n",
-        "\n",
-        "def verify_embeddings(host: str,\n",
-        "                     database: str,\n",
-        "                     table_name: str,\n",
-        "                     user: str,\n",
-        "                     password: str,\n",
-        "                     port: int = 5432):\n",
-        "    \"\"\"Connect to AlloyDB and print all written products.\"\"\"\n",
-        "    conn = psycopg2.connect(\n",
-        "        host=host,\n",
-        "        database=database,\n",
-        "        user=user,\n",
-        "        password=password,\n",
-        "        port=port\n",
-        "    )\n",
-        "\n",
+        "        if engine:\n",
+        "            engine.dispose()\n",
+        "\n",
+        "def verify_embeddings_sqlalchemy(instance_uri: str,\n",
+        "                                 database: str,\n",
+        "                                 table_name: str,\n",
+        "                                 user: str,\n",
+        "                                 password: str,\n",
+        "                                 **connect_kwargs):\n",
+        "    \"\"\"Connect to AlloyDB using SQLAlchemy and print all rows from 
the table.\"\"\"\n",
+        "    engine = None\n",
         "    try:\n",
-        "        with conn.cursor() as cur:\n",
-        "            # Simple SELECT * query\n",
-        "            cur.execute(f\"SELECT * FROM {table_name};\")\n",
-        "            rows = cur.fetchall()\n",
+        "        engine = get_alloydb_engine(instance_uri, user, password, 
database, **connect_kwargs)\n",
+        "\n",
+        "        with engine.connect() as connection:\n",
+        "            # Use f-string for table name in SELECT (ensure 
table_name is controlled)\n",
+        "            select_query = text(f\"SELECT * FROM {table_name};\")\n",
+        "            result = connection.execute(select_query)\n",
         "\n",
-        "            # Get column names from cursor description\n",
-        "            columns = [desc[0] for desc in cur.description]\n",
+        "            # Get column names from the result keys\n",
+        "            columns = result.keys()\n",
+        "            # Fetch all rows as mapping objects (dict-like)\n",
+        "            rows = result.mappings().all()\n",
         "\n",
-        "            print(f\"\\nFound {len(rows)} products:\")\n",
+        "            print(f\"\\nFound {len(rows)} products in 
'{table_name}':\")\n",
         "            print(\"-\" * 80)\n",
         "\n",
-        "            # Print each row with column names\n",
-        "            for row in rows:\n",
-        "                for col, val in zip(columns, row):\n",
-        "                    print(f\"{col}: {val}\")\n",
+        "            if not rows:\n",
+        "                print(\"Table is empty.\")\n",
         "                print(\"-\" * 80)\n",
-        "\n",
+        "            else:\n",
+        "                # Print each row\n",
+        "                for row in rows:\n",
+        "                    for col in columns:\n",
+        "                        print(f\"{col}: {row[col]}\")\n",
+        "                    print(\"-\" * 80)\n",
+        "\n",
+        "    except SQLAlchemyError as e:\n",
+        "        print(f\"Failed to verify embeddings (SQLAlchemy error): 
{e}\")\n",
+        "        # You might want to check specifically for ProgrammingError 
if the table doesn't exist\n",
+        "        # from sqlalchemy.exc import ProgrammingError\n",
+        "        # except ProgrammingError as pe:\n",
+        "        #    print(f\"Failed to query table '{table_name}'. Does it 
exist? Error: {pe}\")\n",
+        "    except Exception as e:\n",
+        "        print(f\"Failed to verify embeddings (Unexpected error): 
{e}\")\n",
         "    finally:\n",
-        "        conn.close()"
+        "        if engine:\n",
+        "            engine.dispose()"
       ]
     },
     {
@@ -489,7 +536,8 @@
         "We import the following for configuring our embedding ingestion 
pipeline:\n",
         "- `Chunk`, the structured input for generating and ingesting 
embeddings\n",
         "- `AlloyDBConnectionConfig` for configuring database connection 
information\n",
-        "- `AlloyDBVectorWriterConfig` for configuring write behavior like 
schema mapping and conflict resolution"
+        "- `AlloyDBVectorWriterConfig` for configuring write behavior like 
schema mapping and conflict resolution\n",
+        "- `AlloyDBLanguageConnectorConfig` to connect using the [AlloyDB 
language 
connector](https://cloud.google.com/alloydb/docs/connect-language-connectors)"
       ]
     },
     {
@@ -503,7 +551,8 @@
         "# Embedding-specific imports\n",
         "from apache_beam.ml.rag.ingestion.alloydb import (\n",
         "    AlloyDBVectorWriterConfig,\n",
-        "    AlloyDBConnectionConfig\n",
+        "    AlloyDBConnectionConfig,\n",
+        "    AlloyDBLanguageConnectorConfig\n",
         ")\n",
         "from apache_beam.ml.rag.ingestion.base import 
VectorDatabaseWriteTransform\n",
         "from apache_beam.ml.rag.types import Chunk, Content\n",
@@ -602,8 +651,8 @@
         "  content text,\n",
         "  metadata JSONB\n",
         "\"\"\"\n",
-        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
-        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, 
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+        "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
DB_USER, DB_PASSWORD)"
       ]
     },
     {
@@ -723,8 +772,18 @@
       },
       "outputs": [],
       "source": [
+        "# Configure the language connector so we can connect securly\n",
+        "language_connector_config = AlloyDBLanguageConnectorConfig(\n",
+        "    database_name=DB_NAME, instance_name=INSTANCE_URI, 
ip_type=\"PUBLIC\"\n",
+        ")\n",
+        "# Configure the AlloyDBConnectionConfig with language connector\n",
+        "connection_config = 
AlloyDBConnectionConfig.with_language_connector(\n",
+        "    connector_options=language_connector_config,\n",
+        "    username=DB_USER,\n",
+        "    password=DB_PASSWORD\n",
+        ")\n",
         "alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
-        "    connection_config = AlloyDBConnectionConfig(JDBC_URL, DB_USER, 
DB_PASSWORD),\n",
+        "    connection_config=connection_config,\n",
         "    table_name=table_name\n",
         ")"
       ]
@@ -786,7 +845,7 @@
       },
       "outputs": [],
       "source": [
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ]
     },
     {
@@ -848,8 +907,8 @@
         "  content text,\n",
         "  metadata JSONB\n",
         "\"\"\"\n",
-        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
-        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, 
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+        "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
DB_USER, DB_PASSWORD)"
       ]
     },
     {
@@ -882,7 +941,7 @@
         "from apache_beam.ml.transforms.base import MLTransform\n",
         "from apache_beam.ml.rag.types import Chunk, Content\n",
         "from apache_beam.ml.rag.ingestion.base import 
VectorDatabaseWriteTransform\n",
-        "from apache_beam.ml.rag.ingestion.alloydb import 
AlloyDBVectorWriterConfig, AlloyDBConnectionConfig\n",
+        "from apache_beam.ml.rag.ingestion.alloydb import 
AlloyDBVectorWriterConfig, AlloyDBConnectionConfig, 
AlloyDBLanguageConnectorConfig\n",
         "from apache_beam.ml.rag.embeddings.huggingface import 
HuggingfaceTextEmbeddings\n",
         "from apache_beam.options.pipeline_options import SetupOptions\n",
         "\n",
@@ -914,9 +973,9 @@
         "def run(argv=None):\n",
         "    parser = argparse.ArgumentParser()\n",
         "    parser.add_argument(\n",
-        "        '--alloydb_host',\n",
+        "        '--instance_uri',\n",
         "        required=True,\n",
-        "        help='AlloyDB host'\n",
+        "        help='AlloyDB instance uri'\n",
         "    )\n",
         "    parser.add_argument(\n",
         "        '--alloydb_database',\n",
@@ -961,8 +1020,10 @@
         "                )\n",
         "              | 'Write to AlloyDB' >> 
VectorDatabaseWriteTransform(\n",
         "                  AlloyDBVectorWriterConfig(\n",
-        "                      connection_config=AlloyDBConnectionConfig(\n",
-        "                        
jdbc_url=f'jdbc:postgresql://{known_args.alloydb_host}/{known_args.alloydb_database}',\n",
+        "                      
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+        "                        AlloyDBLanguageConnectorConfig(\n",
+        "                            
database_name=known_args.alloydb_database, 
instance_name=known_args.instance_uri\n",
+        "                        ),\n",
         "                        username=known_args.alloydb_username,\n",
         "                        password=known_args.alloydb_password\n",
         "                    ),\n",
@@ -1047,19 +1108,19 @@
       },
       "outputs": [],
       "source": [
+        "import os\n",
         "BUCKET_NAME = '' # @param {type:'string'}\n",
         "REGION = 'us-central1' # @param {type:'string'}\n",
         "os.environ['BUCKET_NAME'] = BUCKET_NAME\n",
         "os.environ['REGION'] = REGION\n",
         "\n",
         "# Save AlloyDB credentioals to environment variables\n",
-        "PRIVATE_ALLOYDB_HOST = '' # @param {type:'string'}\n",
-        "os.environ['PRIVATE_ALLOYDB_HOST'] = PRIVATE_ALLOYDB_HOST\n",
+        "os.environ['INSTANCE_URI'] = INSTANCE_URI\n",
         "os.environ['DATABASE_NAME'] = DB_NAME\n",
         "os.environ['ALLOYDB_USER'] = DB_USER\n",
         "os.environ['ALLOYDB_PASSWORD'] = DB_PASSWORD\n",
         "\n",
-        "NETWORK = '' # @param {type:'string'}\n",
+        "NETWORK = 'default' # @param {type:'string'}\n",
         "SUBNETWORK = '' # @param {type:'string'}\n",
         "os.environ['NETWORK'] = NETWORK\n",
         "os.environ['SUBNETWORK'] = SUBNETWORK"
@@ -1126,7 +1187,7 @@
         "!python ./basic_ingestion_pipeline.py \\\n",
         "  --project=$PROJECT_ID \\\n",
         "  --alloydb_username=$ALLOYDB_USER \\\n",
-        "  --alloydb_host=${PRIVATE_ALLOYDB_HOST}:5432 \\\n",
+        "  --instance_uri=$INSTANCE_URI \\\n",
         "  --alloydb_password=$ALLOYDB_PASSWORD \\\n",
         "  --alloydb_table=default_dataflow_product_embeddings \\\n",
         "  --alloydb_database=$DATABASE_NAME \\\n",
@@ -1159,7 +1220,7 @@
       },
       "outputs": [],
       "source": [
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name='default_dataflow_product_embeddings', 
user=DB_USER, password=DB_PASSWORD)\n"
       ]
     },
     {
@@ -1286,8 +1347,8 @@
         "    model_name VARCHAR,\n",
         "    created_at TIMESTAMP\n",
         "\"\"\"\n",
-        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
-        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, 
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+        "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
DB_USER, DB_PASSWORD)"
       ]
     },
     {
@@ -1432,7 +1493,15 @@
         "              
.with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n",
         "            | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(\n",
         "                AlloyDBVectorWriterConfig(\n",
-        "                    connection_config = 
AlloyDBConnectionConfig(JDBC_URL, DB_USER, DB_PASSWORD),\n",
+        "                    
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+        "                        
connector_options=AlloyDBLanguageConnectorConfig(\n",
+        "                            database_name=DB_NAME,\n",
+        "                            instance_name=INSTANCE_URI,\n",
+        "                            ip_type=\"PUBLIC\"\n",
+        "                        ),\n",
+        "                        username=DB_USER,\n",
+        "                        password=DB_PASSWORD\n",
+        "                    ),\n",
         "                    table_name=table_name,\n",
         "                    column_specs=column_specs\n",
         "                )\n",
@@ -1459,7 +1528,7 @@
       },
       "outputs": [],
       "source": [
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ]
     },
     {
@@ -1500,8 +1569,8 @@
         "  metadata JSONB,\n",
         "  created_at TIMESTAMP NOT NULL DEFAULT NOW()\n",
         "\"\"\"\n",
-        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
-        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, 
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+        "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
DB_USER, DB_PASSWORD)"
       ]
     },
     {
@@ -1588,7 +1657,15 @@
         "\n",
         "# Create writer config with conflict resolution\n",
         "alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
-        "    connection_config=AlloyDBConnectionConfig(JDBC_URL, DB_USER, 
DB_PASSWORD),\n",
+        "    
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+        "        connector_options=AlloyDBLanguageConnectorConfig(\n",
+        "            database_name=DB_NAME,\n",
+        "            instance_name=INSTANCE_URI,\n",
+        "            ip_type=\"PUBLIC\"\n",
+        "        ),\n",
+        "        username=DB_USER,\n",
+        "        password=DB_PASSWORD\n",
+        "    ),\n",
         "    table_name=table_name,\n",
         "    conflict_resolution=conflict_resolution,\n",
         ")"
@@ -1665,7 +1742,7 @@
       "outputs": [],
       "source": [
         "print(\"\\nAfter Day 1 ingestion:\")\n",
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ]
     },
     {
@@ -1726,7 +1803,7 @@
       "outputs": [],
       "source": [
         "print(\"\\nAfter Day 2 ingestion:\")\n",
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ]
     },
     {
@@ -1793,67 +1870,98 @@
       "outputs": [],
       "source": [
         "#@title Postgres helpers for inserting initial records\n",
-        "def setup_initial_data(host: str,\n",
-        "                      database: str,\n",
-        "                      table_name: str,\n",
-        "                      user: str,\n",
-        "                      password: str,\n",
-        "                      port: int = 5432):\n",
-        "    \"\"\"Set up table and insert sample product data.\"\"\"\n",
-        "    conn_string = f\"host={host} dbname={database} user={user} 
password={password} port={port}\"\n",
+        "import sqlalchemy\n",
+        "from sqlalchemy import text\n",
+        "from sqlalchemy.exc import SQLAlchemyError\n",
+        "from google.cloud.alloydb.connector import Connector\n",
+        "\n",
+        "def setup_initial_data_sqlalchemy(instance_uri: str,\n",
+        "                                  database: str,\n",
+        "                                  table_name: str,\n",
+        "                                  table_schema: str,\n",
+        "                                  user: str,\n",
+        "                                  password: str,\n",
+        "                                  **connect_kwargs):\n",
+        "    \"\"\"Set up table and insert sample product data using SQLAlchemy.\n",
+        "\n",
+        "    Ensures pgvector exists, recreates the table, inserts three sample products, and re-raises any failure.\n",
         "\n",
+        "    Args:\n",
+        "        instance_uri: AlloyDB instance URI\n",
+        "        database: Database name\n",
+        "        table_name: Name of the table to create and populate.\n",
+        "        table_schema: SQL string defining the table columns.\n",
+        "        user: Database user\n",
+        "        password: Database password\n",
+        "        connect_kwargs: Additional keyword arguments for connector.connect().\n",
+        "    \"\"\"\n",
+        "    engine = None\n",
         "    try:\n",
-        "        conn = psycopg2.connect(conn_string)\n",
-        "        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
-        "        cur = conn.cursor()\n",
-        "\n",
-        "        # Create pgvector extension\n",
-        "        cur.execute(\"CREATE EXTENSION IF NOT EXISTS vector;\")\n",
-        "\n",
-        "        # Create products table\n",
-        "        cur.execute(f\"DROP TABLE IF EXISTS {table_name};\")\n",
-        "        cur.execute(f\"CREATE TABLE {table_name} 
({table_schema});\")\n",
-        "\n",
-        "        # Insert sample data\n",
-        "        sample_products = [\n",
-        "            (\n",
-        "                \"lamp-001\",\n",
-        "                \"Artisan Table Lamp\",\n",
-        "                \"Hand-crafted ceramic table lamp with linen shade. 
\"\n",
-        "                \"Features dimmable LED bulb and touch-sensitive 
base.\",\n",
-        "                129.99\n",
-        "            ),\n",
-        "            (\n",
-        "                \"mirror-001\",\n",
-        "                \"Floating Wall Mirror\",\n",
-        "                \"Modern circular mirror with hidden mounting system. 
\"\n",
-        "                \"Perfect for entryways and contemporary 
spaces.\",\n",
-        "                199.99\n",
-        "            ),\n",
-        "            (\n",
-        "                \"vase-001\",\n",
-        "                \"Contemporary Ceramic Vase\",\n",
-        "                \"Minimalist vase with matte finish. \"\n",
-        "                \"Ideal for dried or fresh flower arrangements.\",\n",
-        "                79.99\n",
-        "            )\n",
-        "        ]\n",
-        "\n",
-        "        cur.executemany(\n",
-        "            f\"\"\"\n",
-        "            INSERT INTO {table_name} (id, title, description, 
price)\n",
-        "            VALUES (%s, %s, %s, %s)\n",
-        "            \"\"\",\n",
-        "            sample_products\n",
-        "        )\n",
-        "\n",
-        "        print(\"✓ Sample products inserted successfully\")\n",
-        "\n",
+        "        engine = get_alloydb_engine(instance_uri, user, password, database, **connect_kwargs)\n",
+        "\n",
+        "        # Use a single connection for both DDL and DML\n",
+        "        with engine.connect() as connection:\n",
+        "            print(\"Connected to AlloyDB successfully via SQLAlchemy!\")\n",
+        "\n",
+        "            # === DDL ===\n",
+        "            # NOTE: with SQLAlchemy 2.x autobegin these statements share the transaction with the\n",
+        "            # INSERTs below (unless get_alloydb_engine sets AUTOCOMMIT) and are committed together.\n",
+        "\n",
+        "            print(\"Ensuring pgvector extension exists...\")\n",
+        "            connection.execute(text(\"CREATE EXTENSION IF NOT EXISTS vector;\"))\n",
+        "\n",
+        "            print(f\"Dropping table {table_name} if exists...\")\n",
+        "            connection.execute(text(f\"DROP TABLE IF EXISTS {table_name};\"))\n",
+        "\n",
+        "            print(f\"Creating table {table_name}...\")\n",
+        "            create_sql = f\"CREATE TABLE {table_name} ({table_schema});\"\n",
+        "            connection.execute(text(create_sql))\n",
+        "            print(f\"Table {table_name} created.\")\n",
+        "\n",
+        "            # === DML: insert the sample catalog rows ===\n",
+        "            sample_products_dicts = [\n",
+        "                # Keep the full descriptions: they are read back later for embedding generation.\n",
+        "                 {\n",
+        "                    \"id\": \"lamp-001\", \"title\": \"Artisan Table Lamp\",\n",
+        "                    \"description\": \"Hand-crafted ceramic table lamp with linen shade. Features dimmable LED bulb and touch-sensitive base.\", \"price\": 129.99\n",
+        "                 },\n",
+        "                 {\n",
+        "                    \"id\": \"mirror-001\", \"title\": \"Floating Wall Mirror\",\n",
+        "                    \"description\": \"Modern circular mirror with hidden mounting system. Perfect for entryways and contemporary spaces.\", \"price\": 199.99\n",
+        "                 },\n",
+        "                 {\n",
+        "                     \"id\": \"vase-001\", \"title\": \"Contemporary Ceramic Vase\",\n",
+        "                     \"description\": \"Minimalist vase with matte finish. Ideal for dried or fresh flower arrangements.\", \"price\": 79.99\n",
+        "                 }\n",
+        "                 # Add embedding data if needed\n",
+        "            ]\n",
+        "\n",
+        "            insert_sql = text(f\"\"\"\n",
+        "                INSERT INTO {table_name} (id, title, description, price)\n",
+        "                VALUES (:id, :title, :description, :price)\n",
+        "            \"\"\") # Add other columns if needed\n",
+        "\n",
+        "            print(f\"Inserting sample data into {table_name}...\")\n",
+        "            # Execute DML within the connection's transaction\n",
+        "            connection.execute(insert_sql, sample_products_dicts)\n",
+        "\n",
+        "            # Commit the transaction containing the INSERTs\n",
+        "            print(\"Committing transaction...\")\n",
+        "            connection.commit()\n",
+        "            print(\"✓ Sample products inserted successfully\")\n",
+        "\n",
+        "        print(\"Initial data setup completed successfully using SQLAlchemy!\")\n",
+        "\n",
+        "    except SQLAlchemyError as e:\n",
+        "        print(f\"An SQLAlchemy error occurred during initial data setup: {e}\")\n",
+        "        raise\n",
+        "    except Exception as e:\n",
+        "        print(f\"An unexpected error occurred during initial data setup: {e}\")\n",
+        "        raise\n",
         "    finally:\n",
-        "        if 'cur' in locals():\n",
-        "            cur.close()\n",
-        "        if 'conn' in locals():\n",
-        "            conn.close()"
+        "        if engine:\n",
+        "            print(\"Disposing engine pool...\")\n",
+        "            engine.dispose()"
       ]
     },
     {
@@ -1864,7 +1972,7 @@
       },
       "outputs": [],
       "source": [
-        "setup_initial_data(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_initial_data_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
table_schema, DB_USER, DB_PASSWORD)"
       ]
     },
     {
@@ -1892,7 +2000,15 @@
         "\n",
         "# Configure database writer\n",
         "alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
-        "    connection_config=AlloyDBConnectionConfig(JDBC_URL, DB_USER, 
DB_PASSWORD),\n",
+        "    
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+        "        connector_options=AlloyDBLanguageConnectorConfig(\n",
+        "            database_name=DB_NAME,\n",
+        "            instance_name=INSTANCE_URI,\n",
+        "            ip_type=\"PUBLIC\"\n",
+        "        ),\n",
+        "        username=DB_USER,\n",
+        "        password=DB_PASSWORD\n",
+        "    ),\n",
         "    table_name=table_name,\n",
         "    column_specs=(\n",
         "        ColumnSpecsBuilder()\n",
@@ -1922,10 +2038,18 @@
         "        | \"Read Products\" >> ReadFromJdbc(\n",
         "            table_name=table_name,\n",
         "            driver_class_name=\"org.postgresql.Driver\",\n",
-        "            jdbc_url=JDBC_URL,\n",
+        "            jdbc_url=AlloyDBLanguageConnectorConfig(\n",
+        "                database_name=DB_NAME,\n",
+        "                instance_name=INSTANCE_URI,\n",
+        "                ip_type=\"PUBLIC\"\n",
+        "            ).to_jdbc_url(),\n",
         "            username=DB_USER,\n",
         "            password=DB_PASSWORD,\n",
-        "            query=f\"SELECT id, title, description FROM 
{table_name}\"\n",
+        "            query=f\"SELECT id, title, description FROM 
{table_name}\",\n",
+        "            classpath=[\n",
+        "                \"org.postgresql:postgresql:42.2.16\",\n",
+        "                \"com.google.cloud:alloydb-jdbc-connector:1.2.0\"\n",
+        "            ]\n",
         "        )\n",
         "    )\n",
         "\n",
@@ -1965,7 +2089,7 @@
       "outputs": [],
       "source": [
         "print(\"\\nAfter embedding generation:\")\n",
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ]
     },
     {
@@ -2063,8 +2187,8 @@
         "  content text,\n",
         "  metadata JSONB\n",
         "\"\"\"\n",
-        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
-        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, 
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+        "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
DB_USER, DB_PASSWORD)"
       ]
     },
     {
@@ -2088,7 +2212,7 @@
       "source": [
         "from apache_beam.ml.rag.embeddings.vertex_ai import 
VertexAITextEmbeddings\n",
         "\n",
-        "vertexai_embedder = 
VertexAITextEmbeddings(model_name=\"textembedding-gecko@latest\")"
+        "vertexai_embedder = 
VertexAITextEmbeddings(model_name=\"text-embedding-005\")"
       ]
     },
     {
@@ -2129,8 +2253,12 @@
         "                )\n",
         "              | 'Write to AlloyDB' >> 
VectorDatabaseWriteTransform(\n",
         "                  AlloyDBVectorWriterConfig(\n",
-        "                      connection_config=AlloyDBConnectionConfig(\n",
-        "                        jdbc_url=JDBC_URL,\n",
+        "                    
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+        "                        
connector_options=AlloyDBLanguageConnectorConfig(\n",
+        "                            database_name=DB_NAME,\n",
+        "                            instance_name=INSTANCE_URI,\n",
+        "                            ip_type=\"PUBLIC\"\n",
+        "                        ),\n",
         "                        username=DB_USER,\n",
         "                        password=DB_PASSWORD\n",
         "                    ),\n",
@@ -2158,7 +2286,7 @@
       "outputs": [],
       "source": [
         "print(\"\\nAfter embedding generation:\")\n",
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ]
     },
     {
@@ -2210,9 +2338,8 @@
         "from google.api_core.exceptions import AlreadyExists\n",
         "import json\n",
         "\n",
-        "# Define topic and subscription names\n",
+        "# Define pubsub topic\n",
         "TOPIC = \"product-updates\" # @param {type:'string'}\n",
-        "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
         "\n",
         "# Create publisher client and topic\n",
         "publisher = pubsub_v1.PublisherClient()\n",
@@ -2221,18 +2348,7 @@
         "    topic = publisher.create_topic(request={\"name\": topic_path})\n",
         "    print(f\"Created topic: {topic.name}\")\n",
         "except AlreadyExists:\n",
-        "    print(f\"Topic {topic_path} already exists.\")\n",
-        "\n",
-        "# Create subscriber client and subscription\n",
-        "subscriber = pubsub_v1.SubscriberClient()\n",
-        "subscription_path = subscriber.subscription_path(PROJECT_ID, 
SUBSCRIPTION)\n",
-        "try:\n",
-        "    subscription = subscriber.create_subscription(\n",
-        "        request={\"name\": subscription_path, \"topic\": 
topic_path}\n",
-        "    )\n",
-        "    print(f\"Created subscription: {subscription.name}\")\n",
-        "except AlreadyExists:\n",
-        "    print(f\"Subscription {subscription_path} already exists.\")"
+        "    print(f\"Topic {topic_path} already exists.\")"
       ],
       "metadata": {
         "id": "nqMe0Brlt7Bk"
@@ -2260,7 +2376,7 @@
         "  embedding VECTOR(384) NOT NULL,\n",
         "  content text,\n",
         "  metadata JSONB,\n",
-        "  last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+        "  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
         "\"\"\""
       ],
       "metadata": {
@@ -2272,8 +2388,8 @@
     {
       "cell_type": "code",
       "source": [
-        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
-        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, 
table_name,table_schema, DB_USER, DB_PASSWORD)\n",
+        "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, 
DB_USER, DB_PASSWORD)"
       ],
       "metadata": {
         "id": "8HPhUfAuorBP"
@@ -2301,8 +2417,6 @@
       "source": [
         "from apache_beam.options.pipeline_options import PipelineOptions, 
StandardOptions, SetupOptions, GoogleCloudOptions, WorkerOptions\n",
         "\n",
-        "PRIVATE_ALLOYDB_HOST = '' # @param {type:'string'}\n",
-        "\n",
         "options = PipelineOptions()\n",
         "options.view_as(StandardOptions).streaming = True\n",
         "\n",
@@ -2315,7 +2429,7 @@
         "\n",
         "# The VPC network to run your Dataflow job in.\n",
         "# Should be the same as the AlloyDB network if using Private services 
access.\n",
-        "NETWORK = '' # @param {type:'string'}\n",
+        "NETWORK = 'default' # @param {type:'string'}\n",
         "options.view_as(WorkerOptions).network = NETWORK\n",
         "\n",
         "# The VPC subnetwork to run your Dataflow job in.\n",
@@ -2423,7 +2537,7 @@
         "_ = (\n",
         "    pipeline\n",
         "    | \"Read from PubSub\" >> beam.io.ReadFromPubSub(\n",
-        "        
subscription=f\"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION}\"\n",
+        "        topic=f\"projects/{PROJECT_ID}/topics/{TOPIC}\"\n",
         "    )\n",
         "    | \"Window\" >> beam.WindowInto(FixedWindows(10))\n",
         "    | \"Parse Messages\" >> beam.Map(parse_message)\n",
@@ -2431,8 +2545,11 @@
         "        
.with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n",
         "    | \"Write to AlloyDB\" >> VectorDatabaseWriteTransform(\n",
         "        AlloyDBVectorWriterConfig(\n",
-        "            connection_config=AlloyDBConnectionConfig(\n",
-        "                
jdbc_url=f'jdbc:postgresql://{PRIVATE_ALLOYDB_HOST}/{DB_NAME}',\n",
+        "            
connection_config=AlloyDBConnectionConfig.with_language_connector(\n",
+        "                connector_options=AlloyDBLanguageConnectorConfig(\n",
+        "                    database_name=DB_NAME,\n",
+        "                    instance_name=INSTANCE_URI\n",
+        "                ),\n",
         "                username=DB_USER,\n",
         "                password=DB_PASSWORD\n",
         "            ),\n",
@@ -2624,7 +2741,7 @@
       "source": [
         "# Verify the results\n",
         "print(\"\\nAfter embedding generation:\")\n",
-        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+        "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, 
database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)"
       ],
       "metadata": {
         "id": "zSb1UoCSznkW"

Reply via email to