ahmedabu98 commented on code in PR #35526: URL: https://github.com/apache/beam/pull/35526#discussion_r2213322978
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java: ########## @@ -313,6 +313,13 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) { WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase()); write = write.withWriteDisposition(writeDisposition); } + + // List<String> clusteringFields = configuration.getClustering(); + // if (clusteringFields != null && !clusteringFields.isEmpty()) { + // Clustering clustering = new Clustering().setFields(clusteringFields); + // write = write.withClustering(clustering); + // } + Review Comment: cleanup ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java: ########## @@ -194,6 +194,9 @@ public static Builder builder() { + "Is mutually exclusive with 'keep' and 'drop'.") public abstract @Nullable String getOnly(); + @SchemaFieldDescription("A list of columns to cluster the BigQuery table by.") + public abstract @Nullable List<String> getClustering(); Review Comment: nit: what do you think of the name `getClusteringFields()`? 
We are mainly thinking about the generated parameter name for ManagedIO and Xlang: `clustering` vs `clustering_fields` ########## sdks/python/apache_beam/io/gcp/bigquery.py: ########## @@ -2629,6 +2632,15 @@ def expand(self, input): # communicate to Java that this write should use dynamic destinations table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS + clustering_fields = [] + if self.additional_bq_parameters: + if callable(self.additional_bq_parameters): + raise NotImplementedError( + "Currently, dynamic clustering and timepartitioning is not " + "supported for this write method.") Review Comment: nit: let's mention the write method in the err message ########## sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py: ########## @@ -245,6 +245,30 @@ def test_write_with_beam_rows(self): | StorageWriteToBigQuery(table=table_id)) hamcrest_assert(p, bq_matcher) + def test_write_with_clustering(self): + table = 'write_with_clustering' + table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + additional_bq_parameters={'clustering': { + 'fields': ['int'] + }})) Review Comment: like with other tests here, let's also validate the data was written correctly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org