This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c989cc  Fix auto-sharding parameter for BigQuery sink with FILE LOADS
     new 63c13d6  Merge pull request #14183 from [BEAM-11772] Fix auto-sharding parameter for BigQuery sink with FILE LOADS
9c989cc is described below

commit 9c989ccde6a4053d49631e21fea4b89193a1fca0
Author: sychen <syc...@google.com>
AuthorDate: Tue Mar 9 22:36:36 2021 -0800

    Fix auto-sharding parameter for BigQuery sink with FILE LOADS
---
 .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java  | 20 ++++++++++++--------
 .../python/apache_beam/io/gcp/bigquery_file_loads.py | 13 +++++++++----
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1828192..1372e3f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -116,6 +116,10 @@ class BatchLoads<DestinationT, ElementT>
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
 
+  // If using auto-sharding for unbounded data, we batch the records before triggering file write
+  // to avoid generating too many small files.
+  static final Duration FILE_TRIGGERING_BATCHING_DURATION = Duration.standardSeconds(1);
+
   // The maximum number of retries to poll the status of a job.
   // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
   static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@@ -294,9 +298,9 @@ class BatchLoads<DestinationT, ElementT>
                   .discardingFiredPanes());
       results = writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
     } else {
-      // In the case of dynamic sharding, however, we use a default triggering and instead apply the
-      // user supplied triggeringFrequency to the sharding transform. See
-      // writeDynamicallyShardedFilesTriggered.
+      // In the case of dynamic sharding, however, we use a default trigger since the transform
+      // that performs sharding also batches elements to avoid generating too many tiny files. The
+      // user trigger is applied right after the writes to limit the number of load jobs.
       PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
           input.apply(
               "rewindowIntoGlobal",
@@ -569,15 +573,15 @@ class BatchLoads<DestinationT, ElementT>
   // of filename, file byte size, and table destination.
   PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
-    // In contrast to fixed sharding with triggering, here we use a global window with default
-    // trigger and apply the user supplied triggeringFrequency in the subsequent GroupIntoBatches
-    // transform. We also ensure that the files are written if a threshold number of records are
-    // ready. Dynamic sharding is achieved via the withShardedKey() option provided by
+    // In contrast to fixed sharding with a user trigger, here we use a global window with the
+    // default trigger and rely on the GroupIntoBatches transform to group, batch, and parallelize
+    // properly at the same time. We also ensure that the files are written if a threshold number
+    // of records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
     // GroupIntoBatches.
     return input
         .apply(
             GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                .withMaxBufferingDuration(triggeringFrequency)
+                .withMaxBufferingDuration(FILE_TRIGGERING_BATCHING_DURATION)
                 .withShardedKey())
         .setCoder(
             KvCoder.of(
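
For context, the batching pattern the Java change relies on looks roughly like the sketch below. This is not the BatchLoads code itself: the String destination keys, the sample rows, and the hard-coded 500000/one-second values (standing in for FILE_TRIGGERING_RECORD_COUNT and FILE_TRIGGERING_BATCHING_DURATION) are illustrative placeholders.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.util.ShardedKey;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class GroupIntoBatchesSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Illustrative (destination, record) pairs; in BatchLoads the key is
        // the table destination and the value is the element to be written.
        PCollection<KV<String, String>> records =
            pipeline.apply(
                Create.of(
                    KV.of("project:dataset.table", "row1"),
                    KV.of("project:dataset.table", "row2")));

        // A batch is emitted once it holds 500000 records or has been buffered
        // for one second, whichever comes first, so small files are avoided
        // without waiting on the user's triggering frequency. withShardedKey()
        // lets the runner spread a hot destination across multiple shards.
        PCollection<KV<ShardedKey<String>, Iterable<String>>> batches =
            records.apply(
                GroupIntoBatches.<String, String>ofSize(500000)
                    .withMaxBufferingDuration(Duration.standardSeconds(1))
                    .withShardedKey());

        pipeline.run().waitUntilFinish();
      }
    }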
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 298ead6..608301e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -69,6 +69,10 @@ _MAXIMUM_SOURCE_URIS = 10 * 1000
 # this many records are written.
 _FILE_TRIGGERING_RECORD_COUNT = 500000
 
+# If using auto-sharding for unbounded data, we batch the records before
+# triggering file write to avoid generating too many small files.
+_FILE_TRIGGERING_BATCHING_DURATION_SECS = 1
+
 
 def _generate_job_name(job_name, job_type, step_name):
   return bigquery_tools.generate_bq_job_name(
@@ -729,9 +733,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
     # We use only the user-supplied trigger on the actual BigQuery load.
     # This allows us to offload the data to the filesystem.
     #
-    # In the case of auto sharding, however, we use a default triggering and
-    # instead apply the user supplied triggering_frequency to the transfrom that
-    # performs sharding.
+    # In the case of dynamic sharding, however, we use a default trigger since
+    # the transform that performs sharding also batches elements to avoid
+    # generating too many tiny files. The user trigger is applied right after
+    # the writes to limit the number of load jobs.
     if self.is_streaming_pipeline and not self.with_auto_sharding:
       return beam.WindowInto(beam.window.GlobalWindows(),
                              trigger=trigger.Repeatedly(
@@ -822,7 +827,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
             lambda kv: (bigquery_tools.get_hashable_destination(kv[0]), kv[1]))
         | 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey(
             batch_size=_FILE_TRIGGERING_RECORD_COUNT,
-            max_buffering_duration_secs=self.triggering_frequency,
+            max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS,
             clock=clock)
         | 'FromHashableTableRefAndDropShard' >> beam.Map(
             lambda kvs:
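
By contrast, when auto-sharding is not enabled, the user-supplied triggering frequency keeps driving the file writes directly through the rewindow-into-global-window step visible in the surrounding context of both hunks. Below is a rough, self-contained Java sketch of that triggering setup; the 60-second frequency is a placeholder standing in for the user-supplied value.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class UserTriggeredWindowSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Placeholder for the user-supplied triggering frequency.
        Duration triggeringFrequency = Duration.standardSeconds(60);

        PCollection<String> rows = pipeline.apply(Create.of("row1", "row2"));

        // Fire repeatedly, a fixed delay after the first element of each
        // pane, and discard fired panes; each firing yields a round of
        // file writes.
        PCollection<String> rewindowed =
            rows.apply(
                "rewindowIntoGlobal",
                Window.<String>into(new GlobalWindows())
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(triggeringFrequency)))
                    .discardingFiredPanes());

        pipeline.run().waitUntilFinish();
      }
    }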
