ahmedabu98 commented on code in PR #32529:
URL: https://github.com/apache/beam/pull/32529#discussion_r1775154512
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -207,7 +211,8 @@ public void validate() {
      */
     public static Builder builder() {
       return new
           AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration
-              .Builder();
+              .Builder()
+              .setUseCdcWritesWithPrimaryKey(ImmutableList.of());
Review Comment:
nit: consider removing this default so it's more straightforward to determine whether a user set it
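Something like this is what I mean (just a sketch; with the default gone, the `@Nullable` getter is enough to tell):

```java
public static Builder builder() {
  // No default value here; the @Nullable field stays null unless the user sets it.
  return new
      AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration
          .Builder();
}

// Downstream, "did the user set it?" becomes a plain null check:
boolean userConfiguredCdc = configuration.getUseCdcWritesWithPrimaryKey() != null;
```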
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -468,15 +507,33 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
       if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
         checkArgument(
-            schema.getFieldNames().equals(Arrays.asList("destination", "record")),
+            schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
             "When writing to dynamic destinations, we expect Row Schema with a "
                 + "\"destination\" string field and a \"record\" Row field.");
+        RowDynamicDestinations dynamicDestination =
+            new RowDynamicDestinations(schema.getField("record").getType().getRowSchema());
+
+        if (isCdcConfigured()) {
Review Comment:
CDC writes should be unrelated to dynamic destinations. Consider simply
having a new if block (after this existing dynamic destinations if/else logic)
that checks if CDC writes are enabled and calls
`validateAndIncludeCDCInformation(..)`.
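Roughly what I have in mind (a sketch only; the `else` branch for a fixed table is assumed from the surrounding code):

```java
if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
  // ... existing dynamic destinations validation and wiring, unchanged ...
} else {
  write = write.to(configuration.getTable());
}

// CDC is orthogonal to the destination choice, so it gets its own block:
if (isCdcConfigured()) {
  write = validateAndIncludeCDCInformation(write, schema);
}
```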
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -257,6 +262,13 @@ public static Builder builder() {
     @Nullable
     public abstract ErrorHandling getErrorHandling();

+    @SchemaFieldDescription(
+        "This option enables the use of BigQuery CDC functionality. It expects a Row schema"
+            + " wrapping the record to be inserted and adding the CDC info similar to:"
+            + " {cdc_info: {mutation_type:\"...\", change_sequence_number:\"...\"}, record: {...}}")
+    @Nullable
+    public abstract List<String> getUseCdcWritesWithPrimaryKey();
Review Comment:
The only time a user needs to specify a primary key is when they expect us to
create the table (i.e.
[CREATE_IF_NEEDED](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L3476-L3478)).
If the table already exists, they should be able to use CDC writes without
needing to provide a primary key.
It might be a good idea to decouple "use CDC writes" and "primary key" into
two different options.
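For example (names are illustrative, not prescriptive):

```java
@SchemaFieldDescription("This option enables the use of BigQuery CDC functionality.")
@Nullable
public abstract Boolean getUseCdcWrites();

@SchemaFieldDescription(
    "Primary key column names; only needed when we create the table, "
        + "i.e. with CREATE_IF_NEEDED.")
@Nullable
public abstract List<String> getPrimaryKey();
```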
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -344,14 +358,39 @@ public void process(ProcessContext c) {}
   private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
     Schema schema;
+    String fixedDestination = null;
+    List<String> primaryKey = null;

     RowDynamicDestinations(Schema schema) {
       this.schema = schema;
     }

+    RowDynamicDestinations withFixedDestination(String destination) {
+      this.fixedDestination = destination;
+      return this;
+    }
+
+    RowDynamicDestinations withPrimaryKey(List<String> primaryKey) {
+      this.primaryKey = primaryKey;
+      return this;
+    }
+
     @Override
     public String getDestination(ValueInSingleWindow<Row> element) {
-      return element.getValue().getString("destination");
+      return fixedDestination != null
+          ? fixedDestination
+          : element.getValue().getString("destination");
+    }
+
+    @Override
+    public TableConstraints getTableConstraints(String destination) {
+      return Optional.ofNullable(this.primaryKey)
+          .filter(pk -> !pk.isEmpty())
+          .map(
+              pk ->
+                  new TableConstraints()
+                      .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
+          .orElse(null);
Review Comment:
Note that BigQueryIO already attaches TableConstraints to the input
destinations object:
https://github.com/apache/beam/blob/d93f93abf0e44c050781a635c3c5a834dd292627/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L3537-L3544
To avoid duplicate logic, can we rely on the above and do away with the
changes to `RowDynamicDestinations`?
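i.e. something along these lines (a sketch, assuming the primary key is handed to the write itself so the linked code can pick it up):

```java
// RowDynamicDestinations stays as it was; BigQueryIO wraps the destinations
// object and attaches the TableConstraints for us.
if (primaryKey != null && !primaryKey.isEmpty()) {
  write = write.withPrimaryKey(primaryKey);
}
```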
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -498,5 +555,40 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
       return write;
     }
+
+    boolean isCdcConfigured() {
+      return !Optional.ofNullable(configuration.getUseCdcWritesWithPrimaryKey())
+          .orElse(ImmutableList.of())
+          .isEmpty();
+    }
+
+    BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
+        BigQueryIO.Write<Row> write, Schema schema) {
+      checkArgument(
+          schema.getFieldNames().containsAll(Arrays.asList("cdc_info", "record")),
+          "When writing using CDC functionality, we expect Row Schema with a "
+              + "\"cdc_info\" Row field and a \"record\" Row field.");
+      checkArgument(
+          schema
+              .getField("cdc_info")
+              .getType()
+              .getRowSchema()
+              .equals(
+                  Schema.builder()
+                      .addStringField("mutation_type")
+                      .addStringField("change_sequence_number")
+                      .build()),
+          "When writing using CDC functionality, we expect a \"cdc_info\" field of Row type "
+              + "with fields \"mutation_type\" and \"change_sequence_number\" of type string.");
+      return write
+          .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
Review Comment:
This part looks great, but let's pass through the user-specified write
method. CDC writes require at-least-once, but I'd rather avoid a situation
where a user thinks they are using exactly-once.
Our sink will throw an error if at-least-once isn't specified anyway, so the
user should be able to correct it easily.
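Concretely, the tail of `validateAndIncludeCDCInformation` could look like this (a sketch, assuming the CDC info is wired through `withRowMutationInformationFn`):

```java
// No forced .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) here; whatever
// method the user configured flows through. The sink rejects CDC writes
// that aren't at-least-once, so a mismatch fails fast instead of silently
// overriding exactly-once expectations.
return write.withRowMutationInformationFn(
    row ->
        RowMutationInformation.of(
            RowMutationInformation.MutationType.valueOf(
                row.getRow("cdc_info").getString("mutation_type")),
            row.getRow("cdc_info").getString("change_sequence_number")));
```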
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -498,5 +555,40 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
       return write;
     }
+
+    boolean isCdcConfigured() {
+      return !Optional.ofNullable(configuration.getUseCdcWritesWithPrimaryKey())
+          .orElse(ImmutableList.of())
+          .isEmpty();
+    }
+
+    BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
+        BigQueryIO.Write<Row> write, Schema schema) {
+      checkArgument(
+          schema.getFieldNames().containsAll(Arrays.asList("cdc_info", "record")),
+          "When writing using CDC functionality, we expect Row Schema with a "
+              + "\"cdc_info\" Row field and a \"record\" Row field.");
+      checkArgument(
+          schema
+              .getField("cdc_info")
+              .getType()
+              .getRowSchema()
+              .equals(
+                  Schema.builder()
+                      .addStringField("mutation_type")
+                      .addStringField("change_sequence_number")
+                      .build()),
+          "When writing using CDC functionality, we expect a \"cdc_info\" field of Row type "
+              + "with fields \"mutation_type\" and \"change_sequence_number\" of type string.");
+      return write
+          .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
Review Comment:
P.S. let's add a helpful error message to this line:
https://github.com/apache/beam/blob/d93f93abf0e44c050781a635c3c5a834dd292627/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L3474-L3475
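e.g. something like this (illustrative wording, assuming that line is the existing at-least-once precondition):

```java
checkArgument(
    method == Method.STORAGE_API_AT_LEAST_ONCE,
    "CDC writes (withRowMutationInformationFn) are only supported with the "
        + "STORAGE_API_AT_LEAST_ONCE write method, but got: %s",
    method);
```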
##########
sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py:
##########
@@ -245,6 +245,42 @@ def test_write_with_beam_rows(self):
           | StorageWriteToBigQuery(table=table_id))
       hamcrest_assert(p, bq_matcher)

+  def test_write_with_beam_rows_cdc(self):
+ def test_write_with_beam_rows_cdc(self):
Review Comment:
Can we add a test using `WriteToBigQuery` with dicts? Just to make sure
we're testing fully end-to-end.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2523,6 +2532,7 @@ def __init__(
       use_at_least_once=False,
       with_auto_sharding=False,
       num_storage_api_streams=0,
+      cdc_writes_with_primary_key: List[str] = None,
Review Comment:
Non-blocking if you're trying to get this in soon, but I was thinking we
could allow users to specify either a boolean or a callable for
`use_cdc_writes`:
- If they provide a boolean, we pass their records directly to the Java IO
- If they provide a callable, we pass their rows to the callable then to the
Java IO
Either way, we make it clear that the resulting rows should have the
appropriate `cdc_info` fields.
Could be a future improvement if the current use-case is sufficient for now.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2086,6 +2087,12 @@ def __init__(
        GCP expansion service. Used for STORAGE_WRITE_API method.
      max_insert_payload_size: The maximum byte size for a BigQuery legacy
        streaming insert payload.
+      cdc_writes_with_primary_key: Configures the usage of CDC writes on
+        BigQuery and sets the primary key using the given column names.
+        The destination table (or tables, if using dynamic destinations)
+        will be created when using CREATE_IF_NEEDED mode, and the Storage
+        Write API at-least-once mode will be configured. Used for the
+        STORAGE_WRITE_API method.
Review Comment:
Consider splitting this into two parameters, `use_cdc_writes` and `primary_key`
(for the reasons mentioned in a previous comment), and call out that the
primary key is only needed when setting `CREATE_IF_NEEDED`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]