gemini-code-assist[bot] commented on code in PR #35794:
URL: https://github.com/apache/beam/pull/35794#discussion_r2258046488
##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -375,23 +375,47 @@
#BigTable
- type: renaming
transforms:
- #'ReadFromBigTable': 'ReadFromBigTable'
+ 'ReadFromBigTable': 'ReadFromBigTable'
'WriteToBigTable': 'WriteToBigTable'
config:
mappings:
#Temp removing read from bigTable IO
-# 'ReadFromBigTable':
-# project: 'project_id'
-# instance: 'instance_id'
-# table: 'table_id'
+ 'ReadFromBigTable':
+ project: 'project_id'
+ instance: 'instance_id'
+ table: 'table_id'
+ flatten: "flatten"
'WriteToBigTable':
project: 'project_id'
instance: 'instance_id'
table: 'table_id'
underlying_provider:
type: beamJar
transforms:
-# 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:bigtable_read:v1'
+ 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:bigtable_read:v1'
 'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
config:
gradle_target:
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+#MongoDB
+- type: renaming
+ transforms:
+ 'ReadFromMongoDB': 'ReadFromMongoDB'
+ 'WriteToMongoDB': 'WriteToMongoDB'
+ config:
+ mappings:
+ 'ReadFromMongoDB':
+ connection_uri: "connection_uri"
+ database: "database"
+ collection: "collection"
+ 'WriteToMongoDB':
+ connection_uri: "connection_uri"
+ database: "database"
+ collection: "collection"
+ underlying_provider:
+ type: beamJar
+ transforms:
+ 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:mongodb_read:v1'
+ 'WriteToBigTable': 'beam:schematransform:org.apache.beam:mongodb_write:v1'
Review Comment:

There seems to be a copy-paste error here. The transform keys for MongoDB
are incorrectly named `ReadFromBigTable` and `WriteToBigTable`. They should be
`ReadFromMongoDB` and `WriteToMongoDB` to match the transform names defined in
the `transforms` section.
```
'ReadFromMongoDB': 'beam:schematransform:org.apache.beam:mongodb_read:v1'
'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1'
```
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+ MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {
+
+ private static final String INPUT_TAG = "input";
+
+ @Override
+ protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) {
+ return new MongoDbWriteSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:mongodb_write:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /** Configuration class for the MongoDB Write transform. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable {
+
+ @SchemaFieldDescription("The connection URI for the MongoDB server.")
+ public abstract String getUri();
+
+ @SchemaFieldDescription("The MongoDB database to write to.")
+ public abstract String getDatabase();
+
+ @SchemaFieldDescription("The MongoDB collection to write to.")
+ public abstract String getCollection();
+
+ // @SchemaFieldDescription("The number of documents to include in each batch write.")
+ // @Nullable
+ // public abstract Long getBatchSize();
+ //
+ // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
+ // @Nullable
+ // public abstract Boolean getOrdered();
+
+ public void validate() {
+ checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+ checkArgument(
+ getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+ checkArgument(
+ getCollection() != null && !getCollection().isEmpty(),
+ "MongoDB collection must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
+ .Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setUri(String uri);
+
+ public abstract Builder setDatabase(String database);
+
+ public abstract Builder setCollection(String collection);
+
+ public abstract Builder setBatchSize(Long batchSize);
+
+ public abstract Builder setOrdered(Boolean ordered);
Review Comment:

The builder declares `setBatchSize` and `setOrdered`, but the corresponding getters in
`MongoDbWriteSchemaTransformConfiguration` and their usage in `MongoDbWriteSchemaTransform` are
commented out. Beyond being inconsistent, AutoValue rejects builder setters that have no matching
abstract property, so this will not compile. If these options are not yet supported, remove the two
setters from the builder; otherwise, restore the getters and wire them through, as sketched below.
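
A rough sketch of that wiring, based on the commented-out getters already in this file (a sketch only, not a drop-in change; `@Nullable` is assumed to be `org.checkerframework.checker.nullness.qual.Nullable`):
```java
// Restored optional properties so the existing builder setters have matching getters.
@SchemaFieldDescription("The number of documents to include in each batch write.")
@Nullable
public abstract Long getBatchSize();

@SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
@Nullable
public abstract Boolean getOrdered();
```
The commented-out `withBatchSize`/`withOrdered` blocks in `expand` could then be re-enabled as written.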
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbReadSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+ MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> {
+
+ private static final String OUTPUT_TAG = "output";
+
+ @Override
+ protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() {
+ return MongoDbReadSchemaTransformConfiguration.class;
+ }
+
+ @Override
+ protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) {
+ return new MongoDbReadSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ // Return a unique URN for the transform.
+ return "beam:schematransform:org.apache.beam:mongodb_read:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ // A read transform does not have an input PCollection.
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> outputCollectionNames() {
+ // The primary output is a PCollection of Rows.
+ // Error handling could be added later with a second "errors" output tag.
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /** Configuration class for the MongoDB Read transform. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable {
+
+ @SchemaFieldDescription("The connection URI for the MongoDB server.")
+ public abstract String getUri();
+
+ @SchemaFieldDescription("The MongoDB database to read from.")
+ public abstract String getDatabase();
+
+ @SchemaFieldDescription("The MongoDB collection to read from.")
+ public abstract String getCollection();
+
+ @SchemaFieldDescription(
+ "An optional BSON filter to apply to the read. This should be a valid JSON string.")
+ @Nullable
+ public abstract String getFilter();
+
+ public void validate() {
+ checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+ checkArgument(
+ getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+ checkArgument(
+ getCollection() != null && !getCollection().isEmpty(),
+ "MongoDB collection must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration
+ .Builder();
+ }
+
+ /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setUri(String uri);
+
+ public abstract Builder setDatabase(String database);
+
+ public abstract Builder setCollection(String collection);
+
+ public abstract Builder setFilter(String filter);
+
+ public abstract MongoDbReadSchemaTransformConfiguration build();
+ }
+ }
+
+ /** The {@link SchemaTransform} that performs the read operation. */
+ private static class MongoDbReadSchemaTransform extends SchemaTransform {
+ private final MongoDbReadSchemaTransformConfiguration configuration;
+
+ MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ // A read transform does not have an input, so we start with the pipeline.
+ PCollection<Document> mongoDocs =
+ input
+ .getPipeline()
+ .apply(
+ "ReadFromMongoDb",
+ MongoDbIO.read()
+ .withUri(configuration.getUri())
+ .withDatabase(configuration.getDatabase())
+ .withCollection(configuration.getCollection()));
+ // TODO: Add support for .withFilter() if it exists in your MongoDbIO,
+ // using configuration.getFilter().
+
+ // Convert the BSON Document objects into Beam Row objects.
+ PCollection<Row> beamRows =
+ mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn()));
+
+ return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
+ }
+ }
+
+ /**
+ * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}.
+ *
+ * <p>This is a critical step to ensure data is in a schema-aware format.
+ */
+ private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
+ // TODO: Define the Beam Schema that corresponds to your MongoDB documents.
+ // This could be made dynamic based on an inferred schema or a user-provided schema.
+ // For this skeleton, we assume a static schema.
+ // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build();
+
+ @ProcessElement
+ public void processElement(@Element Document doc, OutputReceiver<Row> out) {
+ // Here you will convert the BSON document to a Beam Row.
+ // This requires you to know the target schema.
+
+ // Example pseudo-code:
+ // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA);
+ // for (Map.Entry<String, Object> entry : doc.entrySet()) {
+ // rowBuilder.addValue(entry.getValue());
+ // }
+ // out.output(rowBuilder.build());
+
+ // For a robust implementation, you would handle data type conversions
+ // between BSON types and Beam schema types.
+ throw new UnsupportedOperationException(
+ "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows.");
+ }
Review Comment:

The `MongoDocumentToRowFn` is not implemented and throws an
`UnsupportedOperationException`. While this is a draft PR, this makes the
transform unusable. This `DoFn` is critical for converting MongoDB documents to
Beam Rows and needs to be implemented.
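
Not blocking for a draft, but as a reference point, one minimal way to make the transform functional is to emit each document as a single JSON string field. The `OUTPUT_SCHEMA` name and the single-field schema below are illustrative assumptions, not something defined in this PR (an import of `org.apache.beam.sdk.schemas.Schema` would also be needed):
```java
// Minimal sketch: represent each MongoDB document as a Row with one JSON string field.
// A full implementation would map BSON types onto a real Beam schema instead.
private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
  static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("json").build();

  @ProcessElement
  public void processElement(@Element Document doc, OutputReceiver<Row> out) {
    // Document#toJson() serializes the BSON document to an extended-JSON string.
    out.output(Row.withSchema(OUTPUT_SCHEMA).addValue(doc.toJson()).build());
  }
}
```
If you go this route, `expand` would also need `.setRowSchema(MongoDocumentToRowFn.OUTPUT_SCHEMA)` on the converted PCollection so downstream transforms see the schema.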
##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -675,11 +676,15 @@ def __hash__(self):
return hash(self.__dict__.items())
def __eq__(self, other):
+ if type(self) == type(other):
+ other_dict = other.__dict__
+ elif type(other) == type(NamedTuple):
+ other_dict = other._asdict()
Review Comment:

The check `type(other) == type(NamedTuple)` is incorrect for identifying an
instance of a `NamedTuple`. `NamedTuple` is a factory function, not a class
type that instances will have. An instance of a `NamedTuple` is a subclass of
`tuple`. A better way to check if `other` is a `NamedTuple`-like object is to
check if it's a tuple and has the `_asdict` method.
```suggestion
elif isinstance(other, tuple) and hasattr(other, '_asdict'):
other_dict = other._asdict()
```
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+ MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {
+
+ private static final String INPUT_TAG = "input";
+
+ @Override
+ protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) {
+ return new MongoDbWriteSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:mongodb_write:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /** Configuration class for the MongoDB Write transform. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable {
+
+ @SchemaFieldDescription("The connection URI for the MongoDB server.")
+ public abstract String getUri();
+
+ @SchemaFieldDescription("The MongoDB database to write to.")
+ public abstract String getDatabase();
+
+ @SchemaFieldDescription("The MongoDB collection to write to.")
+ public abstract String getCollection();
+
+ // @SchemaFieldDescription("The number of documents to include in each batch write.")
+ // @Nullable
+ // public abstract Long getBatchSize();
+ //
+ // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
+ // @Nullable
+ // public abstract Boolean getOrdered();
+
+ public void validate() {
+ checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+ checkArgument(
+ getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+ checkArgument(
+ getCollection() != null && !getCollection().isEmpty(),
+ "MongoDB collection must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
+ .Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setUri(String uri);
+
+ public abstract Builder setDatabase(String database);
+
+ public abstract Builder setCollection(String collection);
+
+ public abstract Builder setBatchSize(Long batchSize);
+
+ public abstract Builder setOrdered(Boolean ordered);
+
+ public abstract MongoDbWriteSchemaTransformConfiguration build();
+ }
+ }
+
+ /** The {@link SchemaTransform} that performs the write operation. */
+ private static class MongoDbWriteSchemaTransform extends SchemaTransform {
+ private final MongoDbWriteSchemaTransformConfiguration configuration;
+
+ MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> rows = input.get(INPUT_TAG);
+
+ PCollection<Document> documents =
+ rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn()));
+
+ MongoDbIO.Write write =
+ MongoDbIO.write()
+ .withUri(configuration.getUri())
+ .withDatabase(configuration.getDatabase())
+ .withCollection(configuration.getCollection());
+
+ // if (configuration.getBatchSize() != null) {
+ // write = write.withBatchSize(configuration.getBatchSize());
+ // }
+ // if (configuration.getOrdered() != null) {
+ // write = write.withOrdered(configuration.getOrdered());
+ // }
+
+ documents.apply("WriteToMongo", write);
+
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+ }
+
+ /** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link Document}. */
+ private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
+ @ProcessElement
+ public void processElement(@Element Row row, OutputReceiver<Document> out) {
+ Document doc = new Document();
+ for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
+ String fieldName = row.getSchema().getField(i).getName();
+ Object value = row.getValue(i);
+
+ if (value != null) {
+ doc.append(fieldName, value);
+ }
+ }
+ out.output(doc);
+ }
+ }
Review Comment:

The `RowToMongoDocumentFn` class appears to be unused. The `expand` method
uses `RowToBsonDocumentFn` for the conversion. This class should be removed to
avoid dead code.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +166,97 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.withInstanceId(configuration.getInstanceId())
.withProjectId(configuration.getProjectId()));
+ Schema outputSchema =
+ Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA;
+
PCollection<Row> beamRows =
- bigtableRows.apply(MapElements.via(new BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+ bigtableRows
+ .apply("ConvertToBeamRows", ParDo.of(new BigtableRowConverterDoFn(configuration)))
+ .setRowSchema(outputSchema);
return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
}
}
- public static class BigtableRowToBeamRow extends SimpleFunction<com.google.bigtable.v2.Row, Row> {
- @Override
- public Row apply(com.google.bigtable.v2.Row bigtableRow) {
- // The collection of families is represented as a Map of column families.
- // Each column family is represented as a Map of columns.
- // Each column is represented as a List of cells
- // Each cell is represented as a Beam Row consisting of value and timestamp_micros
- Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
- for (Family fam : bigtableRow.getFamiliesList()) {
- // Map of column qualifier to list of cells
- Map<String, List<Row>> columns = new HashMap<>();
- for (Column col : fam.getColumnsList()) {
- List<Row> cells = new ArrayList<>();
- for (Cell cell : col.getCellsList()) {
- Row cellRow =
- Row.withSchema(CELL_SCHEMA)
- .withFieldValue("value", ByteBuffer.wrap(cell.getValue().toByteArray()))
- .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+ /**
+ * A {@link DoFn} that converts a Bigtable {@link com.google.bigtable.v2.Row} to a Beam {@link
+ * Row}. It supports both a nested representation and a flattened representation where each column
+ * becomes a separate output element.
+ */
+ private static class BigtableRowConverterDoFn extends DoFn<com.google.bigtable.v2.Row, Row> {
+ private final BigtableReadSchemaTransformConfiguration configuration;
+
+ BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ private List<Row> convertCells(List<Cell> bigtableCells) {
+ List<Row> beamCells = new ArrayList<>();
+ for (Cell cell : bigtableCells) {
+ Row cellRow =
+ Row.withSchema(CELL_SCHEMA)
+ .withFieldValue("value", cell.getValue().toByteArray())
+ .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+ .build();
+ beamCells.add(cellRow);
+ }
+ return beamCells;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> out) {
+ // The builder defaults flatten to true. We check for an explicit false setting to disable it.
+
+ if (Boolean.FALSE.equals(configuration.getFlatten())) {
+ // Non-flattening logic (original behavior): one output row per Bigtable row.
+ Map<String, Map<ByteString, List<Row>>> families = new HashMap<>();
+ for (Family fam : bigtableRow.getFamiliesList()) {
+ Map<ByteString, List<Row>> columns = new HashMap<>();
+ for (Column col : fam.getColumnsList()) {
+
+ List<Cell> bigTableCells = col.getCellsList();
+
+ List<Row> cells = convertCells(bigTableCells);
+
+ columns.put(col.getQualifier(), cells);
+ }
+ families.put(fam.getName(), columns);
+ }
+ Row beamRow =
+ Row.withSchema(ROW_SCHEMA)
+ .withFieldValue("key", bigtableRow.getKey().toByteArray())
+ .withFieldValue("column_families", families)
+ .build();
+ out.output(beamRow);
+ } else {
+ // Flattening logic (new behavior): one output row per column qualifier.
+ byte[] key = bigtableRow.getKey().toByteArray();
+ for (Family fam : bigtableRow.getFamiliesList()) {
+ String familyName = fam.getName();
+ for (Column col : fam.getColumnsList()) {
+ ByteString qualifierName = col.getQualifier();
+ List<Row> cells = new ArrayList<>();
+ for (Cell cell : col.getCellsList()) {
+ Row cellRow =
+ Row.withSchema(CELL_SCHEMA)
+ .withFieldValue("value", cell.getValue().toByteArray())
+ .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+ .build();
+ cells.add(cellRow);
+ }
Review Comment:

There is duplicated logic for converting Bigtable cells to Beam Rows. The
`else` block for flattened output re-implements the logic already present in
the `convertCells` helper method. To improve maintainability and reduce code
duplication, you should call the existing `convertCells` method here.
```suggestion
List<Row> cells = convertCells(col.getCellsList());
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,194 @@ public void tearDown() {
tableAdminClient.close();
}
- public List<Row> writeToTable(int numRows) {
+ @Test
+ public void testRead() {
+ int numRows = 20;
List<Row> expectedRows = new ArrayList<>();
+ for (int i = 1; i <= numRows; i++) {
+ String key = "key" + i;
+ byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ String valueA = "value a" + i;
+ byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+ String valueB = "value b" + i;
+ byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+ String valueC = "value c" + i;
+ byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+ String valueD = "value d" + i;
+ byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+ long timestamp = 1000L * i;
- try {
- for (int i = 1; i <= numRows; i++) {
- String key = "key" + i;
- String valueA = "value a" + i;
- String valueB = "value b" + i;
- String valueC = "value c" + i;
- String valueD = "value d" + i;
- long timestamp = 1000L * i;
-
- RowMutation rowMutation =
- RowMutation.create(tableId, key)
- .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
- .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
- .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
- .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
- dataClient.mutateRow(rowMutation);
-
- // Set up expected Beam Row
- Map<String, List<Row>> columns1 = new HashMap<>();
- columns1.put(
- "a",
- Arrays.asList(
- Row.withSchema(CELL_SCHEMA)
- .withFieldValue(
- "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
- .withFieldValue("timestamp_micros", timestamp)
- .build()));
- columns1.put(
- "b",
- Arrays.asList(
- Row.withSchema(CELL_SCHEMA)
- .withFieldValue(
- "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
- .withFieldValue("timestamp_micros", timestamp)
- .build()));
-
- Map<String, List<Row>> columns2 = new HashMap<>();
- columns2.put(
- "c",
- Arrays.asList(
- Row.withSchema(CELL_SCHEMA)
- .withFieldValue(
- "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
- .withFieldValue("timestamp_micros", timestamp)
- .build()));
- columns2.put(
- "d",
- Arrays.asList(
- Row.withSchema(CELL_SCHEMA)
- .withFieldValue(
- "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
- .withFieldValue("timestamp_micros", timestamp)
- .build()));
-
- Map<String, Map<String, List<Row>>> families = new HashMap<>();
- families.put(COLUMN_FAMILY_NAME_1, columns1);
- families.put(COLUMN_FAMILY_NAME_2, columns2);
-
- Row expectedRow =
- Row.withSchema(ROW_SCHEMA)
- .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
- .withFieldValue("column_families", families)
- .build();
-
- expectedRows.add(expectedRow);
- }
- LOG.info("Finished writing {} rows to table {}", numRows, tableId);
- } catch (NotFoundException e) {
- throw new RuntimeException("Failed to write to table", e);
+ RowMutation rowMutation =
+ RowMutation.create(tableId, key)
+ .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+ .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+ .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+ .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+ dataClient.mutateRow(rowMutation);
+
+ // Set up expected Beam Row
+ Map<String, List<Row>> columns1 = new HashMap<>();
+ columns1.put(
+ "a",
+ Arrays.asList(
+ Row.withSchema(CELL_SCHEMA)
+ .withFieldValue("value", valueABytes)
+ .withFieldValue("timestamp_micros", timestamp)
+ .build()));
+ columns1.put(
+ "b",
+ Arrays.asList(
+ Row.withSchema(CELL_SCHEMA)
+ .withFieldValue("value", valueBBytes)
+ .withFieldValue("timestamp_micros", timestamp)
+ .build()));
+
+ Map<String, List<Row>> columns2 = new HashMap<>();
+ columns2.put(
+ "c",
+ Arrays.asList(
+ Row.withSchema(CELL_SCHEMA)
+ .withFieldValue("value", valueCBytes)
+ .withFieldValue("timestamp_micros", timestamp)
+ .build()));
+ columns2.put(
+ "d",
+ Arrays.asList(
+ Row.withSchema(CELL_SCHEMA)
+ .withFieldValue("value", valueDBytes)
+ .withFieldValue("timestamp_micros", timestamp)
+ .build()));
+
+ Map<String, Map<String, List<Row>>> families = new HashMap<>();
+ families.put(COLUMN_FAMILY_NAME_1, columns1);
+ families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+ Row expectedRow =
+ Row.withSchema(ROW_SCHEMA)
+ .withFieldValue("key", keyBytes)
+ .withFieldValue("column_families", families)
+ .build();
+
+ expectedRows.add(expectedRow);
}
- return expectedRows;
+ LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+ BigtableReadSchemaTransformConfiguration config =
+ BigtableReadSchemaTransformConfiguration.builder()
+ .setTableId(tableId)
+ .setInstanceId(instanceId)
+ .setProjectId(projectId)
+ .setFlatten(false)
+ .build();
+
+ SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);
+
+ PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+ PAssert.that(rows).containsInAnyOrder(expectedRows);
+ p.run().waitUntilFinish();
Review Comment:

The test setup logic, specifically writing data to Bigtable, is duplicated
between `testRead()` and `testReadFlatten()`. To improve code reuse and make
the tests cleaner, consider extracting the data writing part into a separate
helper method. This method could take `numRows` as a parameter and be called
from both test methods.
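
For example, the mutation loop could be pulled into something like the sketch below (the helper name is illustrative; expected-row construction would stay in each test, since the nested and flattened tests expect different shapes):
```java
// Sketch of a shared helper: write numRows test rows, leaving expected-Row
// construction to each test method.
private void writeTestRows(int numRows) {
  for (int i = 1; i <= numRows; i++) {
    String key = "key" + i;
    long timestamp = 1000L * i;
    RowMutation rowMutation =
        RowMutation.create(tableId, key)
            .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, "value a" + i)
            .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, "value b" + i)
            .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, "value c" + i)
            .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, "value d" + i);
    dataClient.mutateRow(rowMutation);
  }
  LOG.info("Finished writing {} rows to table {}", numRows, tableId);
}
```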
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]