ahmedabu98 commented on code in PR #35526:
URL: https://github.com/apache/beam/pull/35526#discussion_r2213322978


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -313,6 +313,13 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
              WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
          write = write.withWriteDisposition(writeDisposition);
        }
+
+      // List<String> clusteringFields = configuration.getClustering();
+      // if (clusteringFields != null && !clusteringFields.isEmpty()) {
+      //   Clustering clustering = new Clustering().setFields(clusteringFields);
+      //   write = write.withClustering(clustering);
+      // }
+

Review Comment:
   cleanup



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java:
##########
@@ -194,6 +194,9 @@ public static Builder builder() {
           + "Is mutually exclusive with 'keep' and 'drop'.")
   public abstract @Nullable String getOnly();
 
+  @SchemaFieldDescription("A list of columns to cluster the BigQuery table by.")
+  public abstract @Nullable List<String> getClustering();

Review Comment:
   nit: what do you think of the name `getClusteringFields()`?
   
   We're mainly thinking about the generated parameter name for ManagedIO and Xlang: `clustering` vs `clustering_fields`.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2629,6 +2632,15 @@ def expand(self, input):
       # communicate to Java that this write should use dynamic destinations
       table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
 
+    clustering_fields = []
+    if self.additional_bq_parameters:
+      if callable(self.additional_bq_parameters):
+        raise NotImplementedError(
+            "Currently, dynamic clustering and timepartitioning is not "
+            "supported for this write method.")

Review Comment:
   nit: let's mention the write method in the err message
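
   For concreteness, a hedged sketch of the suggested change. The helper name is illustrative and the method constant (`STORAGE_WRITE_API`) is an assumption about which write method this code path serves, not the final Beam wording:

   ```python
   # Hypothetical sketch of the reviewer's suggestion: name the write method
   # in the NotImplementedError message. Function and variable names are
   # illustrative, not the actual Beam source.
   def _check_additional_bq_parameters(additional_bq_parameters):
       if additional_bq_parameters and callable(additional_bq_parameters):
           raise NotImplementedError(
               "Currently, dynamic clustering and time partitioning are not "
               "supported for the STORAGE_WRITE_API write method.")
   ```

   A plain dict of parameters would still pass through unchanged; only the callable (dynamic) form raises.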



##########
sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py:
##########
@@ -245,6 +245,30 @@ def test_write_with_beam_rows(self):
           | StorageWriteToBigQuery(table=table_id))
     hamcrest_assert(p, bq_matcher)
 
+  def test_write_with_clustering(self):
+    table = 'write_with_clustering'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | "Create test data" >> beam.Create(self.ELEMENTS)
+          | beam.io.WriteToBigQuery(
+              table=table_id,
+              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+              schema=self.ALL_TYPES_SCHEMA,
+              create_disposition='CREATE_IF_NEEDED',
+              write_disposition='WRITE_TRUNCATE',
+              additional_bq_parameters={'clustering': {
+                  'fields': ['int']
+              }}))

Review Comment:
   like with other tests here, let's also validate the data was written correctly
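
   The neighboring tests validate by building a query plus the expected rows and wrapping them in a matcher. A minimal sketch of just the expected-data construction (the helper name and element values are assumptions; the `int` column follows the schema snippet above):

   ```python
   # Hypothetical helper mirroring this test file's validation convention:
   # query the written table and compare against the elements written.
   def build_validation_query_and_rows(project, dataset_id, table, elements):
       query = "SELECT int FROM `{}.{}.{}`".format(project, dataset_id, table)
       # One expected tuple per written element, matching the query's column.
       expected_data = [(e['int'],) for e in elements]
       return query, expected_data
   ```

   In the test itself this would feed a `BigqueryFullResultMatcher` and a final `hamcrest_assert(p, bq_matcher)`, as in `test_write_with_beam_rows` above.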


