This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 227a846dc2d Merge pull request #25804: Reuse client when constructing 
StreamWriter
227a846dc2d is described below

commit 227a846dc2dd0b78129739d0501d3dae31b17484
Author: Reuven Lax <re...@google.com>
AuthorDate: Sat Mar 11 09:55:13 2023 -0800

    Merge pull request #25804: Reuse client when constructing StreamWriter
---
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3687a163d76..4e2e7aec0ec 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1338,7 +1338,7 @@ class BigQueryServicesImpl implements BigQueryServices {
               .build();
 
       StreamWriter streamWriter =
-          StreamWriter.newBuilder(streamName)
+          StreamWriter.newBuilder(streamName, newWriteClient)
               .setExecutorProvider(
                   FixedExecutorProvider.create(
                       
options.as(ExecutorOptions.class).getScheduledExecutorService()))
@@ -1514,9 +1514,18 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions 
options) {
     try {
+      TransportChannelProvider transportChannelProvider =
+          BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+              .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+              .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+              .setKeepAliveWithoutCalls(true)
+              .setChannelsPerCpu(2)
+              .build();
+
       return BigQueryWriteClient.create(
           BigQueryWriteSettings.newBuilder()
               .setCredentialsProvider(() -> 
options.as(GcpOptions.class).getGcpCredential())
+              .setTransportChannelProvider(transportChannelProvider)
               .setBackgroundExecutorProvider(
                   FixedExecutorProvider.create(
                       
options.as(ExecutorOptions.class).getScheduledExecutorService()))

Reply via email to