Polber commented on code in PR #31987:
URL: https://github.com/apache/beam/pull/31987#discussion_r1691868652
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+ private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+ PCollection<Struct> spannerRows = null;
+
+ if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withQuery(configuration.getQuery())
+ );
+ }
+ else {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withTable(configuration.getTableId())
+ .withColumns(configuration.getColumns())
+ );
+ }
Review Comment:
the `Configuration.validate()` method already does checks to ensure the correct combination of parameters is specified, so you can just chain the specified ones together and reduce code duplication,
i.e.
```
SpannerIO.Read read = SpannerIO.readWithSchema();
if (!Strings.isNullOrEmpty(configuration.getProjectId())) {
  read = read.withProjectId(configuration.getProjectId());
}
if (!Strings.isNullOrEmpty(configuration.getInstanceId())) {
  read = read.withInstanceId(configuration.getInstanceId());
}
...
PCollection<Struct> spannerRows = input.getPipeline().apply(read);
```
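A sketch of how the query-vs-table tail of that chain could then look under the same guarded pattern (this assumes, per the comment above, that `validate()` guarantees exactly one of query or table is set; all method names come from the quoted diff):
```
// Only the shared prefix of the chain is built above; the two read modes
// diverge here. validate() is assumed to enforce query XOR table.
if (!Strings.isNullOrEmpty(configuration.getQuery())) {
  read = read.withQuery(configuration.getQuery());
} else {
  read = read.withTable(configuration.getTableId())
             .withColumns(configuration.getColumns());
}
PCollection<Struct> spannerRows = input.getPipeline().apply(read);
```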
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+ private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+ PCollection<Struct> spannerRows = null;
+
+ if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withQuery(configuration.getQuery())
+ );
+ }
+ else {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withTable(configuration.getTableId())
+ .withColumns(configuration.getColumns())
+ );
+ }
+
+ // Hardcoded for testing
+ /*
+ Schema schema = Schema.builder()
+ .addField("id_column", Schema.FieldType.INT64)
+ .addField("name_column", Schema.FieldType.STRING)
+ .build();
+ */
+ // Implement when getSchema() is available
Review Comment:
Please remove
```suggestion
```
##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -257,3 +257,29 @@
'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
+
+- type: renaming
+ transforms:
+ 'ReadFromSpanner': 'ReadFromSpanner'
+ 'WriteToSpanner': 'WriteToSpanner'
+ config:
+ mappings:
+ 'ReadFromSpanner':
+ project_id: 'project_id'
+ instance_id: 'instance_id'
+ database_id: 'database_id'
+ table_id: 'table_id'
+ query: 'query'
+ columns: 'columns'
+ 'WriteToSpanner':
+ instance_id: 'instance_id'
+ database_id: 'database_id'
+ table_id: 'table_id'
Review Comment:
I would strip the `_id` suffix from all of these parameters on the YAML side, i.e.
```suggestion
project: 'project_id'
instance: 'instance_id'
database: 'database_id'
table: 'table_id'
query: 'query'
columns: 'columns'
'WriteToSpanner':
instance: 'instance_id'
database: 'database_id'
table: 'table_id'
```
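(This only changes the user-facing YAML keys; the right-hand side of each mapping still points at the Java configuration's `_id` field names, so no Java-side change should be needed.)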
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -167,36 +198,77 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
@Override
  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
      outputCollectionNames() {
- return Arrays.asList("failures", "errors");
+ return Collections.singletonList("post-write");
}
@AutoValue
@DefaultSchema(AutoValueSchema.class)
  public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
+ @SchemaFieldDescription("Specifies the GCP project.")
+ @Nullable
+ public abstract String getProjectId();
+
@SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+ @Nullable
public abstract String getInstanceId();
@SchemaFieldDescription("Specifies the Cloud Spanner database.")
+ @Nullable
public abstract String getDatabaseId();
@SchemaFieldDescription("Specifies the Cloud Spanner table.")
+ @Nullable
public abstract String getTableId();
Review Comment:
I would remove the `@Nullable` tags if these are all required. I'm not sure if project is required, but maybe check on that one too.
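i.e., a rough sketch, keeping `@Nullable` only on `getProjectId()` until its requiredness is confirmed (whether SpannerIO falls back to a default project when it is unset is an assumption to verify):
```
@SchemaFieldDescription("Specifies the GCP project.")
@Nullable // assumption: may fall back to the default GCP project when unset
public abstract String getProjectId();

@SchemaFieldDescription("Specifies the Cloud Spanner instance.")
public abstract String getInstanceId();

@SchemaFieldDescription("Specifies the Cloud Spanner database.")
public abstract String getDatabaseId();

@SchemaFieldDescription("Specifies the Cloud Spanner table.")
public abstract String getTableId();
```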
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -112,44 +139,48 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
Objects.requireNonNull(row))) {}))
.apply(
SpannerIO.write()
+ .withProjectId(configuration.getProjectId())
.withDatabaseId(configuration.getDatabaseId())
.withInstanceId(configuration.getInstanceId())
- .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+ .withFailureMode(failureMode));
Review Comment:
nit: can inline from above
```suggestion
                .withFailureMode(handleErrors ? FailureMode.REPORT_FAILURES : FailureMode.FAIL_FAST));
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java:
##########
@@ -351,4 +355,63 @@ private static void addIterableToMutationBuilder(
beamIterableType.getTypeName()));
}
}
+ public static Row createRowFromMutation(Schema schema, Mutation mutation) {
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ Iterable<Value> values = mutation.getValues();
+ Iterator<Value> valuesItr = values.iterator();
+
+ while (valuesItr.hasNext()) {
+ Value value = valuesItr.next();
+ rowBuilder.addValue(convertValueToBeamFieldType(value));
+ }
Review Comment:
I'm worried the ordering of the mutation fields iterator isn't guaranteed to be the same as the initial row that was written.
`Mutation` has a built-in method `asMap()` that returns a map `{columnName: value}` that you could loop over and use `rowBuilder.withFieldValue(column, value)` instead.
Something like
```
mutation.asMap().forEach(
    (column, value) -> rowBuilder.withFieldValue(column, getMutationValue(value)));
```
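A fuller sketch that avoids the ordering question entirely by walking the schema's own field order, reusing the `convertValueToBeamFieldType` helper from this diff (needs `java.util.Map` and `com.google.cloud.spanner.Value` imports):
```
public static Row createRowFromMutation(Schema schema, Mutation mutation) {
  // asMap() gives columnName -> Value; looking values up by schema field name
  // keeps the Row's value order tied to the schema, not the mutation iterator.
  Map<String, Value> valuesByColumn = mutation.asMap();
  Row.Builder rowBuilder = Row.withSchema(schema);
  for (Schema.Field field : schema.getFields()) {
    rowBuilder.addValue(convertValueToBeamFieldType(valuesByColumn.get(field.getName())));
  }
  return rowBuilder.build();
}
```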
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -112,44 +139,48 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
Objects.requireNonNull(row))) {}))
.apply(
SpannerIO.write()
+ .withProjectId(configuration.getProjectId())
.withDatabaseId(configuration.getDatabaseId())
.withInstanceId(configuration.getInstanceId())
- .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+ .withFailureMode(failureMode));
+
+ PCollection<Row> postWrite =
+ result
+ .getFailedMutations()
+              .apply("post-write", ParDo.of(new NoOutputDoFn<MutationGroup>()))
+ .setRowSchema(Schema.of());
+
+ if (!handleErrors)
+ return PCollectionRowTuple.of("post-write", postWrite);
+
+ Schema inputSchema = input.get("input").getSchema();
Schema failureSchema =
- Schema.builder()
- .addStringField("operation")
- .addStringField("instanceId")
- .addStringField("databaseId")
- .addStringField("tableId")
- .addStringField("mutationData")
- .build();
+ Schema.of(
+ Field.of("error_message", FieldType.STRING),
+ Field.of("failed_row", FieldType.row(inputSchema)));
Review Comment:
```suggestion
ErrorHandling.errorSchema(inputSchema);
```
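(For context: `ErrorHandling.errorSchema` is expected to build the same `error_message`/`failed_row` pair constructed manually above, which is why the one-liner is equivalent; worth double-checking the field names and order match before swapping it in.)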
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+ private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+ PCollection<Struct> spannerRows = null;
+
+ if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withQuery(configuration.getQuery())
+ );
+ }
+ else {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withTable(configuration.getTableId())
+ .withColumns(configuration.getColumns())
+ );
+ }
+
+ // Hardcoded for testing
+ /*
+ Schema schema = Schema.builder()
+ .addField("id_column", Schema.FieldType.INT64)
+ .addField("name_column", Schema.FieldType.STRING)
+ .build();
+ */
+ // Implement when getSchema() is available
+ Schema schema = spannerRows.getSchema();
+      PCollection<Row> rows = spannerRows.apply(MapElements.into(TypeDescriptor.of(Row.class))
+          .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema)));
+
+ return PCollectionRowTuple.of("output", rows.setRowSchema(schema));
+ }
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:spanner_read:v1";
+ }
+
+ @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+ return Collections.singletonList("output");
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+  public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable {
+ @AutoValue.Builder
+ @Nullable
+ public abstract static class Builder {
+ public abstract Builder setProjectId(String projectId);
+ public abstract Builder setInstanceId(String instanceId);
+ public abstract Builder setDatabaseId(String databaseId);
+ public abstract Builder setTableId(String tableId);
+ public abstract Builder setQuery(String query);
+ public abstract Builder setColumns(List<String> columns);
+ public abstract SpannerReadSchemaTransformConfiguration build();
+ }
+
+ public void validate() {
+      String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
+      if (!Strings.isNullOrEmpty(this.getQuery())) {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for SQL query.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for SQL query.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for SQL query.");
+      }
+      else {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for table read.");
+        checkNotNull(this.getTableId(), invalidConfigMessage + "Table name must be specified for table read.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for table read.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for table read.");
+        checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
+      }
Review Comment:
This logic can be simplified; for example, several of the checks are duplicated across the two branches.
Also add a check for mutual exclusivity between `query` and `tableId`.
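A rough sketch of what the consolidated method could look like (assuming `checkArgument` is available from the same vendored Guava `Preconditions` as `checkNotNull`):
```
public void validate() {
  String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
  // Required in both read modes, so check them once.
  checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified.");
  checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified.");
  checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified.");
  // Query and table reads are mutually exclusive.
  boolean hasQuery = !Strings.isNullOrEmpty(this.getQuery());
  boolean hasTable = !Strings.isNullOrEmpty(this.getTableId());
  checkArgument(
      hasQuery ^ hasTable,
      invalidConfigMessage + "Exactly one of a query or a table must be specified.");
  if (hasTable) {
    checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
  }
}
```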
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -167,36 +198,77 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
@Override
  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
      outputCollectionNames() {
- return Arrays.asList("failures", "errors");
+ return Collections.singletonList("post-write");
}
@AutoValue
@DefaultSchema(AutoValueSchema.class)
  public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
+ @SchemaFieldDescription("Specifies the GCP project.")
+ @Nullable
+ public abstract String getProjectId();
+
@SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+ @Nullable
public abstract String getInstanceId();
@SchemaFieldDescription("Specifies the Cloud Spanner database.")
+ @Nullable
public abstract String getDatabaseId();
@SchemaFieldDescription("Specifies the Cloud Spanner table.")
+ @Nullable
public abstract String getTableId();
+ @SchemaFieldDescription("Specifies how to handle errors.")
+ @Nullable
+ public abstract ErrorHandling getErrorHandling();
+
public static Builder builder() {
      return new AutoValue_SpannerWriteSchemaTransformProvider_SpannerWriteSchemaTransformConfiguration
          .Builder();
}
@AutoValue.Builder
public abstract static class Builder {
+ public abstract Builder setProjectId(String projectId);
+
public abstract Builder setInstanceId(String instanceId);
public abstract Builder setDatabaseId(String databaseId);
public abstract Builder setTableId(String tableId);
+ public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
public abstract SpannerWriteSchemaTransformConfiguration build();
}
+
+ public void validate() {
+ String invalidConfigMessage = "Invalid Spanner Write configuration: ";
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getProjectId()),
+          invalidConfigMessage + "Project ID for a Spanner Write must be specified.");
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getInstanceId()),
+          invalidConfigMessage + "Instance ID for a Spanner Write must be specified.");
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getDatabaseId()),
+          invalidConfigMessage + "Database ID for a Spanner Write must be specified.");
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getTableId()),
+          invalidConfigMessage + "Table ID for a Spanner Write must be specified.");
+
+      if (this.getErrorHandling() != null) {
+        checkArgument(
+            !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
+            invalidConfigMessage + "Output must not be empty if error handling specified.");
+      }
Review Comment:
I would remove this check. We already check for error output in the transform itself, and the YAML side checks that the `error_handling` tag has output.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -112,44 +139,48 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
Objects.requireNonNull(row))) {}))
.apply(
SpannerIO.write()
+ .withProjectId(configuration.getProjectId())
.withDatabaseId(configuration.getDatabaseId())
.withInstanceId(configuration.getInstanceId())
- .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+ .withFailureMode(failureMode));
+
+ PCollection<Row> postWrite =
+ result
+ .getFailedMutations()
+              .apply("post-write", ParDo.of(new NoOutputDoFn<MutationGroup>()))
+ .setRowSchema(Schema.of());
+
+ if (!handleErrors)
+ return PCollectionRowTuple.of("post-write", postWrite);
+
+ Schema inputSchema = input.get("input").getSchema();
Schema failureSchema =
- Schema.builder()
- .addStringField("operation")
- .addStringField("instanceId")
- .addStringField("databaseId")
- .addStringField("tableId")
- .addStringField("mutationData")
- .build();
+ Schema.of(
+ Field.of("error_message", FieldType.STRING),
+ Field.of("failed_row", FieldType.row(inputSchema)));
+
       PCollection<Row> failures =
           result
               .getFailedMutations()
               .apply(
                   FlatMapElements.into(TypeDescriptors.rows())
                       .via(
                           mtg ->
-                              Objects.requireNonNull(mtg).attached().stream()
+                              StreamSupport.stream(Objects.requireNonNull(mtg).spliterator(), false)
                                   .map(
                                       mutation ->
                                           Row.withSchema(failureSchema)
-                                              .addValue(mutation.getOperation().toString())
-                                              .addValue(configuration.getInstanceId())
-                                              .addValue(configuration.getDatabaseId())
-                                              .addValue(mutation.getTable())
-                                              // TODO(pabloem): Figure out how to represent
-                                              // mutation
-                                              // contents in DLQ
-                                              .addValue(Iterators.toString(mutation.getValues().iterator()))
-                                              .build())
-                                  .collect(Collectors.toList())))
+                                              .withFieldValue("error_message", String.format("%s operation failed at instance: %s, database: %s, table: %s",
+                                                  mutation.getOperation().toString(), configuration.getInstanceId(), configuration.getDatabaseId(), mutation.getTable()))
Review Comment:
nit: the `.toString()` should be implicit
```suggestion
                                                  mutation.getOperation(), configuration.getInstanceId(), configuration.getDatabaseId(), mutation.getTable()))
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+ private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+ PCollection<Struct> spannerRows = null;
+
+ if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withQuery(configuration.getQuery())
+ );
+ }
+ else {
+ spannerRows = input.getPipeline().apply(
+ SpannerIO.readWithSchema()
+ .withProjectId(configuration.getProjectId())
+ .withInstanceId(configuration.getInstanceId())
+ .withDatabaseId(configuration.getDatabaseId())
+ .withTable(configuration.getTableId())
+ .withColumns(configuration.getColumns())
+ );
+ }
+
+ // Hardcoded for testing
+ /*
+ Schema schema = Schema.builder()
+ .addField("id_column", Schema.FieldType.INT64)
+ .addField("name_column", Schema.FieldType.STRING)
+ .build();
+ */
+ // Implement when getSchema() is available
+ Schema schema = spannerRows.getSchema();
+      PCollection<Row> rows = spannerRows.apply(MapElements.into(TypeDescriptor.of(Row.class))
+          .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema)));
+
+ return PCollectionRowTuple.of("output", rows.setRowSchema(schema));
+ }
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:spanner_read:v1";
+ }
+
+ @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+ return Collections.singletonList("output");
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+  public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable {
+ @AutoValue.Builder
+ @Nullable
+ public abstract static class Builder {
+ public abstract Builder setProjectId(String projectId);
+ public abstract Builder setInstanceId(String instanceId);
+ public abstract Builder setDatabaseId(String databaseId);
+ public abstract Builder setTableId(String tableId);
+ public abstract Builder setQuery(String query);
+ public abstract Builder setColumns(List<String> columns);
+ public abstract SpannerReadSchemaTransformConfiguration build();
+ }
+
+ public void validate() {
+      String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
+      if (!Strings.isNullOrEmpty(this.getQuery())) {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for SQL query.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for SQL query.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for SQL query.");
+      }
+      else {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for table read.");
+        checkNotNull(this.getTableId(), invalidConfigMessage + "Table name must be specified for table read.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for table read.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for table read.");
+        checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
+      }
+ }
+
+ public static Builder builder() {
+      return new AutoValue_SpannerReadSchemaTransformProvider_SpannerReadSchemaTransformConfiguration
+          .Builder();
+ }
+ @SchemaFieldDescription("Specifies the GCP project ID.")
+ @Nullable
+ public abstract String getProjectId();
+
+ @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+ @Nullable
+ public abstract String getInstanceId();
+
+ @SchemaFieldDescription("Specifies the Cloud Spanner database.")
+ @Nullable
+ public abstract String getDatabaseId();
Review Comment:
I think both of these are required in all cases, so maybe remove the
`@Nullable` tags and any others that are required if I missed any
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -167,36 +198,77 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
@Override
  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
      outputCollectionNames() {
- return Arrays.asList("failures", "errors");
+ return Collections.singletonList("post-write");
Review Comment:
I don't think this method is relevant anymore, but just for clarity, let's add the "errors" tag to express that the output expects to have two tags
```suggestion
    return Arrays.asList("post-write", "errors");
```
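(Per the `expand` logic quoted earlier, the transform emits only `post-write` when error handling is off and presumably adds the error tag when it is on, so listing both tags here documents the full contract.)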