This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 881bbaa0aac 
DynamicDestinationsHelper.ConstantTimePartitioningClusteringDestinations  is 
parsing per element json configuration for partitioning and clustering which is 
expensive. Cache the outcome of evaluation so it's done once. (#37014)
881bbaa0aac is described below

commit 881bbaa0aac1a030fdea3dac9123231ec62920cd
Author: RadosÅ‚aw Stankiewicz <[email protected]>
AuthorDate: Wed Jan 7 20:14:59 2026 +0100

    DynamicDestinationsHelper.ConstantTimePartitioningClusteringDestinations  
is parsing per element json configuration for partitioning and clustering which 
is expensive. Cache the outcome of evaluation so it's done once. (#37014)
---
 .../gcp/bigquery/DynamicDestinationsHelpers.java   | 47 +++++++++++++++++-----
 1 file changed, 37 insertions(+), 10 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index eed4314e391..52b5b954a09 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -30,7 +30,7 @@ import com.google.gson.JsonParser;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -279,6 +279,11 @@ class DynamicDestinationsHelpers {
     private final @Nullable ValueProvider<String> jsonTimePartitioning;
     private final @Nullable ValueProvider<String> jsonClustering;
 
+    // Lazily initialized and cached values.
+    private @Nullable String evaluatedPartitioning = null;
+    private @Nullable String evaluatedClustering = null;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
     ConstantTimePartitioningClusteringDestinations(
         DynamicDestinations<T, TableDestination> inner,
         ValueProvider<String> jsonTimePartitioning,
@@ -299,19 +304,41 @@ class DynamicDestinationsHelpers {
       this.jsonClustering = jsonClustering;
     }
 
+    static boolean isJsonConfigPresent(ValueProvider<String> json) {
+      String jsonValue = json.get();
+      return jsonValue != null && 
!JsonParser.parseString(jsonValue).getAsJsonObject().isEmpty();
+    }
+
+    private synchronized void evaluateOncePartitioningAndClustering() {
+      if (initialized.get()) {
+        return;
+      }
+      if (jsonTimePartitioning != null) {
+        if (isJsonConfigPresent(jsonTimePartitioning)) {
+          this.evaluatedPartitioning = jsonTimePartitioning.get();
+        }
+      }
+      if (jsonClustering != null) {
+        if (isJsonConfigPresent(jsonClustering)) {
+          this.evaluatedClustering = jsonClustering.get();
+        }
+      }
+      initialized.set(true);
+    }
+
     @Override
     public TableDestination getDestination(@Nullable ValueInSingleWindow<T> 
element) {
+      if (!initialized.get()) {
+        evaluateOncePartitioningAndClustering();
+      }
       TableDestination destination = super.getDestination(element);
+
       String partitioning =
-          
Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
-      if (partitioning == null
-          || JsonParser.parseString(partitioning).getAsJsonObject().isEmpty()) 
{
-        partitioning = destination.getJsonTimePartitioning();
-      }
-      String clustering = 
Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);
-      if (clustering == null || 
JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) {
-        clustering = destination.getJsonClustering();
-      }
+          evaluatedPartitioning != null
+              ? evaluatedPartitioning
+              : destination.getJsonTimePartitioning();
+      String clustering =
+          evaluatedClustering != null ? evaluatedClustering : 
destination.getJsonClustering();
 
       return new TableDestination(
           destination.getTableSpec(), destination.getTableDescription(), 
partitioning, clustering);

Reply via email to