This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 171d3e3d9d7 Merge pull request #23510: Vortex multiplexing streams
171d3e3d9d7 is described below

commit 171d3e3d9d700aa90c498719a0a925916c1cb56e
Author: Reuven Lax <re...@google.com>
AuthorDate: Tue Oct 18 21:12:15 2022 -0700

    Merge pull request #23510: Vortex multiplexing streams
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  6 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |  5 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  4 +--
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  5 +--
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 36 ++++++++++++++++------
 .../bigquery/StorageApiWritesShardedRecords.java   |  4 +--
 .../sdk/io/gcp/testing/FakeDatasetService.java     |  3 +-
 7 files changed, 43 insertions(+), 20 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a81dd6b9055..26a14e86012 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -576,12 +576,12 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_client_jackson2                  : 
"com.google.api-client:google-api-client-jackson2:$google_clients_version",
         google_api_client_java6                     : 
"com.google.api-client:google-api-client-java6:$google_clients_version",
         google_api_common                           : 
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
-        google_api_services_bigquery                : 
"com.google.apis:google-api-services-bigquery:v2-rev20220827-$google_clients_version",
+        google_api_services_bigquery                : 
"com.google.apis:google-api-services-bigquery:v2-rev20220924-$google_clients_version",
         google_api_services_clouddebugger           : 
"com.google.apis:google-api-services-clouddebugger:v2-rev20220318-$google_clients_version",
         google_api_services_cloudresourcemanager    : 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20220828-$google_clients_version",
-        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20220812-$google_clients_version",
+        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version",
         google_api_services_healthcare              : 
"com.google.apis:google-api-services-healthcare:v1-rev20220818-$google_clients_version",
-        google_api_services_pubsub                  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220829-$google_clients_version",
+        google_api_services_pubsub                  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
         google_api_services_storage                 : 
"com.google.apis:google-api-services-storage:v1-rev20220705-$google_clients_version",
         google_auth_library_credentials             : 
"com.google.auth:google-auth-library-credentials", // 
google_cloud_platform_libraries_bom sets version
         google_auth_library_oauth2_http             : 
"com.google.auth:google-auth-library-oauth2-http", // 
google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index cfebb754216..953d1237d9c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -109,6 +109,11 @@ public interface BigQueryOptions
 
   void setNumStorageWriteApiStreamAppendClients(Integer value);
 
+  @Default.Boolean(false)
+  Boolean getUseStorageApiConnectionPool();
+
+  void setUseStorageApiConnectionPool(Boolean value);
+
   @Description(
       "If set, then BigQueryIO.Write will default to triggering the Storage 
Write API writes this often.")
   Integer getStorageWriteApiTriggeringFrequencySec();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index a0484ccf972..adf02ed31a6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -208,8 +208,8 @@ public interface BigQueryServices extends Serializable {
      * Create an append client for a given Storage API write stream. The 
stream must be created
      * first.
      */
-    StreamAppendClient getStreamAppendClient(String streamName, Descriptor 
descriptor)
-        throws Exception;
+    StreamAppendClient getStreamAppendClient(
+        String streamName, Descriptor descriptor, boolean useConnectionPool) 
throws Exception;
 
     /** Flush a given stream up to the given offset. The stream must have type 
BUFFERED. */
     ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3cb4e5cce8c..9df75a5be94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1305,8 +1305,8 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
 
     @Override
-    public StreamAppendClient getStreamAppendClient(String streamName, 
Descriptor descriptor)
-        throws Exception {
+    public StreamAppendClient getStreamAppendClient(
+        String streamName, Descriptor descriptor, boolean useConnectionPool) 
throws Exception {
       ProtoSchema protoSchema =
           
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
 
@@ -1322,6 +1322,7 @@ class BigQueryServicesImpl implements BigQueryServices {
           StreamWriter.newBuilder(streamName)
               .setWriterSchema(protoSchema)
               .setChannelProvider(transportChannelProvider)
+              .setEnableConnectionPool(useConnectionPool)
               .setTraceId(
                   "Dataflow:"
                       + (bqIOMetadata.getBeamJobId() != null
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 607f06c4e00..871fc73698a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -131,6 +131,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
   public PCollection<Void> expand(PCollection<KV<DestinationT, 
StorageApiWritePayload>> input) {
     String operationName = input.getName() + "/" + getName();
     BigQueryOptions options = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
+    
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(
+        !options.getUseStorageApiConnectionPool(),
+        "useStorageApiConnectionPool only supported " + "when using 
STORAGE_API_AT_LEAST_ONCE");
     return input
         .apply(
             "Write Records",
@@ -176,13 +179,15 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       private DescriptorWrapper descriptorWrapper;
       private Instant nextCacheTickle = Instant.MAX;
       private final int clientNumber;
+      private final boolean usingMultiplexing;
 
       public DestinationState(
           String tableUrn,
           MessageConverter<ElementT> messageConverter,
           DatasetService datasetService,
           boolean useDefaultStream,
-          int streamAppendClientCount) {
+          int streamAppendClientCount,
+          BigQueryOptions bigQueryOptions) {
         this.tableUrn = tableUrn;
         this.messageConverter = messageConverter;
         this.pendingMessages = Lists.newArrayList();
@@ -190,6 +195,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
         this.useDefaultStream = useDefaultStream;
         this.descriptorWrapper = messageConverter.getSchemaDescriptor();
         this.clientNumber = new Random().nextInt(streamAppendClientCount);
+        this.usingMultiplexing = 
bigQueryOptions.getUseStorageApiConnectionPool();
       }
 
       void teardown() {
@@ -229,7 +235,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
 
       StreamAppendClient generateClient() throws Exception {
         Preconditions.checkStateNotNull(maybeDatasetService);
-        return maybeDatasetService.getStreamAppendClient(streamName, 
descriptorWrapper.descriptor);
+        return maybeDatasetService.getStreamAppendClient(
+            streamName, descriptorWrapper.descriptor, usingMultiplexing);
       }
 
       StreamAppendClient getStreamAppendClient(boolean lookupCache) {
@@ -342,11 +349,13 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                   this.currentOffset += inserts.getSerializedRowsCount();
                 }
                 ApiFuture<AppendRowsResponse> response = 
writeStream.appendRows(offset, protoRows);
-                
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
-                if (writeStream.getInflightWaitSeconds() > 5) {
-                  LOG.warn(
-                      "Storage Api write delay more than {} seconds.",
-                      writeStream.getInflightWaitSeconds());
+                if (!usingMultiplexing) {
+                  
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
+                  if (writeStream.getInflightWaitSeconds() > 5) {
+                    LOG.warn(
+                        "Storage Api write delay more than {} seconds.",
+                        writeStream.getInflightWaitSeconds());
+                  }
                 }
                 return response;
               } catch (Exception e) {
@@ -456,7 +465,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
     }
 
     DestinationState createDestinationState(
-        ProcessContext c, DestinationT destination, DatasetService 
datasetService) {
+        ProcessContext c,
+        DestinationT destination,
+        DatasetService datasetService,
+        BigQueryOptions bigQueryOptions) {
       TableDestination tableDestination1 = 
dynamicDestinations.getTable(destination);
       checkArgument(
           tableDestination1 != null,
@@ -475,7 +487,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
           messageConverter,
           datasetService,
           useDefaultStream,
-          streamAppendClientCount);
+          streamAppendClientCount,
+          bigQueryOptions);
     }
 
     @ProcessElement
@@ -489,7 +502,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
       Preconditions.checkStateNotNull(destinations);
       DestinationState state =
           destinations.computeIfAbsent(
-              element.getKey(), k -> createDestinationState(c, k, 
initializedDatasetService));
+              element.getKey(),
+              k ->
+                  createDestinationState(
+                      c, k, initializedDatasetService, 
pipelineOptions.as(BigQueryOptions.class)));
       flushIfNecessary();
       state.addMessage(element.getValue());
       ++numPendingRecords;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index afecc966955..c8bb805b6e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -389,7 +389,7 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
                       stream,
                       () ->
                           datasetService.getStreamAppendClient(
-                              stream, descriptor.get().descriptor));
+                              stream, descriptor.get().descriptor, false));
               for (AppendRowsContext context : contexts) {
                 context.streamName = stream;
                 appendClient.pin();
@@ -430,7 +430,7 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
                         context.streamName,
                         () ->
                             datasetService.getStreamAppendClient(
-                                context.streamName, 
descriptor.get().descriptor));
+                                context.streamName, 
descriptor.get().descriptor, false));
                 return appendClient.appendRows(context.offset, protoRows);
               } catch (Exception e) {
                 throw new RuntimeException(e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 213d6498a0e..44f73bd56cb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -492,7 +492,8 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
   }
 
   @Override
-  public StreamAppendClient getStreamAppendClient(String streamName, 
Descriptor descriptor) {
+  public StreamAppendClient getStreamAppendClient(
+      String streamName, Descriptor descriptor, boolean useConnectionPool) {
     return new StreamAppendClient() {
       private Descriptor protoDescriptor;
 

Reply via email to