mutianf commented on code in PR #35696:
URL: https://github.com/apache/beam/pull/35696#discussion_r2248089716


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -69,6 +71,13 @@ public class BigtableReadSchemaTransformProvider
                   Schema.FieldType.STRING,
                   Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
           .build();
+  public static final Schema FLATTENED_ROW_SCHEMA =
+      Schema.builder()
+          .addByteArrayField("key")
+          .addStringField("family_name")

Review Comment:
   I don't know if this matters but can we make the field name consistent? In 
row_schema this is `column_families`



##########
sdks/python/apache_beam/io/gcp/bigtableio.py:
##########
@@ -357,7 +357,8 @@ def expand(self, input):
         rearrange_based_on_discovery=True,
         table_id=self._table_id,
         instance_id=self._instance_id,
-        project_id=self._project_id)
+        project_id=self._project_id,
+        flatten=False)

Review Comment:
   should we make it consistent with java? I prefer false but i don't feel 
strongly about it.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -69,6 +71,13 @@ public class BigtableReadSchemaTransformProvider
                   Schema.FieldType.STRING,

Review Comment:
   can we update qualifier to be ByteArray? (if there isn't a byteString type)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +166,97 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
                       .withInstanceId(configuration.getInstanceId())
                       .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : 
FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new 
BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new 
BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends 
SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and 
timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", 
ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+  /**
+   * A {@link DoFn} that converts a Bigtable {@link 
com.google.bigtable.v2.Row} to a Beam {@link
+   * Row}. It supports both a nested representation and a flattened 
representation where each column
+   * becomes a separate output element.
+   */
+  private static class BigtableRowConverterDoFn extends 
DoFn<com.google.bigtable.v2.Row, Row> {
+    private final BigtableReadSchemaTransformConfiguration configuration;
+
+    BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    private List<Row> convertCells(List<Cell> bigtableCells) {
+      List<Row> beamCells = new ArrayList<>();
+      for (Cell cell : bigtableCells) {
+        Row cellRow =
+            Row.withSchema(CELL_SCHEMA)
+                .withFieldValue("value", cell.getValue().toByteArray())
+                .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+                .build();
+        beamCells.add(cellRow);
+      }
+      return beamCells;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> 
out) {
+      // The builder defaults flatten to true. We check for an explicit false 
setting to disable it.
+
+      if (Boolean.FALSE.equals(configuration.getFlatten())) {
+        // Non-flattening logic (original behavior): one output row per 
Bigtable row.
+        Map<String, Map<String, List<Row>>> families = new HashMap<>();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          Map<String, List<Row>> columns = new HashMap<>();
+          for (Column col : fam.getColumnsList()) {
+
+            List<Cell> bigTableCells = col.getCellsList();
+
+            List<Row> cells = convertCells(bigTableCells);
+
+            columns.put(col.getQualifier().toStringUtf8(), cells);
+          }
+          families.put(fam.getName(), columns);
+        }
+        Row beamRow =
+            Row.withSchema(ROW_SCHEMA)
+                .withFieldValue("key", bigtableRow.getKey().toByteArray())
+                .withFieldValue("column_families", families)
+                .build();
+        out.output(beamRow);
+      } else {
+        // Flattening logic (new behavior): one output row per column 
qualifier.
+        byte[] key = bigtableRow.getKey().toByteArray();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          String familyName = fam.getName();
+          for (Column col : fam.getColumnsList()) {
+            ByteString qualifierName = col.getQualifier();
+            List<Row> cells = new ArrayList<>();
+            for (Cell cell : col.getCellsList()) {

Review Comment:
   I think you can call convertCells() here? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -109,6 +119,8 @@ public static Builder builder() {
 
     public abstract String getProjectId();
 
+    public abstract @Nullable Boolean getFlatten();

Review Comment:
   is this Nullable? what does null value mean? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java:
##########
@@ -189,7 +189,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
                 + "\"type\": String\n"
                 + "\"value\": ByteString\n"
                 + "\"column_qualifier\": ByteString\n"

Review Comment:
   The type for qualifier is bytearray but here it's bytestring, is this 
accurate? Same for key and value 



##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -375,23 +375,24 @@
 #BigTable
 - type: renaming
   transforms:
-    #'ReadFromBigTable': 'ReadFromBigTable'
+    'ReadFromBigTable': 'ReadFromBigTable'
     'WriteToBigTable': 'WriteToBigTable'
   config:
     mappings:
       #Temp removing read from bigTable IO
-#      'ReadFromBigTable':
-#        project: 'project_id'
-#        instance: 'instance_id'
-#        table: 'table_id'
+      'ReadFromBigTable':
+        project: 'project_id'
+        instance: 'instance_id'
+        table: 'table_id'
+        flatten: "flatten"

Review Comment:
   should this be true/false? 



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,194 @@ public void tearDown() {
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", 
ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);

Review Comment:
   can we add a setCell
   
   `setCell(COLUMN_FAMILY_NAME_2, "d", newTimestamp, newValueD);`? 
   
   in this test and the flattened test?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -69,6 +71,13 @@ public class BigtableReadSchemaTransformProvider
                   Schema.FieldType.STRING,
                   Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
           .build();
+  public static final Schema FLATTENED_ROW_SCHEMA =
+      Schema.builder()
+          .addByteArrayField("key")
+          .addStringField("family_name")

Review Comment:
   Maybe we should also extract field name to a global variable because it's 
also used when we're converting the row? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to