This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 9c989cc  Fix auto-sharding parameter for BigQuery sink with FILE LOADS
     new 63c13d6  Merge pull request #14183 from [BEAM-11772] Fix auto-sharding parameter for BigQuery sink with FILE LOADS
9c989cc is described below

commit 9c989ccde6a4053d49631e21fea4b89193a1fca0
Author: sychen <syc...@google.com>
AuthorDate: Tue Mar 9 22:36:36 2021 -0800

    Fix auto-sharding parameter for BigQuery sink with FILE LOADS
---
 .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java  | 20 ++++++++++++--------
 .../python/apache_beam/io/gcp/bigquery_file_loads.py | 13 +++++++++----
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1828192..1372e3f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -116,6 +116,10 @@ class BatchLoads<DestinationT, ElementT>
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
 
+  // If using auto-sharding for unbounded data, we batch the records before triggering file write
+  // to avoid generating too many small files.
+  static final Duration FILE_TRIGGERING_BATCHING_DURATION = Duration.standardSeconds(1);
+
   // The maximum number of retries to poll the status of a job.
   // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
   static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@@ -294,9 +298,9 @@ class BatchLoads<DestinationT, ElementT>
               .discardingFiredPanes());
       results = writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
     } else {
-      // In the case of dynamic sharding, however, we use a default triggering and instead apply the
-      // user supplied triggeringFrequency to the sharding transform. See
-      // writeDynamicallyShardedFilesTriggered.
+      // In the case of dynamic sharding, however, we use a default trigger since the transform
+      // performs sharding also batches elements to avoid generating too many tiny files. User
+      // trigger is applied right after writes to limit the number of load jobs.
       PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
           input.apply(
               "rewindowIntoGlobal",
@@ -569,15 +573,15 @@ class BatchLoads<DestinationT, ElementT>
   // of filename, file byte size, and table destination.
   PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
-    // In contrast to fixed sharding with triggering, here we use a global window with default
-    // trigger and apply the user supplied triggeringFrequency in the subsequent GroupIntoBatches
-    // transform. We also ensure that the files are written if a threshold number of records are
-    // ready. Dynamic sharding is achieved via the withShardedKey() option provided by
+    // In contrast to fixed sharding with user trigger, here we use a global window with default
+    // trigger and rely on GroupIntoBatches transform to group, batch and at the same time
+    // parallelize properly. We also ensure that the files are written if a threshold number of
+    // records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
     // GroupIntoBatches.
     return input
         .apply(
             GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                .withMaxBufferingDuration(triggeringFrequency)
+                .withMaxBufferingDuration(FILE_TRIGGERING_BATCHING_DURATION)
                 .withShardedKey())
         .setCoder(
             KvCoder.of(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 298ead6..608301e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -69,6 +69,10 @@ _MAXIMUM_SOURCE_URIS = 10 * 1000
 # this many records are written.
 _FILE_TRIGGERING_RECORD_COUNT = 500000
 
+# If using auto-sharding for unbounded data, we batch the records before
+# triggering file write to avoid generating too many small files.
+_FILE_TRIGGERING_BATCHING_DURATION_SECS = 1
+
 
 def _generate_job_name(job_name, job_type, step_name):
   return bigquery_tools.generate_bq_job_name(
@@ -729,9 +733,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
     # We use only the user-supplied trigger on the actual BigQuery load.
     # This allows us to offload the data to the filesystem.
     #
-    # In the case of auto sharding, however, we use a default triggering and
-    # instead apply the user supplied triggering_frequency to the transfrom that
-    # performs sharding.
+    # In the case of dynamic sharding, however, we use a default trigger since
+    # the transform performs sharding also batches elements to avoid generating
+    # too many tiny files. User trigger is applied right after writes to limit
+    # the number of load jobs.
     if self.is_streaming_pipeline and not self.with_auto_sharding:
       return beam.WindowInto(beam.window.GlobalWindows(),
                              trigger=trigger.Repeatedly(
@@ -822,7 +827,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
             lambda kv: (bigquery_tools.get_hashable_destination(kv[0]), kv[1]))
         | 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey(
            batch_size=_FILE_TRIGGERING_RECORD_COUNT,
-            max_buffering_duration_secs=self.triggering_frequency,
+            max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS,
             clock=clock)
         | 'FromHashableTableRefAndDropShard' >> beam.Map(
             lambda kvs:
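
Editor's note: a minimal Python sketch of the batching mechanism this commit switches to, shown outside the BigQuery sink. GroupIntoBatches.WithShardedKey groups elements per key, caps each batch at batch_size, and flushes after max_buffering_duration_secs even if the batch is not full; the sharded key lets a single hot destination be processed by many workers in parallel. The Create input, destination strings, and step names are illustrative only; the constants mirror the ones introduced by this commit.

    import apache_beam as beam
    from apache_beam.transforms.util import GroupIntoBatches

    # Constants mirroring the ones added by this commit.
    _FILE_TRIGGERING_RECORD_COUNT = 500000
    _FILE_TRIGGERING_BATCHING_DURATION_SECS = 1

    with beam.Pipeline() as p:
      _ = (
          p
          # Illustrative (destination, row) pairs; the real sink keys elements
          # by their BigQuery table destination.
          | 'Create' >> beam.Create([('proj:ds.table_a', {'id': 1}),
                                     ('proj:ds.table_a', {'id': 2}),
                                     ('proj:ds.table_b', {'id': 3})])
          # Batch per destination; keys are sharded internally so one hot
          # destination can still be written by many workers in parallel.
          | 'BatchPerDestination' >> GroupIntoBatches.WithShardedKey(
              batch_size=_FILE_TRIGGERING_RECORD_COUNT,
              max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS)
          # Output keys are ShardedKey objects; .key recovers the destination.
          | 'DropShard' >> beam.Map(lambda kv: (kv[0].key, list(kv[1]))))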
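Editor's note: and a sketch of the user-facing effect, assuming a Beam release that includes this change. With with_auto_sharding=True on a streaming WriteToBigQuery using FILE_LOADS, triggering_frequency now only paces how often load jobs are issued, while file writes are batched on the fixed one-second duration above. The Pub/Sub topic, table, schema, and parsing step are placeholders.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
      _ = (
          p
          | 'Read' >> beam.io.ReadFromPubSub(
              topic='projects/my-project/topics/my-topic')
          | 'Parse' >> beam.Map(lambda msg: {'id': int(msg)})
          | 'Write' >> beam.io.WriteToBigQuery(
              table='my-project:my_dataset.my_table',
              schema='id:INTEGER',
              method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
              # With auto-sharding on, this only controls how often load jobs
              # are issued, not how often files are cut.
              triggering_frequency=300,
              with_auto_sharding=True))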