reuvenlax commented on code in PR #17550:
URL: https://github.com/apache/beam/pull/17550#discussion_r870727332


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -93,12 +93,21 @@
   void setUseStorageWriteApiAtLeastOnce(Boolean value);
 
   @Description(
-      "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
+      "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
   @Default.Integer(0)
   Integer getNumStorageWriteApiStreams();
 
   void setNumStorageWriteApiStreams(Integer value);
 
+  @Description(
+      "The number of stream append clients indicated will be allocated at a per worker and destination "
+          + "basis. A large value can cause a large pipeline to go over the BigQuery connection quota quickly. "
+          + "With low-mid volume pipelines using the default configuration should be enough.")
+  @Default.Integer(1)

Review Comment:
   This is a bit confusing. The description needs to clarify that this only
applies to at-least-once writes using the default stream.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -197,6 +204,10 @@ String getDefaultStreamName() {
         return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
       }
 
+      String getStreamAppendClientCacheEntryName() {
+        return getDefaultStreamName() + "-client" + clientNumber;
+      }

Review Comment:
   This is a bit weird, since this code doesn't always use the default stream.
The cache is probably not needed in the non-default stream case, since we
create a new stream for every bundle. However, if we change that, we need to
rename the cache and also make sure to close the client, since right now we
rely on the cache removal listener to close it.
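
   To illustrate the concern about closing clients, here is a minimal sketch
of a cache that closes a client when its entry is removed. `ClosingClientCache`
and `FakeClient` are hypothetical names used only for illustration; the sketch
uses a plain `ConcurrentHashMap` as a stand-in and is not the cache used in the
PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a client cache; not the implementation in the PR.
class ClosingClientCache<C extends AutoCloseable> {
  private final Map<String, C> clients = new ConcurrentHashMap<>();

  // Returns the cached client for the key, creating it on first use.
  C get(String key, Supplier<C> loader) {
    return clients.computeIfAbsent(key, k -> loader.get());
  }

  // Removes the entry and closes its client, mimicking what a removal listener would do.
  void invalidate(String key) {
    C removed = clients.remove(key);
    if (removed != null) {
      try {
        removed.close();
      } catch (Exception e) {
        throw new RuntimeException("Failed to close client for " + key, e);
      }
    }
  }

  int size() {
    return clients.size();
  }

  // Hypothetical client that only records whether it was closed.
  static class FakeClient implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }
  }

  public static void main(String[] args) {
    ClosingClientCache<FakeClient> cache = new ClosingClientCache<>();
    FakeClient client = cache.get("some-stream-client0", FakeClient::new);
    cache.invalidate("some-stream-client0");
    System.out.println("closed after removal: " + client.closed);
  }
}
```

   A client created outside such a cache never passes through `invalidate`,
so it would need an explicit `close()` call somewhere else.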



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -221,14 +242,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
               if (lookupCache) {
                 this.streamAppendClient =
                     APPEND_CLIENTS.get(
-                        streamName,
-                        () ->
-                            datasetService.getStreamAppendClient(
-                                streamName, descriptorWrapper.descriptor));
+                        getStreamAppendClientCacheEntryName(), () -> createStreamAppendClient());

Review Comment:
   It will also be broken if you have two sinks in the pipeline, one using the 
default stream and one not.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -221,14 +242,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
               if (lookupCache) {
                 this.streamAppendClient =
                     APPEND_CLIENTS.get(
-                        streamName,
-                        () ->
-                            datasetService.getStreamAppendClient(
-                                streamName, descriptorWrapper.descriptor));
+                        getStreamAppendClientCacheEntryName(), () -> createStreamAppendClient());

Review Comment:
   `getStreamAppendClientCacheEntryName` doesn't necessarily return the name of
the stream we actually used.
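
   As a rough illustration of the mismatch, the sketch below contrasts a cache
key that is always built from the default stream name (the shape used in the
diff above) with one built from the stream name actually in use.
`AppendClientKeys`, `keyFromDefaultStream`, `keyFromActualStream`, and the table
and stream names are hypothetical, and the partition-decorator stripping from
the real code is omitted.

```java
// Hypothetical helper for illustration only; not code from the PR.
final class AppendClientKeys {
  private AppendClientKeys() {}

  static String defaultStreamName(String tableUrn) {
    return tableUrn + "/streams/_default";
  }

  // Mirrors the shape of the key in the diff: always derived from the default stream.
  static String keyFromDefaultStream(String tableUrn, int clientNumber) {
    return defaultStreamName(tableUrn) + "-client" + clientNumber;
  }

  // Alternative: derive the key from whichever stream the writer is really using.
  static String keyFromActualStream(String streamName, int clientNumber) {
    return streamName + "-client" + clientNumber;
  }

  public static void main(String[] args) {
    String table = "projects/p/datasets/d/tables/t"; // hypothetical table URN
    String dedicated = table + "/streams/abc123"; // hypothetical non-default stream

    // A key that ignores the actual stream is the same for every writer to the table.
    System.out.println(keyFromDefaultStream(table, 0));
    // Keys built from the actual stream name differ between the two streams.
    System.out.println(keyFromActualStream(defaultStreamName(table), 0));
    System.out.println(keyFromActualStream(dedicated, 0));
  }
}
```

   With the second form, a writer on a dedicated stream and a writer on the
default stream of the same table would no longer share a cache entry.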



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java:
##########
@@ -50,14 +49,16 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
     BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
     // Append records to the Storage API streams.
     input.apply(
-        "Write Records",

Review Comment:
   Changing transform names can affect update compatibility - do you need this?


