This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7c2b57eddc4 Adding GCP Spanner Change Stream support for Python
(#35453)
7c2b57eddc4 is described below
commit 7c2b57eddc4ebd61fbd7ae66f85e8def0f576176
Author: flpablo <[email protected]>
AuthorDate: Thu Aug 7 07:33:59 2025 -0600
Adding GCP Spanner Change Stream support for Python (#35453)
* Adding Spanner Change Stream support to python.
* Updating CHANGES.md
* Fixing python format
* Modify CHANGES.md
* Restarting Tests
* Retesting
---
CHANGES.md | 1 +
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 28 ++++
.../io/gcp/spanner/SpannerTransformRegistrar.java | 117 ++++++++++++++++-
.../gcp/spanner/SpannerTransformRegistrarTest.java | 145 ++++++++++++++++++++-
sdks/python/apache_beam/io/gcp/spanner.py | 103 +++++++++++++++
5 files changed, 391 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bffa326c9ff..45f49db7a34 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -180,6 +180,7 @@
* [Python] Prism runner now auto-enabled for some Python pipelines using the
direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
* Python: Added JupyterLab 4.x extension compatibility for enhanced notebook
integration ([#34495](https://github.com/apache/beam/pull/34495)).
+* [Python] Adding GCP Spanner Change Stream support for Python
(apache_beam.io.gcp.spanner). ([#24103]
https://github.com/apache/beam/issues/24103).
## Breaking Changes
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index d3b2632bad0..163b6d67bb2 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -52,6 +52,8 @@ import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -1015,6 +1017,32 @@ public class SpannerIO {
}
}
+ static class ChangeStreamRead extends PTransform<PBegin,
PCollection<String>> {
+
+ ReadChangeStream readChangeStream;
+
+ public ChangeStreamRead(ReadChangeStream readChangeStream) {
+ this.readChangeStream = readChangeStream;
+ }
+
+ @Override
+ public PCollection<String> expand(PBegin input) {
+ return input
+ .apply(readChangeStream)
+ .apply("DataChangeRecordToStringJSON", ParDo.of(new
DataChangeRecordToJsonFn()));
+ }
+ }
+
+ private static class DataChangeRecordToJsonFn extends DoFn<DataChangeRecord,
String> {
+ private static Gson gson = new
GsonBuilder().disableHtmlEscaping().create();
+
+ @ProcessElement
+ public void process(@Element DataChangeRecord input,
OutputReceiver<String> receiver) {
+ String modJsonString = gson.toJson(input, DataChangeRecord.class);
+ receiver.output(modJsonString);
+ }
+ }
+
/**
* A {@link PTransform} that create a transaction. If applied to a {@link
PCollection}, it will
* create a transaction after the {@link PCollection} is closed.
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
index 72b51beadb5..e7c2339f798 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
@@ -23,6 +23,7 @@ import static
com.google.cloud.spanner.TimestampBound.Mode.READ_TIMESTAMP;
import com.google.auto.service.AutoService;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.TimestampBound;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -43,8 +44,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
/**
- * Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an
external transform for
- * cross-language usage.
+ * Exposes {@link SpannerIO.WriteRows}, {@link SpannerIO.ReadRows} and {@link
+ * SpannerIO.ChangeStreamRead} as an external transform for cross-language
usage.
*/
@AutoService(ExternalTransformRegistrar.class)
public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
@@ -55,6 +56,8 @@ public class SpannerTransformRegistrar implements
ExternalTransformRegistrar {
"beam:transform:org.apache.beam:spanner_insert_or_update:v1";
public static final String DELETE_URN =
"beam:transform:org.apache.beam:spanner_delete:v1";
public static final String READ_URN =
"beam:transform:org.apache.beam:spanner_read:v1";
+ public static final String READ_CHANGE_STREAM_URN =
+ "beam:transform:org.apache.beam:spanner_change_stream_reader:v1";
@Override
@NonNull
@@ -66,6 +69,7 @@ public class SpannerTransformRegistrar implements
ExternalTransformRegistrar {
.put(INSERT_OR_UPDATE_URN, new InsertOrUpdateBuilder())
.put(DELETE_URN, new DeleteBuilder())
.put(READ_URN, new ReadBuilder())
+ .put(READ_CHANGE_STREAM_URN, new ChangeStreamReaderBuilder())
.build();
}
@@ -382,4 +386,113 @@ public class SpannerTransformRegistrar implements
ExternalTransformRegistrar {
return SpannerIO.WriteRows.of(writeTransform, operation,
configuration.table);
}
}
+
+ public static class ChangeStreamReaderBuilder
+ implements ExternalTransformBuilder<
+ ChangeStreamReaderBuilder.Configuration, PBegin,
PCollection<String>> {
+
+ public static class Configuration extends CrossLanguageConfiguration {
+ private String changeStreamName = "";
+ private String metadataDatabase = "";
+ private String metadataInstance = "";
+ private @Nullable Timestamp inclusiveStartAt;
+ private @Nullable Timestamp inclusiveEndAt;
+ private @Nullable String metadataTable;
+ private @Nullable RpcPriority rpcPriority;
+ private @Nullable Duration watermarkRefreshRate;
+
+ public void setChangeStreamName(String changeStreamName) {
+ this.changeStreamName = changeStreamName;
+ }
+
+ public void setInclusiveStartAt(@Nullable String inclusiveStartAtString)
{
+ if (inclusiveStartAtString != null) {
+ this.inclusiveStartAt =
Timestamp.parseTimestamp(inclusiveStartAtString);
+ }
+ }
+
+ public void setInclusiveEndAt(@Nullable String inclusiveEndAtString) {
+ if (inclusiveEndAtString != null) {
+ this.inclusiveEndAt = Timestamp.parseTimestamp(inclusiveEndAtString);
+ }
+ }
+
+ public void setMetadataDatabase(String metadataDatabase) {
+ this.metadataDatabase = metadataDatabase;
+ }
+
+ public void setMetadataInstance(String metadataInstance) {
+ this.metadataInstance = metadataInstance;
+ }
+
+ public void setMetadataTable(@Nullable String metadataTable) {
+ this.metadataTable = metadataTable;
+ }
+
+ public void setRpcPriority(@Nullable String rpcPriorityString) {
+ if (rpcPriorityString != null) {
+ this.rpcPriority = RpcPriority.valueOf(rpcPriorityString);
+ }
+ }
+
+ public void setWatermarkRefreshRate(@Nullable String
watermarkRefreshRateString) {
+ if (watermarkRefreshRateString != null) {
+ this.watermarkRefreshRate =
Duration.parse(watermarkRefreshRateString);
+ }
+ }
+ }
+
+ @Override
+ @NonNull
+ public PTransform<PBegin, PCollection<String>> buildExternal(
+ ChangeStreamReaderBuilder.Configuration configuration) {
+
+ configuration.checkMandatoryFields();
+
+ if (configuration.changeStreamName.isEmpty()) {
+ throw new IllegalArgumentException("ChangeStreamName can't be empty");
+ }
+
+ if (configuration.metadataInstance.isEmpty()) {
+ throw new IllegalArgumentException("MetadataInstance can't be empty");
+ }
+
+ if (configuration.metadataDatabase.isEmpty()) {
+ throw new IllegalArgumentException("MetadataDatabase can't be empty");
+ }
+
+ SpannerIO.ReadChangeStream readChangeStream =
+ SpannerIO.readChangeStream()
+ .withProjectId(configuration.projectId)
+ .withInstanceId(configuration.instanceId)
+ .withDatabaseId(configuration.databaseId)
+ .withChangeStreamName(configuration.changeStreamName)
+ .withMetadataInstance(configuration.metadataInstance)
+ .withMetadataDatabase(configuration.metadataDatabase);
+
+ if (configuration.inclusiveStartAt != null) {
+ readChangeStream =
readChangeStream.withInclusiveStartAt(configuration.inclusiveStartAt);
+ }
+
+ if (configuration.inclusiveEndAt != null) {
+ readChangeStream =
readChangeStream.withInclusiveEndAt(configuration.inclusiveEndAt);
+ }
+
+ if (configuration.metadataTable != null) {
+ readChangeStream =
readChangeStream.withMetadataTable(configuration.metadataTable);
+ }
+
+ if (configuration.rpcPriority != null) {
+
+ readChangeStream =
readChangeStream.withRpcPriority(configuration.rpcPriority);
+ }
+
+ if (configuration.watermarkRefreshRate != null) {
+ readChangeStream =
+
readChangeStream.withWatermarkRefreshRate(configuration.watermarkRefreshRate);
+ }
+
+ return new SpannerIO.ChangeStreamRead(readChangeStream);
+ }
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
index 3b38e7e528a..666cda91f73 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java
@@ -26,6 +26,7 @@ import com.google.cloud.spanner.TimestampBound;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import
org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ChangeStreamReaderBuilder;
import
org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.InsertBuilder;
import
org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ReadBuilder;
import org.apache.beam.sdk.schemas.Schema;
@@ -48,22 +49,29 @@ public class SpannerTransformRegistrarTest {
public static final String SPANNER_PROJECT = "spanner-project";
public static final String SPANNER_TABLE = "spanner-table";
public static final String SPANNER_SQL_QUERY = "SELECT * from
spanner_table;";
+ public static final String SPANNER_CHANGE_STREAM_NAME =
"spanner-change-stream-name";
+ public static final String SPANNER_CHANGE_STREAM_METADATA_INSTANCE =
+ "spanner-change-stream-instance";
+ public static final String SPANNER_CHANGE_STREAM_METADATA_DATABASE =
+ "spanner-change-stream-database";
private SpannerTransformRegistrar spannerTransformRegistrar;
private ReadBuilder readBuilder;
private InsertBuilder writeBuilder;
+ private ChangeStreamReaderBuilder changeStreamReaderBuilder;
@Before
public void setup() {
spannerTransformRegistrar = new SpannerTransformRegistrar();
readBuilder = new ReadBuilder();
writeBuilder = new InsertBuilder();
+ changeStreamReaderBuilder = new ChangeStreamReaderBuilder();
}
@Test
public void testKnownBuilderInstances() {
Map<String, ExternalTransformBuilder<?, ?, ?>> builderInstancesMap =
spannerTransformRegistrar.knownBuilderInstances();
- assertEquals(6, builderInstancesMap.size());
+ assertEquals(7, builderInstancesMap.size());
assertThat(builderInstancesMap,
IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_URN));
assertThat(builderInstancesMap,
IsMapContaining.hasKey(SpannerTransformRegistrar.UPDATE_URN));
assertThat(builderInstancesMap,
IsMapContaining.hasKey(SpannerTransformRegistrar.REPLACE_URN));
@@ -72,6 +80,9 @@ public class SpannerTransformRegistrarTest {
IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_OR_UPDATE_URN));
assertThat(builderInstancesMap,
IsMapContaining.hasKey(SpannerTransformRegistrar.DELETE_URN));
assertThat(builderInstancesMap,
IsMapContaining.hasKey(SpannerTransformRegistrar.READ_URN));
+ assertThat(
+ builderInstancesMap,
+
IsMapContaining.hasKey(SpannerTransformRegistrar.READ_CHANGE_STREAM_URN));
}
@Test(expected = IllegalArgumentException.class)
@@ -207,4 +218,136 @@ public class SpannerTransformRegistrarTest {
configuration.setMaxCumulativeBackoff(100L);
return configuration;
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
testChangeStreamReaderBuilderBuildExternalWithMissingMandatoryFields() {
+ changeStreamReaderBuilder.buildExternal(new
ChangeStreamReaderBuilder.Configuration());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
testChangeStreamReaderBuilderBuildExternalWithMissingDatabaseId() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+ changeStreamReaderBuilder.buildExternal(configuration);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
testChangeStreamReaderBuilderBuildExternalWithMissingInstanceId() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+ changeStreamReaderBuilder.buildExternal(configuration);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
testChangeStreamReaderBuilderBuildExternalWithMissingChangeStreamName() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+ changeStreamReaderBuilder.buildExternal(configuration);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
testChangeStreamReaderBuilderBuildExternalWithMissingMetadataInstance() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+ changeStreamReaderBuilder.buildExternal(configuration);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
testChangeStreamReaderBuilderBuildExternalWithMissingMetadataDatabase() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ changeStreamReaderBuilder.buildExternal(configuration);
+ }
+
+ @Test
+ public void testChangeStreamReaderBuilderBuildExternalWithRequiredFields() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+
+ PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
+ changeStreamReaderBuilder.buildExternal(configuration);
+ assertNotNull(changeStreamReaderTransform);
+ }
+
+ @Test
+ public void testChangeStreamReaderBuilderBuildExternalWithAllFields() {
+ String startAt = "2023-01-01T00:00:00Z";
+ String endAt = "2023-01-02T00:00:00Z";
+ String metadataTable = "meta-table";
+ String rpcPriority = "HIGH";
+ String refreshRate = "PT30S";
+
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+ configuration.setInclusiveStartAt(startAt);
+ configuration.setInclusiveEndAt(endAt);
+ configuration.setMetadataTable(metadataTable);
+ configuration.setRpcPriority(rpcPriority);
+ configuration.setWatermarkRefreshRate(refreshRate);
+
+ PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
+ changeStreamReaderBuilder.buildExternal(configuration);
+ assertNotNull(changeStreamReaderTransform);
+ }
+
+ @Test
+ public void
testChangeStreamReaderBuilderBuildExternalWithNullOptionalValues() {
+ ChangeStreamReaderBuilder.Configuration configuration =
+ new ChangeStreamReaderBuilder.Configuration();
+
+ configuration.setProjectId(SPANNER_PROJECT);
+ configuration.setDatabaseId(SPANNER_DATABASE);
+ configuration.setInstanceId(SPANNER_INSTANCE);
+ configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
+ configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
+ configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
+ configuration.setInclusiveStartAt(null);
+ configuration.setInclusiveEndAt(null);
+ configuration.setMetadataTable(null);
+ configuration.setRpcPriority(null);
+ configuration.setWatermarkRefreshRate(null);
+
+ PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
+ changeStreamReaderBuilder.buildExternal(configuration);
+ assertNotNull(changeStreamReaderTransform);
+ }
}
diff --git a/sdks/python/apache_beam/io/gcp/spanner.py
b/sdks/python/apache_beam/io/gcp/spanner.py
index 9089d746fe1..03ad91069b9 100644
--- a/sdks/python/apache_beam/io/gcp/spanner.py
+++ b/sdks/python/apache_beam/io/gcp/spanner.py
@@ -145,6 +145,20 @@ class ReadFromSpannerSchema(NamedTuple):
time_unit: Optional[str]
+class ReadChangeStreamFromSpannerSchema(NamedTuple):
+ instance_id: str
+ database_id: str
+ project_id: str
+ changeStreamName: str
+ inclusiveStartAt: str
+ inclusiveEndAt: Optional[str]
+ metadataDatabase: str
+ metadataInstance: str
+ metadataTable: Optional[str]
+ rpcPriority: Optional[str]
+ watermarkRefreshRate: Optional[str]
+
+
class ReadFromSpanner(ExternalTransform):
"""
A PTransform which reads from the specified Spanner instance's database.
@@ -659,5 +673,94 @@ class SpannerUpdate(ExternalTransform):
)
+class ReadChangeStreamFromSpanner(ExternalTransform):
+ """
+ A PTransform to read Change Streams from Google Cloud Spanner.
+
+ The output of this transform is a PCollection of JSON strings,
+ where each string represents a com.google.cloud.spanner.DataChangeRecord.
+
+ Example:
+
+ with beam.Pipeline(options=pipeline_options) as p:
+ p |
+ "ReadFromSpannerChangeStream" >> beam_spanner.ReadChangeStreamFromSpanner(
+ project_id="spanner-project-id",
+ instance_id="spanner-instance-id",
+ database_id="spanner-database-id",
+ changeStreamName="spanner-change-stream",
+ inclusiveStartAt="2025-05-20T10:00:00Z",
+ metadataDatabase="spanner-metadata-database",
+ metadataInstance="spanner-metadata-instance")
+
+ Experimental; no backwards compatibility guarantees.
+ """
+
+ URN = 'beam:transform:org.apache.beam:spanner_change_stream_reader:v1'
+
+ def __init__(
+ self,
+ project_id,
+ instance_id,
+ database_id,
+ changeStreamName,
+ metadataDatabase,
+ metadataInstance,
+ inclusiveStartAt,
+ inclusiveEndAt=None,
+ metadataTable=None,
+ rpcPriority=None,
+ watermarkRefreshRate=None,
+ expansion_service=None,
+ ):
+ """
+ Reads Change Streams from Google Cloud Spanner.
+
+ :param project_id: (Required) Specifies the Cloud Spanner project.
+ :param instance_id: (Required) Specifies the Cloud Spanner
+ instance.
+ :param database_id: (Required) Specifies the Cloud Spanner
+ database.
+ :param changeStreamName: (Required) The name of the Spanner
+ change stream to read.
+ :param metadataDatabase: (Required) The database where the
+ change stream metadata is stored.
+ :param metadataInstance: (Required) The instance where the
+ change stream metadata database resides.
+ :param inclusiveStartAt: (Required) An inclusive start timestamp
+ for reading the change stream.
+ :param inclusiveEndAt: (Optional) An inclusive end timestamp for
+ reading the change stream. If not specified, the stream will be
+ read indefinitely.
+ :param metadataTable: (Optional) The name of the metadata table used
+ by the change stream connector. If not specified, a default table
+ name will be used.
+ :param rpcPriority: (Optional) The RPC priority for Spanner
operations.
+ Can be 'HIGH', 'MEDIUM', or 'LOW'.
+ :param watermarkRefreshRate: (Optional) The duration at which the
+ watermark is refreshed.
+ """
+
+ super().__init__(
+ self.URN,
+ NamedTupleBasedPayloadBuilder(
+ ReadChangeStreamFromSpannerSchema(
+ instance_id=instance_id,
+ database_id=database_id,
+ project_id=project_id,
+ changeStreamName=changeStreamName,
+ inclusiveStartAt=inclusiveStartAt,
+ inclusiveEndAt=inclusiveEndAt,
+ metadataDatabase=metadataDatabase,
+ metadataInstance=metadataInstance,
+ metadataTable=metadataTable,
+ rpcPriority=rpcPriority,
+ watermarkRefreshRate=watermarkRefreshRate,
+ ),
+ ),
+ expansion_service=expansion_service or default_io_expansion_service(),
+ )
+
+
def _get_enum_name(enum):
return None if enum is None else enum.name