This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c2b57eddc4 Adding GCP Spanner Change Stream support for Python 
(#35453)
7c2b57eddc4 is described below

commit 7c2b57eddc4ebd61fbd7ae66f85e8def0f576176
Author: flpablo <[email protected]>
AuthorDate: Thu Aug 7 07:33:59 2025 -0600

    Adding GCP Spanner Change Stream support for Python (#35453)
    
    * Adding Spanner Change Stream support to python.
    
    * Updating CHANGES.md
    
    * Fixing python format
    
    * Modify CHANGES.md
    
    * Restarting Tests
    
    * Retesting
---
 CHANGES.md                                         |   1 +
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |  28 ++++
 .../io/gcp/spanner/SpannerTransformRegistrar.java  | 117 ++++++++++++++++-
 .../gcp/spanner/SpannerTransformRegistrarTest.java | 145 ++++++++++++++++++++-
 sdks/python/apache_beam/io/gcp/spanner.py          | 103 +++++++++++++++
 5 files changed, 391 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index bffa326c9ff..45f49db7a34 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -180,6 +180,7 @@
 * [Python] Prism runner now auto-enabled for some Python pipelines using the 
direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
 * [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
 * Python: Added JupyterLab 4.x extension compatibility for enhanced notebook 
integration ([#34495](https://github.com/apache/beam/pull/34495)).
+* [Python] Adding GCP Spanner Change Stream support for Python 
(apache_beam.io.gcp.spanner). ([#24103] 
https://github.com/apache/beam/issues/24103).
 
 ## Breaking Changes
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index d3b2632bad0..163b6d67bb2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -52,6 +52,8 @@ import com.google.cloud.spanner.SpannerOptions;
 import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -1015,6 +1017,32 @@ public class SpannerIO {
     }
   }
 
+  static class ChangeStreamRead extends PTransform<PBegin, 
PCollection<String>> {
+
+    ReadChangeStream readChangeStream;
+
+    public ChangeStreamRead(ReadChangeStream readChangeStream) {
+      this.readChangeStream = readChangeStream;
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      return input
+          .apply(readChangeStream)
+          .apply("DataChangeRecordToStringJSON", ParDo.of(new 
DataChangeRecordToJsonFn()));
+    }
+  }
+
+  private static class DataChangeRecordToJsonFn extends DoFn<DataChangeRecord, 
String> {
+    private static Gson gson = new 
GsonBuilder().disableHtmlEscaping().create();
+
+    @ProcessElement
+    public void process(@Element DataChangeRecord input, 
OutputReceiver<String> receiver) {
+      String modJsonString = gson.toJson(input, DataChangeRecord.class);
+      receiver.output(modJsonString);
+    }
+  }
+
   /**
    * A {@link PTransform} that create a transaction. If applied to a {@link 
PCollection}, it will
    * create a transaction after the {@link PCollection} is closed.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
index 72b51beadb5..e7c2339f798 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
@@ -23,6 +23,7 @@ import static 
com.google.cloud.spanner.TimestampBound.Mode.READ_TIMESTAMP;
 import com.google.auto.service.AutoService;
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options.RpcPriority;
 import com.google.cloud.spanner.TimestampBound;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -43,8 +44,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 
 /**
- * Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an 
external transform for
- * cross-language usage.
+ * Exposes {@link SpannerIO.WriteRows}, {@link SpannerIO.ReadRows} and {@link
+ * SpannerIO.ChangeStreamRead} as an external transform for cross-language 
usage.
  */
 @AutoService(ExternalTransformRegistrar.class)
 public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
@@ -55,6 +56,8 @@ public class SpannerTransformRegistrar implements 
ExternalTransformRegistrar {
       "beam:transform:org.apache.beam:spanner_insert_or_update:v1";
   public static final String DELETE_URN = 
"beam:transform:org.apache.beam:spanner_delete:v1";
   public static final String READ_URN = 
"beam:transform:org.apache.beam:spanner_read:v1";
+  public static final String READ_CHANGE_STREAM_URN =
+      "beam:transform:org.apache.beam:spanner_change_stream_reader:v1";
 
   @Override
   @NonNull
@@ -66,6 +69,7 @@ public class SpannerTransformRegistrar implements 
ExternalTransformRegistrar {
         .put(INSERT_OR_UPDATE_URN, new InsertOrUpdateBuilder())
         .put(DELETE_URN, new DeleteBuilder())
         .put(READ_URN, new ReadBuilder())
+        .put(READ_CHANGE_STREAM_URN, new ChangeStreamReaderBuilder())
         .build();
   }
 
@@ -382,4 +386,113 @@ public class SpannerTransformRegistrar implements 
ExternalTransformRegistrar {
       return SpannerIO.WriteRows.of(writeTransform, operation, 
configuration.table);
     }
   }
+
+  public static class ChangeStreamReaderBuilder
+      implements ExternalTransformBuilder<
+          ChangeStreamReaderBuilder.Configuration, PBegin, 
PCollection<String>> {
+
+    public static class Configuration extends CrossLanguageConfiguration {
+      private String changeStreamName = "";
+      private String metadataDatabase = "";
+      private String metadataInstance = "";
+      private @Nullable Timestamp inclusiveStartAt;
+      private @Nullable Timestamp inclusiveEndAt;
+      private @Nullable String metadataTable;
+      private @Nullable RpcPriority rpcPriority;
+      private @Nullable Duration watermarkRefreshRate;
+
+      public void setChangeStreamName(String changeStreamName) {
+        this.changeStreamName = changeStreamName;
+      }
+
+      public void setInclusiveStartAt(@Nullable String inclusiveStartAtString) 
{
+        if (inclusiveStartAtString != null) {
+          this.inclusiveStartAt = 
Timestamp.parseTimestamp(inclusiveStartAtString);
+        }
+      }
+
+      public void setInclusiveEndAt(@Nullable String inclusiveEndAtString) {
+        if (inclusiveEndAtString != null) {
+          this.inclusiveEndAt = Timestamp.parseTimestamp(inclusiveEndAtString);
+        }
+      }
+
+      public void setMetadataDatabase(String metadataDatabase) {
+        this.metadataDatabase = metadataDatabase;
+      }
+
+      public void setMetadataInstance(String metadataInstance) {
+        this.metadataInstance = metadataInstance;
+      }
+
+      public void setMetadataTable(@Nullable String metadataTable) {
+        this.metadataTable = metadataTable;
+      }
+
+      public void setRpcPriority(@Nullable String rpcPriorityString) {
+        if (rpcPriorityString != null) {
+          this.rpcPriority = RpcPriority.valueOf(rpcPriorityString);
+        }
+      }
+
+      public void setWatermarkRefreshRate(@Nullable String 
watermarkRefreshRateString) {
+        if (watermarkRefreshRateString != null) {
+          this.watermarkRefreshRate = 
Duration.parse(watermarkRefreshRateString);
+        }
+      }
+    }
+
+    @Override
+    @NonNull
+    public PTransform<PBegin, PCollection<String>> buildExternal(
+        ChangeStreamReaderBuilder.Configuration configuration) {
+
+      configuration.checkMandatoryFields();
+
+      if (configuration.changeStreamName.isEmpty()) {
+        throw new IllegalArgumentException("ChangeStreamName can't be empty");
+      }
+
+      if (configuration.metadataInstance.isEmpty()) {
+        throw new IllegalArgumentException("MetadataInstance can't be empty");
+      }
+
+      if (configuration.metadataDatabase.isEmpty()) {
+        throw new IllegalArgumentException("MetadataDatabase can't be empty");
+      }
+
+      SpannerIO.ReadChangeStream readChangeStream =
+          SpannerIO.readChangeStream()
+              .withProjectId(configuration.projectId)
+              .withInstanceId(configuration.instanceId)
+              .withDatabaseId(configuration.databaseId)
+              .withChangeStreamName(configuration.changeStreamName)
+              .withMetadataInstance(configuration.metadataInstance)
+              .withMetadataDatabase(configuration.metadataDatabase);
+
+      if (configuration.inclusiveStartAt != null) {
+        readChangeStream = 
readChangeStream.withInclusiveStartAt(configuration.inclusiveStartAt);
+      }
+
+      if (configuration.inclusiveEndAt != null) {
+        readChangeStream = 
readChangeStream.withInclusiveEndAt(configuration.inclusiveEndAt);
+      }
+
+      if (configuration.metadataTable != null) {
+        readChangeStream = 
readChangeStream.withMetadataTable(configuration.metadataTable);
+      }
+
+      if (configuration.rpcPriority != null) {
+
+        readChangeStream = 
readChangeStream.withRpcPriority(configuration.rpcPriority);
+      }
+
+      if (configuration.watermarkRefreshRate != null) {
+        readChangeStream =
+            
readChangeStream.withWatermarkRefreshRate(configuration.watermarkRefreshRate);
+      }
+
+      return new SpannerIO.ChangeStreamRead(readChangeStream);
+    }
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
index 3b38e7e528a..666cda91f73 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
@@ -26,6 +26,7 @@ import com.google.cloud.spanner.TimestampBound;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
+import 
org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ChangeStreamReaderBuilder;
 import 
org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.InsertBuilder;
 import 
org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ReadBuilder;
 import org.apache.beam.sdk.schemas.Schema;
@@ -48,22 +49,29 @@ public class SpannerTransformRegistrarTest {
   public static final String SPANNER_PROJECT = "spanner-project";
   public static final String SPANNER_TABLE = "spanner-table";
   public static final String SPANNER_SQL_QUERY = "SELECT * from 
spanner_table;";
+  public static final String SPANNER_CHANGE_STREAM_NAME = 
"spanner-change-stream-name";
+  public static final String SPANNER_CHANGE_STREAM_METADATA_INSTANCE =
+      "spanner-change-stream-instance";
+  public static final String SPANNER_CHANGE_STREAM_METADATA_DATABASE =
+      "spanner-change-stream-database";
   private SpannerTransformRegistrar spannerTransformRegistrar;
   private ReadBuilder readBuilder;
   private InsertBuilder writeBuilder;
+  private ChangeStreamReaderBuilder changeStreamReaderBuilder;
 
   @Before
   public void setup() {
     spannerTransformRegistrar = new SpannerTransformRegistrar();
     readBuilder = new ReadBuilder();
     writeBuilder = new InsertBuilder();
+    changeStreamReaderBuilder = new ChangeStreamReaderBuilder();
   }
 
   @Test
   public void testKnownBuilderInstances() {
     Map<String, ExternalTransformBuilder<?, ?, ?>> builderInstancesMap =
         spannerTransformRegistrar.knownBuilderInstances();
-    assertEquals(6, builderInstancesMap.size());
+    assertEquals(7, builderInstancesMap.size());
     assertThat(builderInstancesMap, 
IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_URN));
     assertThat(builderInstancesMap, 
IsMapContaining.hasKey(SpannerTransformRegistrar.UPDATE_URN));
     assertThat(builderInstancesMap, 
IsMapContaining.hasKey(SpannerTransformRegistrar.REPLACE_URN));
@@ -72,6 +80,9 @@ public class SpannerTransformRegistrarTest {
         
IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_OR_UPDATE_URN));
     assertThat(builderInstancesMap, 
IsMapContaining.hasKey(SpannerTransformRegistrar.DELETE_URN));
     assertThat(builderInstancesMap, 
IsMapContaining.hasKey(SpannerTransformRegistrar.READ_URN));
+    assertThat(
+        builderInstancesMap,
+        
IsMapContaining.hasKey(SpannerTransformRegistrar.READ_CHANGE_STREAM_URN));
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -207,4 +218,136 @@ public class SpannerTransformRegistrarTest {
     configuration.setMaxCumulativeBackoff(100L);
     return configuration;
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void 
testChangeStreamReaderBuilderBuildExternalWithMissingMandatoryFields() {
+    changeStreamReaderBuilder.buildExternal(new 
ChangeStreamReaderBuilder.Configuration());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void 
testChangeStreamReaderBuilderBuildExternalWithMissingDatabaseId() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+    changeStreamReaderBuilder.buildExternal(configuration);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void 
testChangeStreamReaderBuilderBuildExternalWithMissingInstanceId() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+    changeStreamReaderBuilder.buildExternal(configuration);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void 
testChangeStreamReaderBuilderBuildExternalWithMissingChangeStreamName() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+    changeStreamReaderBuilder.buildExternal(configuration);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void 
testChangeStreamReaderBuilderBuildExternalWithMissingMetadataInstance() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+    changeStreamReaderBuilder.buildExternal(configuration);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void 
testChangeStreamReaderBuilderBuildExternalWithMissingMetadataDatabase() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    changeStreamReaderBuilder.buildExternal(configuration);
+  }
+
+  @Test
+  public void testChangeStreamReaderBuilderBuildExternalWithRequiredFields() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+
+    PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
+        changeStreamReaderBuilder.buildExternal(configuration);
+    assertNotNull(changeStreamReaderTransform);
+  }
+
+  @Test
+  public void testChangeStreamReaderBuilderBuildExternalWithAllFields() {
+    String startAt = "2023-01-01T00:00:00Z";
+    String endAt = "2023-01-02T00:00:00Z";
+    String metadataTable = "meta-table";
+    String rpcPriority = "HIGH";
+    String refreshRate = "PT30S";
+
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+    configuration.setInclusiveStartAt(startAt);
+    configuration.setInclusiveEndAt(endAt);
+    configuration.setMetadataTable(metadataTable);
+    configuration.setRpcPriority(rpcPriority);
+    configuration.setWatermarkRefreshRate(refreshRate);
+
+    PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
+        changeStreamReaderBuilder.buildExternal(configuration);
+    assertNotNull(changeStreamReaderTransform);
+  }
+
+  @Test
+  public void 
testChangeStreamReaderBuilderBuildExternalWithNullOptionalValues() {
+    ChangeStreamReaderBuilder.Configuration configuration =
+        new ChangeStreamReaderBuilder.Configuration();
+
+    configuration.setProjectId(SPANNER_PROJECT);
+    configuration.setDatabaseId(SPANNER_DATABASE);
+    configuration.setInstanceId(SPANNER_INSTANCE);
+    configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+    configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+    configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+    configuration.setInclusiveStartAt(null);
+    configuration.setInclusiveEndAt(null);
+    configuration.setMetadataTable(null);
+    configuration.setRpcPriority(null);
+    configuration.setWatermarkRefreshRate(null);
+
+    PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
+        changeStreamReaderBuilder.buildExternal(configuration);
+    assertNotNull(changeStreamReaderTransform);
+  }
 }
diff --git a/sdks/python/apache_beam/io/gcp/spanner.py 
b/sdks/python/apache_beam/io/gcp/spanner.py
index 9089d746fe1..03ad91069b9 100644
--- a/sdks/python/apache_beam/io/gcp/spanner.py
+++ b/sdks/python/apache_beam/io/gcp/spanner.py
@@ -145,6 +145,20 @@ class ReadFromSpannerSchema(NamedTuple):
   time_unit: Optional[str]
 
 
+class ReadChangeStreamFromSpannerSchema(NamedTuple):
+  instance_id: str
+  database_id: str
+  project_id: str
+  changeStreamName: str
+  inclusiveStartAt: str
+  inclusiveEndAt: Optional[str]
+  metadataDatabase: str
+  metadataInstance: str
+  metadataTable: Optional[str]
+  rpcPriority: Optional[str]
+  watermarkRefreshRate: Optional[str]
+
+
 class ReadFromSpanner(ExternalTransform):
   """
   A PTransform which reads from the specified Spanner instance's database.
@@ -659,5 +673,94 @@ class SpannerUpdate(ExternalTransform):
     )
 
 
+class ReadChangeStreamFromSpanner(ExternalTransform):
+  """
+  A PTransform to read Change Streams from Google Cloud Spanner.
+
+  The output of this transform is a PCollection of JSON strings,
+  where each string represents a com.google.cloud.spanner.DataChangeRecord.
+
+  Example:
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    p | 
+    "ReadFromSpannerChangeStream" >> beam_spanner.ReadChangeStreamFromSpanner( 
+        project_id="spanner-project-id", 
+        instance_id="spanner-instance-id", 
+        database_id="spanner-database-id", 
+        changeStreamName="spanner-change-stream", 
+        inclusiveStartAt="2025-05-20T10:00:00Z",
+        metadataDatabase="spanner-metadata-database",
+        metadataInstance="spanner-metadata-instance")
+
+  Experimental; no backwards compatibility guarantees.
+  """
+
+  URN = 'beam:transform:org.apache.beam:spanner_change_stream_reader:v1'
+
+  def __init__(
+      self,
+      project_id,
+      instance_id,
+      database_id,
+      changeStreamName,
+      metadataDatabase,
+      metadataInstance,
+      inclusiveStartAt,
+      inclusiveEndAt=None,
+      metadataTable=None,
+      rpcPriority=None,
+      watermarkRefreshRate=None,
+      expansion_service=None,
+  ):
+    """
+        Reads Change Streams from Google Cloud Spanner.
+        
+        :param project_id: (Required) Specifies the Cloud Spanner project.
+        :param instance_id: (Required) Specifies the Cloud Spanner 
+          instance.
+        :param database_id: (Required) Specifies the Cloud Spanner 
+          database.
+        :param changeStreamName: (Required) The name of the Spanner 
+          change stream to read.
+        :param metadataDatabase: (Required) The database where the 
+          change stream metadata is stored.
+        :param metadataInstance: (Required) The instance where the 
+          change stream metadata database resides.
+        :param inclusiveStartAt: (Required) An inclusive start timestamp 
+          for reading the change stream.
+        :param inclusiveEndAt: (Optional) An inclusive end timestamp for 
+          reading the change stream. If not specified, the stream will be 
+          read indefinitely.
+        :param metadataTable: (Optional) The name of the metadata table used 
+          by the change stream connector. If not specified, a default table 
+          name will be used.
+        :param rpcPriority: (Optional) The RPC priority for Spanner 
operations. 
+          Can be 'HIGH', 'MEDIUM', or 'LOW'.
+        :param watermarkRefreshRate: (Optional) The duration at which the 
+          watermark is refreshed.
+        """
+
+    super().__init__(
+        self.URN,
+        NamedTupleBasedPayloadBuilder(
+            ReadChangeStreamFromSpannerSchema(
+                instance_id=instance_id,
+                database_id=database_id,
+                project_id=project_id,
+                changeStreamName=changeStreamName,
+                inclusiveStartAt=inclusiveStartAt,
+                inclusiveEndAt=inclusiveEndAt,
+                metadataDatabase=metadataDatabase,
+                metadataInstance=metadataInstance,
+                metadataTable=metadataTable,
+                rpcPriority=rpcPriority,
+                watermarkRefreshRate=watermarkRefreshRate,
+            ),
+        ),
+        expansion_service=expansion_service or default_io_expansion_service(),
+    )
+
+
 def _get_enum_name(enum):
   return None if enum is None else enum.name

Reply via email to