This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d429dde47b Add options to control number of Storage API connections when using multiplexing (#31721)
4d429dde47b is described below

commit 4d429dde47b570ccabba6b86173eb2546f76a5f2
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Jul 12 14:46:37 2024 -0400

    Add options to control number of Storage API connections when using multiplexing (#31721)
    
    * add options to set min and max connections to connection management pool; rename counter to be more accurate
    
    * add multiplexing description
    
    * add to CHANGES.md
    
    * clarify documentation and address comments
    
    * adjust description
    
    * add details
---
 CHANGES.md                                         |  2 ++
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  8 +++----
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  | 27 ++++++++++++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  9 ++++++++
 4 files changed, 42 insertions(+), 4 deletions(-)
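
As an aside for readers who want to try the new knobs, a minimal usage sketch follows. It only exercises option names that appear in this commit (useStorageApiConnectionPool, minConnectionPoolConnections, maxConnectionPoolConnections); the class name and the concrete values are illustrative, not recommendations.

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MultiplexingOptionsSketch {
      public static void main(String[] args) {
        // Parse standard Beam flags, then view them as BigQueryOptions.
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

        // Multiplexing only applies to the STORAGE_API_AT_LEAST_ONCE write method.
        options.setUseStorageApiConnectionPool(true);

        // Per-worker, per-region bounds on the shared connection pool
        // (defaults are 2 and 20 per the descriptions added in this commit).
        options.setMinConnectionPoolConnections(4);   // illustrative values only
        options.setMaxConnectionPoolConnections(32);

        // ... build Pipeline.create(options) and apply a BigQueryIO write here ...
      }
    }

The same properties can also be supplied as ordinary pipeline flags, e.g. --useStorageApiConnectionPool=true --minConnectionPoolConnections=4 --maxConnectionPoolConnections=32.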

diff --git a/CHANGES.md b/CHANGES.md
index 96c436d89ec..fc94877a2bb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
 ## New Features / Improvements
 
 * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
+* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))
 * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
 * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])
@@ -82,6 +83,7 @@
 
 ## Bugfixes
 
+* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Security Fixes
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 5a12e81ea79..7505f77fb5b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.metrics.Metrics;
  */
 @AutoValue
 abstract class AppendClientInfo {
-  private final Counter activeConnections =
-      Metrics.counter(AppendClientInfo.class, "activeConnections");
+  private final Counter activeStreamAppendClients =
+      Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");
 
   abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
 
@@ -123,7 +123,7 @@ abstract class AppendClientInfo {
           writeStreamService.getStreamAppendClient(
               streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);
 
-      activeConnections.inc();
+      activeStreamAppendClients.inc();
 
       return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
     }
@@ -133,7 +133,7 @@ abstract class AppendClientInfo {
     BigQueryServices.StreamAppendClient client = getStreamAppendClient();
     if (client != null) {
       getCloseAppendClient().accept(client);
-      activeConnections.dec();
+      activeStreamAppendClients.dec();
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index cd1fc6d3842..ba76f483f77 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -109,6 +109,28 @@ public interface BigQueryOptions
 
   void setNumStorageWriteApiStreamAppendClients(Integer value);
 
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (i.e. useStorageApiConnectionPool=true), "
+          + "this option sets the minimum number of connections each pool creates before any connections are shared. This is "
+          + "on a per worker, per region basis. Note that in practice, the minimum number of connections created is the minimum "
+          + "of this value and (numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many "
+          + "connections at first and will only create more connections if the current ones are \"overwhelmed\". Consider "
+          + "increasing this value if you are running into performance issues.")
+  @Default.Integer(2)
+  Integer getMinConnectionPoolConnections();
+
+  void setMinConnectionPoolConnections(Integer value);
+
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (i.e. useStorageApiConnectionPool=true), "
+          + "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. "
+          + "If writing to many dynamic destinations (>20) and experiencing performance issues or seeing append operations competing "
+          + "for streams, consider increasing this value.")
+  @Default.Integer(20)
+  Integer getMaxConnectionPoolConnections();
+
+  void setMaxConnectionPoolConnections(Integer value);
+
   @Description("The max number of messages inflight that we expect each connection will retain.")
   @Default.Long(1000)
   Long getStorageWriteMaxInflightRequests();
@@ -122,6 +144,11 @@ public interface BigQueryOptions
 
   void setStorageWriteMaxInflightBytes(Long value);
 
+  @Description(
+      "Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+          + " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning "
+          + "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. "
+          + "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management")
   @Default.Boolean(false)
   Boolean getUseStorageApiConnectionPool();
 
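To make the formula in the minConnectionPoolConnections description above concrete, here is a small illustrative calculation; every value in it is hypothetical and the snippet only mirrors the documented arithmetic, it is not an API call.

    public class MinConnectionsMath {
      public static void main(String[] args) {
        int minConnectionPoolConnections = 2;            // the new option's default
        int numStorageWriteApiStreamAppendClients = 1;   // hypothetical setting
        int numDestinations = 3;                         // hypothetical: writing to 3 tables

        // Formula from the option description: min(option, clients x destinations).
        int initialConnections =
            Math.min(
                minConnectionPoolConnections,
                numStorageWriteApiStreamAppendClients * numDestinations);

        // Prints 2: two connections are created up front; the pool only adds more,
        // up to maxConnectionPoolConnections, if the current ones are "overwhelmed".
        System.out.println(initialConnections);
      }
    }
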
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 2bdba0b053c..c6b0e17e59d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -69,6 +69,7 @@ import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
 import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
 import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
 import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
 import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
@@ -1423,6 +1424,14 @@ public class BigQueryServicesImpl implements BigQueryServices {
               bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(),
               bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId());
 
+      ConnectionWorkerPool.setOptions(
+          ConnectionWorkerPool.Settings.builder()
+              .setMinConnectionsPerRegion(
+                  options.as(BigQueryOptions.class).getMinConnectionPoolConnections())
+              .setMaxConnectionsPerRegion(
+                  options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
+              .build());
+
       StreamWriter streamWriter =
           StreamWriter.newBuilder(streamName, newWriteClient)
               .setExecutorProvider(
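
For context, the BigQueryServicesImpl change in the last hunk boils down to a single call into the google-cloud-bigquerystorage client before the StreamWriter is built. A stripped-down sketch, using only the builder methods visible in the diff and the Beam defaults (2 and 20) in place of the resolved pipeline options:

    import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;

    public class ConnectionPoolBoundsSketch {
      public static void main(String[] args) {
        // Mirrors the new code in BigQueryServicesImpl: applies per-worker, per-region
        // bounds to the shared connection pool. The literals stand in for the resolved
        // values of minConnectionPoolConnections (default 2) and
        // maxConnectionPoolConnections (default 20).
        ConnectionWorkerPool.setOptions(
            ConnectionWorkerPool.Settings.builder()
                .setMinConnectionsPerRegion(2)
                .setMaxConnectionsPerRegion(20)
                .build());
      }
    }

As in the diff, the settings are applied before any StreamWriter that uses the shared connection pool is created, so the pool picks up the configured bounds.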
