reuvenlax commented on code in PR #17550:
URL: https://github.com/apache/beam/pull/17550#discussion_r865447469


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -93,7 +93,10 @@
   void setUseStorageWriteApiAtLeastOnce(Boolean value);
 
   @Description(
-      "If set, then BigQueryIO.Write will default to using this number of 
Storage Write API streams.")
+      "If set, then BigQueryIO.Write will default to using this number of 
Storage Write API streams. "
+          + "The number of streams indicated will be allocated at a per worker 
and destination basis, "
+          + "a high number can cause a large pipeline to go over the BigQuery 
connection quota quickly. "
+          + "With low-mid volume pipelines using the default configuration 
should be enough.")
   @Default.Integer(0)

Review Comment:
   We should create a new option. This option refers to physical stream 
creation (i.e. createStream), not the number of grpc streams.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -129,6 +135,9 @@ public StorageApiWriteUnshardedRecords(
   public PCollection<Void> expand(PCollection<KV<DestinationT, 
StorageApiWritePayload>> input) {
     String operationName = input.getName() + "/" + getName();
     BigQueryOptions options = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
+    // default value from options is 0, so we set at least one client
+    Integer numStreams =
+        options.getNumStorageWriteApiStreams() == 0 ? 1 : 
options.getNumStorageWriteApiStreams();

Review Comment:
   create a new option that defaults to 1



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -86,15 +89,18 @@
   // (any access of the cache could trigger element expiration). Therefore 
most used of
   // APPEND_CLIENTS should
   // synchronize.
-  private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
+  private static final Cache<String, List<StreamAppendClient>> APPEND_CLIENTS =

Review Comment:
   what did you find about the cost of synchronization? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -213,22 +229,38 @@ String createStreamIfNeeded() {
         return this.streamName;
       }
 
+      List<StreamAppendClient> generateClients() {
+        return IntStream.range(0, streamAppendClientCount)
+            .mapToObj(
+                i -> {
+                  try {
+                    StreamAppendClient client =
+                        datasetService.getStreamAppendClient(
+                            streamName, descriptorWrapper.descriptor);
+                    return client;
+                  } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                  }
+                })
+            .collect(Collectors.toList());
+      }
+
       StreamAppendClient getStreamAppendClient(boolean lookupCache) {
         try {
           if (streamAppendClient == null) {
             createStreamIfNeeded();
             synchronized (APPEND_CLIENTS) {
               if (lookupCache) {
                 this.streamAppendClient =
-                    APPEND_CLIENTS.get(
-                        streamName,
-                        () ->
-                            datasetService.getStreamAppendClient(
-                                streamName, descriptorWrapper.descriptor));
+                    APPEND_CLIENTS.get(streamName, () -> 
generateClients()).get(clientNumber);

Review Comment:
   Instead of generating all clients eagerly, let's do it lazily. Initialize a 
List with count copies of Optional.empty(). Then do 
      this.streamAppendCient = APPEND_CLIENTS.get(streamName, 
this.generateClients).get(clientNumber).get().orElseGet(this.getStreamAppendClient).
   
   FYI you could also do this with null if you don't care to use Optional here.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -357,28 +389,33 @@ void flush(RetryManager<AppendRowsResponse, 
Context<AppendRowsResponse>> retryMa
     private transient @Nullable DatasetService datasetService;
     private int numPendingRecords = 0;
     private int numPendingRecordBytes = 0;
-    private static final int FLUSH_THRESHOLD_RECORDS = 150000;
     private final int flushThresholdBytes;
+    private final int flushThresholdCount;
     private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
     private final BigQueryServices bqServices;
     private final boolean useDefaultStream;
+    // default append client count to 1
+    private Integer streamAppendClientCount = 1;

Review Comment:
   why not private int?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -263,9 +295,9 @@ void invalidateWriteStream() {
             // thread has already invalidated
             // and recreated the stream).
             @Nullable
-            StreamAppendClient cachedAppendClient = 
APPEND_CLIENTS.getIfPresent(streamName);
-            if (cachedAppendClient != null
-                && System.identityHashCode(cachedAppendClient)
+            List<StreamAppendClient> cachedAppendClients = 
APPEND_CLIENTS.getIfPresent(streamName);
+            if (cachedAppendClients != null
+                && 
System.identityHashCode(cachedAppendClients.get(clientNumber))
                     == System.identityHashCode(streamAppendClient)) {

Review Comment:
   This isn't quite right - we're now invalidating all of the StreamWriters 
when any one of them fails. I think instead you want to just null out the one 
that failed and allow it to be recreated the next get.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to