ahmedabu98 commented on code in PR #32529:
URL: https://github.com/apache/beam/pull/32529#discussion_r1779196331
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -344,14 +358,39 @@ public void process(ProcessContext c) {}
private static class RowDynamicDestinations extends
DynamicDestinations<Row, String> {
Schema schema;
+ String fixedDestination = null;
+ List<String> primaryKey = null;
RowDynamicDestinations(Schema schema) {
this.schema = schema;
}
+ RowDynamicDestinations withFixedDestination(String destination) {
+ this.fixedDestination = destination;
+ return this;
+ }
+
+ RowDynamicDestinations withPrimaryKey(List<String> primaryKey) {
+ this.primaryKey = primaryKey;
+ return this;
+ }
+
@Override
public String getDestination(ValueInSingleWindow<Row> element) {
- return element.getValue().getString("destination");
+ return fixedDestination != null
+ ? fixedDestination
+ : element.getValue().getString("destination");
+ }
+
+ @Override
+ public TableConstraints getTableConstraints(String destination) {
+ return Optional.ofNullable(this.primaryKey)
+ .filter(pk -> !pk.isEmpty())
+ .map(
+ pk ->
+ new TableConstraints()
+ .setPrimaryKey(new
TableConstraints.PrimaryKey().setColumns(pk)))
+ .orElse(null);
Review Comment:
> the PTrasform instantiated by the provider does not execute this code
Ahh the code block is executed only when dynamic destinations == null.
Agreed a lil unfortunate but that's okay -- passing the primary key to
RowDynamicDestinations and implementing `getTableConstraints()` should be
enough
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]