gemini-code-assist[bot] commented on code in PR #35794:
URL: https://github.com/apache/beam/pull/35794#discussion_r2258046488


##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -375,23 +375,47 @@
 #BigTable
 - type: renaming
   transforms:
-    #'ReadFromBigTable': 'ReadFromBigTable'
+    'ReadFromBigTable': 'ReadFromBigTable'
     'WriteToBigTable': 'WriteToBigTable'
   config:
     mappings:
       #Temp removing read from bigTable IO
-#      'ReadFromBigTable':
-#        project: 'project_id'
-#        instance: 'instance_id'
-#        table: 'table_id'
+      'ReadFromBigTable':
+        project: 'project_id'
+        instance: 'instance_id'
+        table: 'table_id'
+        flatten: "flatten"
       'WriteToBigTable':
         project: 'project_id'
         instance: 'instance_id'
         table: 'table_id'
     underlying_provider:
       type: beamJar
       transforms:
-#        'ReadFromBigTable': 
'beam:schematransform:org.apache.beam:bigtable_read:v1'
+        'ReadFromBigTable': 
'beam:schematransform:org.apache.beam:bigtable_read:v1'
         'WriteToBigTable': 
'beam:schematransform:org.apache.beam:bigtable_write:v1'
       config:
         gradle_target: 
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+#MongoDB
+- type: renaming
+  transforms:
+    'ReadFromMongoDB': 'ReadFromMongoDB'
+    'WriteToMongoDB': 'WriteToMongoDB'
+  config:
+    mappings:
+      'ReadFromMongoDB':
+        connection_uri: "connection_uri"
+        database: "database"
+        collection: "collection"
+      'WriteToMongoDB':
+        connection_uri: "connection_uri"
+        database: "database"
+        collection: "collection"
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'ReadFromBigTable': 
'beam:schematransform:org.apache.beam:mongodb_read:v1'
+        'WriteToBigTable': 
'beam:schematransform:org.apache.beam:mongodb_write:v1'

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   There seems to be a copy-paste error here. The transform keys for MongoDB 
under `underlying_provider` are incorrectly named `ReadFromBigTable` and 
`WriteToBigTable`. They should be `ReadFromMongoDB` and `WriteToMongoDB` to 
match the transform names defined in the top-level `transforms` section of 
this provider.
   
   ```
           'ReadFromMongoDB': 
'beam:schematransform:org.apache.beam:mongodb_read:v1'
           'WriteToMongoDB': 
'beam:schematransform:org.apache.beam:mongodb_write:v1'
   ```



##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for writing to 
MongoDB.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        
MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {
+
+  private static final String INPUT_TAG = "input";
+
+  @Override
+  protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration 
configuration) {
+    return new MongoDbWriteSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mongodb_write:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /** Configuration class for the MongoDB Write transform. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class MongoDbWriteSchemaTransformConfiguration 
implements Serializable {
+
+    @SchemaFieldDescription("The connection URI for the MongoDB server.")
+    public abstract String getUri();
+
+    @SchemaFieldDescription("The MongoDB database to write to.")
+    public abstract String getDatabase();
+
+    @SchemaFieldDescription("The MongoDB collection to write to.")
+    public abstract String getCollection();
+
+    //    @SchemaFieldDescription("The number of documents to include in each 
batch write.")
+    //    @Nullable
+    //    public abstract Long getBatchSize();
+    //
+    //    @SchemaFieldDescription("Whether the writes should be performed in 
an ordered manner.")
+    //    @Nullable
+    //    public abstract Boolean getOrdered();
+
+    public void validate() {
+      checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must 
be specified.");
+      checkArgument(
+          getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database 
must be specified.");
+      checkArgument(
+          getCollection() != null && !getCollection().isEmpty(),
+          "MongoDB collection must be specified.");
+    }
+
+    public static Builder builder() {
+      return new 
AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setUri(String uri);
+
+      public abstract Builder setDatabase(String database);
+
+      public abstract Builder setCollection(String collection);
+
+      public abstract Builder setBatchSize(Long batchSize);
+
+      public abstract Builder setOrdered(Boolean ordered);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The builder includes `setBatchSize` and `setOrdered` methods, but the 
corresponding getters in `MongoDbWriteSchemaTransformConfiguration` and their 
usage in `MongoDbWriteSchemaTransform` are commented out. This is inconsistent: 
AutoValue expects each builder setter to correspond to a property of the value 
class. If these features are not yet supported, these methods should be removed 
from the builder to avoid confusion.



##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for reading from 
MongoDB.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        
MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> {
+
+  private static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<MongoDbReadSchemaTransformConfiguration> 
configurationClass() {
+    return MongoDbReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration 
configuration) {
+    return new MongoDbReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    // Return a unique URN for the transform.
+    return "beam:schematransform:org.apache.beam:mongodb_read:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    // A read transform does not have an input PCollection.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    // The primary output is a PCollection of Rows.
+    // Error handling could be added later with a second "errors" output tag.
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /** Configuration class for the MongoDB Read transform. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class MongoDbReadSchemaTransformConfiguration 
implements Serializable {
+
+    @SchemaFieldDescription("The connection URI for the MongoDB server.")
+    public abstract String getUri();
+
+    @SchemaFieldDescription("The MongoDB database to read from.")
+    public abstract String getDatabase();
+
+    @SchemaFieldDescription("The MongoDB collection to read from.")
+    public abstract String getCollection();
+
+    @SchemaFieldDescription(
+        "An optional BSON filter to apply to the read. This should be a valid 
JSON string.")
+    @Nullable
+    public abstract String getFilter();
+
+    public void validate() {
+      checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must 
be specified.");
+      checkArgument(
+          getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database 
must be specified.");
+      checkArgument(
+          getCollection() != null && !getCollection().isEmpty(),
+          "MongoDB collection must be specified.");
+    }
+
+    public static Builder builder() {
+      return new 
AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration
+          .Builder();
+    }
+
+    /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setUri(String uri);
+
+      public abstract Builder setDatabase(String database);
+
+      public abstract Builder setCollection(String collection);
+
+      public abstract Builder setFilter(String filter);
+
+      public abstract MongoDbReadSchemaTransformConfiguration build();
+    }
+  }
+
+  /** The {@link SchemaTransform} that performs the read operation. */
+  private static class MongoDbReadSchemaTransform extends SchemaTransform {
+    private final MongoDbReadSchemaTransformConfiguration configuration;
+
+    MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration 
configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      // A read transform does not have an input, so we start with the 
pipeline.
+      PCollection<Document> mongoDocs =
+          input
+              .getPipeline()
+              .apply(
+                  "ReadFromMongoDb",
+                  MongoDbIO.read()
+                      .withUri(configuration.getUri())
+                      .withDatabase(configuration.getDatabase())
+                      .withCollection(configuration.getCollection()));
+      // TODO: Add support for .withFilter() if it exists in your MongoDbIO,
+      // using configuration.getFilter().
+
+      // Convert the BSON Document objects into Beam Row objects.
+      PCollection<Row> beamRows =
+          mongoDocs.apply("ConvertToBeamRows", ParDo.of(new 
MongoDocumentToRowFn()));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
+    }
+  }
+
+  /**
+   * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link 
Row}.
+   *
+   * <p>This is a critical step to ensure data is in a schema-aware format.
+   */
+  private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
+    // TODO: Define the Beam Schema that corresponds to your MongoDB documents.
+    // This could be made dynamic based on an inferred schema or a 
user-provided schema.
+    // For this skeleton, we assume a static schema.
+    // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build();
+
+    @ProcessElement
+    public void processElement(@Element Document doc, OutputReceiver<Row> out) 
{
+      // Here you will convert the BSON document to a Beam Row.
+      // This requires you to know the target schema.
+
+      // Example pseudo-code:
+      // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA);
+      // for (Map.Entry<String, Object> entry : doc.entrySet()) {
+      //   rowBuilder.addValue(entry.getValue());
+      // }
+      // out.output(rowBuilder.build());
+
+      // For a robust implementation, you would handle data type conversions
+      // between BSON types and Beam schema types.
+      throw new UnsupportedOperationException(
+          "MongoDocumentToRowFn must be implemented to convert MongoDB 
Documents to Beam Rows.");
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `MongoDocumentToRowFn` is not implemented and throws an 
`UnsupportedOperationException`. While this is a draft PR, this makes the 
transform unusable. This `DoFn` is critical for converting MongoDB documents to 
Beam Rows and needs to be implemented.



##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -675,11 +676,15 @@ def __hash__(self):
     return hash(self.__dict__.items())
 
   def __eq__(self, other):
+    if type(self) == type(other):
+      other_dict = other.__dict__
+    elif type(other) == type(NamedTuple):
+      other_dict = other._asdict()

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The check `type(other) == type(NamedTuple)` is incorrect for identifying an 
instance of a `NamedTuple`. `NamedTuple` is a factory, not the class that 
instances will have, so `type(NamedTuple)` never equals the type of a 
named-tuple instance. An instance of a `NamedTuple` class is an instance of a 
`tuple` subclass. A better way to check if `other` is a `NamedTuple`-like 
object is to check if it's a tuple and has the `_asdict` method.
   
   ```suggestion
       elif isinstance(other, tuple) and hasattr(other, '_asdict'):
         other_dict = other._asdict()
   ```



##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for writing to 
MongoDB.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        
MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {
+
+  private static final String INPUT_TAG = "input";
+
+  @Override
+  protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration 
configuration) {
+    return new MongoDbWriteSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mongodb_write:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /** Configuration class for the MongoDB Write transform. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class MongoDbWriteSchemaTransformConfiguration 
implements Serializable {
+
+    @SchemaFieldDescription("The connection URI for the MongoDB server.")
+    public abstract String getUri();
+
+    @SchemaFieldDescription("The MongoDB database to write to.")
+    public abstract String getDatabase();
+
+    @SchemaFieldDescription("The MongoDB collection to write to.")
+    public abstract String getCollection();
+
+    //    @SchemaFieldDescription("The number of documents to include in each 
batch write.")
+    //    @Nullable
+    //    public abstract Long getBatchSize();
+    //
+    //    @SchemaFieldDescription("Whether the writes should be performed in 
an ordered manner.")
+    //    @Nullable
+    //    public abstract Boolean getOrdered();
+
+    public void validate() {
+      checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must 
be specified.");
+      checkArgument(
+          getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database 
must be specified.");
+      checkArgument(
+          getCollection() != null && !getCollection().isEmpty(),
+          "MongoDB collection must be specified.");
+    }
+
+    public static Builder builder() {
+      return new 
AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setUri(String uri);
+
+      public abstract Builder setDatabase(String database);
+
+      public abstract Builder setCollection(String collection);
+
+      public abstract Builder setBatchSize(Long batchSize);
+
+      public abstract Builder setOrdered(Boolean ordered);
+
+      public abstract MongoDbWriteSchemaTransformConfiguration build();
+    }
+  }
+
+  /** The {@link SchemaTransform} that performs the write operation. */
+  private static class MongoDbWriteSchemaTransform extends SchemaTransform {
+    private final MongoDbWriteSchemaTransformConfiguration configuration;
+
+    MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration 
configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> rows = input.get(INPUT_TAG);
+
+      PCollection<Document> documents =
+          rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn()));
+
+      MongoDbIO.Write write =
+          MongoDbIO.write()
+              .withUri(configuration.getUri())
+              .withDatabase(configuration.getDatabase())
+              .withCollection(configuration.getCollection());
+
+      //      if (configuration.getBatchSize() != null) {
+      //        write = write.withBatchSize(configuration.getBatchSize());
+      //      }
+      //      if (configuration.getOrdered() != null) {
+      //        write = write.withOrdered(configuration.getOrdered());
+      //      }
+
+      documents.apply("WriteToMongo", write);
+
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
+  }
+
+  /** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link 
Document}. */
+  private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
+    @ProcessElement
+    public void processElement(@Element Row row, OutputReceiver<Document> out) 
{
+      Document doc = new Document();
+      for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
+        String fieldName = row.getSchema().getField(i).getName();
+        Object value = row.getValue(i);
+
+        if (value != null) {
+          doc.append(fieldName, value);
+        }
+      }
+      out.output(doc);
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `RowToMongoDocumentFn` class appears to be unused: the `expand` method 
uses `RowToBsonDocumentFn` for the conversion instead. `RowToMongoDocumentFn` 
should be removed to avoid dead code.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +166,97 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
                       .withInstanceId(configuration.getInstanceId())
                       .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : 
FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new 
BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new 
BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends 
SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and 
timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", 
ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+  /**
+   * A {@link DoFn} that converts a Bigtable {@link 
com.google.bigtable.v2.Row} to a Beam {@link
+   * Row}. It supports both a nested representation and a flattened 
representation where each column
+   * becomes a separate output element.
+   */
+  private static class BigtableRowConverterDoFn extends 
DoFn<com.google.bigtable.v2.Row, Row> {
+    private final BigtableReadSchemaTransformConfiguration configuration;
+
+    BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    private List<Row> convertCells(List<Cell> bigtableCells) {
+      List<Row> beamCells = new ArrayList<>();
+      for (Cell cell : bigtableCells) {
+        Row cellRow =
+            Row.withSchema(CELL_SCHEMA)
+                .withFieldValue("value", cell.getValue().toByteArray())
+                .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+                .build();
+        beamCells.add(cellRow);
+      }
+      return beamCells;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> 
out) {
+      // The builder defaults flatten to true. We check for an explicit false 
setting to disable it.
+
+      if (Boolean.FALSE.equals(configuration.getFlatten())) {
+        // Non-flattening logic (original behavior): one output row per 
Bigtable row.
+        Map<String, Map<ByteString, List<Row>>> families = new HashMap<>();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          Map<ByteString, List<Row>> columns = new HashMap<>();
+          for (Column col : fam.getColumnsList()) {
+
+            List<Cell> bigTableCells = col.getCellsList();
+
+            List<Row> cells = convertCells(bigTableCells);
+
+            columns.put(col.getQualifier(), cells);
+          }
+          families.put(fam.getName(), columns);
+        }
+        Row beamRow =
+            Row.withSchema(ROW_SCHEMA)
+                .withFieldValue("key", bigtableRow.getKey().toByteArray())
+                .withFieldValue("column_families", families)
+                .build();
+        out.output(beamRow);
+      } else {
+        // Flattening logic (new behavior): one output row per column 
qualifier.
+        byte[] key = bigtableRow.getKey().toByteArray();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          String familyName = fam.getName();
+          for (Column col : fam.getColumnsList()) {
+            ByteString qualifierName = col.getQualifier();
+            List<Row> cells = new ArrayList<>();
+            for (Cell cell : col.getCellsList()) {
+              Row cellRow =
+                  Row.withSchema(CELL_SCHEMA)
+                      .withFieldValue("value", cell.getValue().toByteArray())
+                      .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+                      .build();
+              cells.add(cellRow);
+            }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There is duplicated logic for converting Bigtable cells to Beam Rows. The 
`else` block for flattened output re-implements the logic already present in 
the `convertCells` helper method. To improve maintainability and reduce code 
duplication, you should call the existing `convertCells` method here.
   
   ```suggestion
               List<Row> cells = convertCells(col.getCellsList());
   ```



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,194 @@ public void tearDown() {
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // Set up expected Beam Row
+      Map<String, List<Row>> columns1 = new HashMap<>();
+      columns1.put(
+          "a",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueABytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns1.put(
+          "b",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueBBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, List<Row>> columns2 = new HashMap<>();
+      columns2.put(
+          "c",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueCBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns2.put(
+          "d",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueDBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+      families.put(COLUMN_FAMILY_NAME_1, columns1);
+      families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+      Row expectedRow =
+          Row.withSchema(ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)
+              .withFieldValue("column_families", families)
+              .build();
+
+      expectedRows.add(expectedRow);
     }
-    return expectedRows;
+    LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+    BigtableReadSchemaTransformConfiguration config =
+        BigtableReadSchemaTransformConfiguration.builder()
+            .setTableId(tableId)
+            .setInstanceId(instanceId)
+            .setProjectId(projectId)
+            .setFlatten(false)
+            .build();
+
+    SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);
+
+    PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+    PAssert.that(rows).containsInAnyOrder(expectedRows);
+    p.run().waitUntilFinish();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The test setup logic, specifically writing data to Bigtable, is duplicated 
between `testRead()` and `testReadFlatten()`. To improve code reuse and make 
the tests cleaner, consider extracting the data writing part into a separate 
helper method. This method could take `numRows` as a parameter and be called 
from both test methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to