ahmedabu98 commented on code in PR #32529:
URL: https://github.com/apache/beam/pull/32529#discussion_r1779159586


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -580,9 +583,18 @@ BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
                       .build()),
           "When writing using CDC functionality, we expect a \"cdc_info\" field of Row type "
               + "with fields \"mutation_type\" and \"change_sequence_number\" of type string.");
+
+      RowDynamicDestinations destinations =
+          new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())
+              .withPrimaryKey(configuration.getCdcWritesPrimaryKey());
+      if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+        destinations = destinations.withFixedDestination(configuration.getTable());
+      }

Review Comment:
   can we remove this duplicated code? (see previous comment) 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -556,14 +548,25 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
       return write;
     }
 
-    boolean isCdcConfigured() {
-      return !Optional.ofNullable(configuration.getUseCdcWritesWithPrimaryKey())
-          .orElse(ImmutableList.of())
-          .isEmpty();
-    }
-
     BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
         BigQueryIO.Write<Row> write, Schema schema) {
+      if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+        checkArgument(
+            BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS
+                    .get(configuration.getCreateDisposition().toUpperCase())
+                    .equals(CreateDisposition.CREATE_IF_NEEDED)
+                && !Optional.ofNullable(configuration.getCdcWritesPrimaryKey())
+                    .orElse(ImmutableList.of())
+                    .isEmpty(),
+            "When using CDC writes into BigQuery, alongside with CREATE_IF_NEEDED mode,"
+                + " a primary key should be provided.");
+      }
+      if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+        checkArgument(
+            schema.getFieldNames().contains("destination"),
+            "When writing to dynamic destinations, we expect Row Schema with a "
+                + "\"destination\" string field.");
+      }

Review Comment:
   Let's lean on the existing checks -- I don't think we need to create new 
ones. The first check is already covered in 
[BigQueryIO](https://github.com/apache/beam/blob/271ea4311564bd03d5d6b1881895a1c466541d65/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L3462-L3464),
 and the second check is covered 
[above](https://github.com/apache/beam/blob/271ea4311564bd03d5d6b1881895a1c466541d65/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java#L469-L473)
 in this file 



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -298,7 +300,8 @@ public void testCDCWriteToDynamicDestinations() throws Exception {
     BigQueryStorageWriteApiSchemaTransformConfiguration config =
         BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
             .setTable(dynamic)
-            .setUseCdcWritesWithPrimaryKey(primaryKeyColumns)
+            .setUseCdcWrites(true)
+            .setCdcWritesPrimaryKey(primaryKeyColumns)
             .build();

Review Comment:
   I wonder why this is possible without explicitly setting "at-least-once" mode



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2613,6 +2625,62 @@ def expand(self, input):
       # communicate to Java that this write should use dynamic destinations
       table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
 
+    use_cdc_writes = False
+    # if CDC functionality is configured we need to check if a callable has
+    # been passed to extract MutationInfo from the rows to be written
+    if callable(self._use_cdc_writes):

Review Comment:
   I'm realizing that this callable CDC writes option is more complex than I 
thought. For it to be complete and not confusing for users, we will have to 
provide this logic for both Python dict inputs and Beam Row inputs.
   
   I suggest we keep the CDC option as a boolean and make this improvement in a 
future PR.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -580,9 +583,18 @@ BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
                       .build()),
           "When writing using CDC functionality, we expect a \"cdc_info\" field of Row type "
               + "with fields \"mutation_type\" and \"change_sequence_number\" of type string.");
+
+      RowDynamicDestinations destinations =
+          new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())
+              .withPrimaryKey(configuration.getCdcWritesPrimaryKey());
+      if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+        destinations = destinations.withFixedDestination(configuration.getTable());
+      }
+
       return write
-          .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
-          .withPrimaryKey(configuration.getUseCdcWritesWithPrimaryKey())
+          .to(destinations)
+          .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))

Review Comment:
   also a duplicated line that we should remove



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -510,30 +518,14 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
             schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
             "When writing to dynamic destinations, we expect Row Schema with a "
                 + "\"destination\" string field and a \"record\" Row field.");
-        RowDynamicDestinations dynamicDestination =
-            new RowDynamicDestinations(schema.getField("record").getType().getRowSchema());
-
-        if (isCdcConfigured()) {
-          dynamicDestination =
-              dynamicDestination.withPrimaryKey(configuration.getUseCdcWritesWithPrimaryKey());
-          write = validateAndIncludeCDCInformation(write, schema);
-        }
         write =
             write
-                .to(dynamicDestination)
+                .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))

Review Comment:
   Can we just instantiate `new RowDynamicDestinations(<row schema>, <primary 
key>)` here? And avoid instantiating it again in 
validateAndIncludeCDCInformation?



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2087,12 +2089,15 @@ def __init__(
         GCP expansion service. Used for STORAGE_WRITE_API method.
       max_insert_payload_size: The maximum byte size for a BigQuery legacy
         streaming insert payload.
-      cdc_writes_with_primary_key: Configure the usage of CDC writes on BigQuery
-        and sets the primary key using the column names from the argument.
-        The destination table (or tables if using dynamic destionations) will
-        be created in case of using CREATE_IF_NEEDED mode, and StorageWrite API
-        at least once mode will be configured. Used for STORAGE_WRITE_API
-        method.
+      use_cdc_writes: Configure the usage of CDC writes on BigQuery.
+        The argument can be used by passing True and the Beam Rows will be
+        sent as they are to the BigQuery sink which expects a 'record'
+        and 'cdc_info' properties.
+        Used for STORAGE_WRITE_API, working on 'at least once' mode.

Review Comment:
   "If True, your input elements are expected to have a 'record' field 
representing the record to write, and a 'cdc_info: {mutation_type: <str>, 
change_sequence_number: <str>}' field representing the mutation information."
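
For illustration, a minimal sketch of the element shape that wording describes. Plain Python dicts stand in for the actual pipeline elements, and the helper name `wrap_with_cdc_info` is hypothetical (it is not part of Beam); only the field names `record`, `cdc_info`, `mutation_type`, and `change_sequence_number` come from the diff above.

```python
from typing import Any, Dict


def wrap_with_cdc_info(
    record: Dict[str, Any],
    mutation_type: str,
    change_sequence_number: str) -> Dict[str, Any]:
  """Hypothetical helper: wraps a record in the record/cdc_info shape."""
  return {
      # The payload to be written to the table.
      "record": record,
      # The mutation information; both values are strings.
      "cdc_info": {
          "mutation_type": mutation_type,
          "change_sequence_number": change_sequence_number,
      },
  }


element = wrap_with_cdc_info(
    {"name": "cdc_test", "value": 5}, "UPSERT", "AAA/5")
```

In a real pipeline the same two-field structure would presumably be expressed as Beam Rows rather than dicts when `use_cdc_writes=True`.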



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -263,11 +262,18 @@ public static Builder builder() {
     public abstract ErrorHandling getErrorHandling();
 
     @SchemaFieldDescription(
-        "This option enables the use of BigQuery CDC functionality. It expects a Row schema"
-            + " wrapping the record to be inserted and adding the CDC info similar to:"
-            + " {cdc_info: {mutation_type:\"...\", change_sequence_number:\"...\"}, record: {...}}")
+        "This option enables the use of BigQuery CDC functionality. The expected PCollection"
+            + " should contain Beam Rows with a schema wrapping the record to be inserted and"
+            + " adding the CDC info similar to: {cdc_info: {mutation_type:\"...\", "
+            + "change_sequence_number:\"...\"}, record: {...}}")
     @Nullable
-    public abstract List<String> getUseCdcWritesWithPrimaryKey();
+    public abstract Boolean getUseCdcWrites();
+
+    @SchemaFieldDescription(
+        "In the case of using CDC writes and setting CREATE_IF_NEEDED mode for the tables"
+            + " a primary key is required.")
+    @Nullable
+    public abstract List<String> getCdcWritesPrimaryKey();

Review Comment:
   nit:
   ```suggestion
           "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this primary key. "
               + "Required when CDC writes are enabled with CREATE_IF_NEEDED.")
       @Nullable
       public abstract List<String> getPrimaryKey();
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -510,30 +518,14 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
             schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
             "When writing to dynamic destinations, we expect Row Schema with a "
                 + "\"destination\" string field and a \"record\" Row field.");
-        RowDynamicDestinations dynamicDestination =
-            new RowDynamicDestinations(schema.getField("record").getType().getRowSchema());
-
-        if (isCdcConfigured()) {
-          dynamicDestination =
-              dynamicDestination.withPrimaryKey(configuration.getUseCdcWritesWithPrimaryKey());
-          write = validateAndIncludeCDCInformation(write, schema);
-        }
         write =
             write
-                .to(dynamicDestination)
+                .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
                 .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));
+      } else if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
+        write = validateAndIncludeCDCInformation(write, schema);

Review Comment:
   Can we make this a separate `if` block, outside of this if/else chain? We 
should be able to apply this method to both dynamic destination and single 
table cases. The only factor should be whether or not `useCdcWrites` is true
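
As a language-neutral illustration of the control flow being requested, here is a small Python sketch. Everything in it is hypothetical: `plan_write`, the step labels, and the placeholder value of `DYNAMIC_DESTINATIONS` are invented for the example and do not mirror the provider's actual Java code. It only shows the CDC step living in its own `if`, after the dynamic/single-table branch.

```python
from typing import List, Optional

# Placeholder sentinel for this sketch; the real constant lives in the provider.
DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"


def plan_write(table: str, use_cdc_writes: Optional[bool]) -> List[str]:
  """Returns the ordered steps a write would apply (illustrative only)."""
  steps = []
  if table == DYNAMIC_DESTINATIONS:
    steps.append("dynamic_destinations")
  else:
    steps.append("single_table")

  # Separate block, outside the if/else chain above: it runs for both
  # branches and depends only on the flag. An unset flag (None) is
  # treated as False.
  if use_cdc_writes:
    steps.append("cdc_validation")
  return steps
```

With this layout, `plan_write(DYNAMIC_DESTINATIONS, True)` and `plan_write("project:dataset.table", True)` both end with the CDC step, which is the behavior the comment asks for.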



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2087,12 +2089,15 @@ def __init__(
         GCP expansion service. Used for STORAGE_WRITE_API method.
       max_insert_payload_size: The maximum byte size for a BigQuery legacy
         streaming insert payload.
-      cdc_writes_with_primary_key: Configure the usage of CDC writes on BigQuery
-        and sets the primary key using the column names from the argument.
-        The destination table (or tables if using dynamic destionations) will
-        be created in case of using CREATE_IF_NEEDED mode, and StorageWrite API
-        at least once mode will be configured. Used for STORAGE_WRITE_API
-        method.
+      use_cdc_writes: Configure the usage of CDC writes on BigQuery.
+        The argument can be used by passing True and the Beam Rows will be
+        sent as they are to the BigQuery sink which expects a 'record'
+        and 'cdc_info' properties.
+        Used for STORAGE_WRITE_API, working on 'at least once' mode.
+      cdc_writes_primary_key: When using CDC write on BigQuery and
+        CREATE_IF_NEEDED mode for the underlying tables a list of column names
+        is required to be configured as the primary key. Used for
+        STORAGE_WRITE_API.

Review Comment:
   ```suggestion
           STORAGE_WRITE_API and at_least_once mode.
   ```



##########
sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py:
##########
@@ -278,7 +278,58 @@ def test_write_with_beam_rows_cdc(self):
           p
           | beam.Create(rows_with_cdc)
           | StorageWriteToBigQuery(
-              table=table_id, cdc_writes_with_primary_key=["name"]))
+              table=table_id,
+              use_at_least_once=True,
+              use_cdc_writes=True,
+              cdc_writes_primary_key=["name"]))
+    hamcrest_assert(p, bq_matcher)
+
+  def test_write_dicts_cdc(self):
+    table = 'write_dicts_cdc'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+    expected_data_on_bq = [
+        # (name, value)
+        {
+            "name": "cdc_test",
+            "value": 5,
+        }
+    ]
+
+    schema = {
+        "fields": [{
+            "name": "name", "type": "STRING"
+        }, {
+            "name": "value", "type": "INTEGER"
+        }]
+    }
+
+    dicts = [{
+        "name": "cdc_test", "value": 3
+    }, {
+        "name": "cdc_test", "value": 5
+    }, {
+        "name": "cdc_test", "value": 4
+    }]
+
+    bq_matcher = BigqueryFullResultMatcher(
+        project=self.project,
+        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+        data=self.parse_expected_data(expected_data_on_bq))
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | beam.Create(dicts)
+          | beam.io.WriteToBigQuery(
+              table=table_id,
+              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+              schema=schema,
+              use_at_least_once=True,
+              use_cdc_writes=lambda row: beam.Row(
+                  mutation_type="UPSERT",
+                  change_sequence_number="AAA/" + str(row.value)),

Review Comment:
   Note that when the user is writing Python dicts, they should be able to 
supply a function that works on those dicts (i.e., they shouldn't have to know 
what a Beam Row is). Most Python users are not aware that Beam Rows and the Java 
IO are being used under the hood.
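
One way to picture this, as a rough sketch only: the user-supplied callable takes a dict and returns a dict, and a wrapper attaches the result. The names `attach_cdc_info` and `mutation_info_from_dict` are hypothetical and not part of Beam; the sample data and the `"AAA/" + str(value)` sequence number are taken from the test in the diff above.

```python
from typing import Any, Callable, Dict, List

# A dict-in, dict-out callable: no Beam Row knowledge needed by the user.
MutationInfoFn = Callable[[Dict[str, Any]], Dict[str, str]]


def attach_cdc_info(
    row: Dict[str, Any], mutation_info_fn: MutationInfoFn) -> Dict[str, Any]:
  """Hypothetical adapter: applies the user's function to a plain dict."""
  return {"record": row, "cdc_info": mutation_info_fn(row)}


def mutation_info_from_dict(row: Dict[str, Any]) -> Dict[str, str]:
  """Example user function that reads the dict with key access."""
  return {
      "mutation_type": "UPSERT",
      "change_sequence_number": "AAA/" + str(row["value"]),
  }


dicts: List[Dict[str, Any]] = [
    {"name": "cdc_test", "value": 3},
    {"name": "cdc_test", "value": 5},
    {"name": "cdc_test", "value": 4},
]
wrapped = [attach_cdc_info(d, mutation_info_from_dict) for d in dicts]
```

The user's function here uses `row["value"]` rather than `row.value`, so it operates on the same type the user put into the pipeline.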



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
