jackdingilian commented on code in PR #26355:
URL: https://github.com/apache/beam/pull/26355#discussion_r1189160798


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java:
##########
@@ -100,9 +101,30 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
     dataSettingsBuilder.setInstanceId(instanceId);
     tableAdminSettingsBuilder.setInstanceId(instanceId);
 
-    if (appProfileId != null) {
-      dataSettingsBuilder.setAppProfileId(appProfileId);
-    }
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();
+    dataSettingsBuilder.setAppProfileId(appProfileId);
+
+    dataSettingsBuilder
+        .stubSettings()
+        .setTransportChannelProvider(
+            EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder()
+                .setAttemptDirectPath(false) // Disable DirectPath
+                .setChannelPoolSettings( // Autoscale Channel Size
+                    ChannelPoolSettings.builder()
+                        // Make sure that there are at least 2 channels regardless of RPCs
+                        .setMinChannelCount(2)
+                        // Limit number of channels to 100 regardless of QPS
+                        .setMaxChannelCount(100)
+                        // Start off with 5
+                        .setInitialChannelCount(5)
+                        // Make sure the channels are primed before use
+                        .setPreemptiveRefreshEnabled(true)
+                        // evict channels when there are less than 10 outstanding RPCs
+                        .setMinRpcsPerChannel(10)
+                        // add more channels when the channel has 50 outstanding RPCs
+                        .setMaxRpcsPerChannel(50)
+                        .build())

Review Comment:
   The channel pool settings are somewhat arbitrary. The main goal is to
increase the number of concurrent streams per worker and to use a dynamic
channel pool so that it resizes automatically. We expect at most Dataflow
`workerHarnessThreads` open streams per worker (because we use the blocking
stream APIs), and that defaults to 500. Before this change, the channel pool
size was 2, and there is a max of 100 concurrent streams per channel, so we
were effectively limiting the number of active streams to 200. Added a comment.
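
The capacity arithmetic above can be checked with a small back-of-the-envelope model. The class `ChannelPoolCapacity` and its helpers are hypothetical; the 100-streams-per-channel figure is the one quoted in the comment, and `channelsFor` is a simplified assumption about where the pool settles (just enough channels to stay at or below `setMaxRpcsPerChannel(50)`, clamped to the min/max channel counts). It is not the actual gax `ChannelPool` resize logic.

```java
// Hypothetical back-of-the-envelope model; not the gax ChannelPool implementation.
class ChannelPoolCapacity {

    // Concurrent streams per channel, as quoted in the review comment (assumption, not a library constant).
    static final int STREAMS_PER_CHANNEL = 100;

    /** Upper bound on concurrently active streams for a pool of the given size. */
    static int streamCeiling(int channelCount) {
        return channelCount * STREAMS_PER_CHANNEL;
    }

    /**
     * Simplified pool-size estimate: enough channels that none carries more than
     * maxRpcsPerChannel outstanding RPCs, clamped to [minChannels, maxChannels].
     */
    static int channelsFor(int outstandingRpcs, int maxRpcsPerChannel,
                           int minChannels, int maxChannels) {
        // Ceiling division without floating point.
        int needed = (outstandingRpcs + maxRpcsPerChannel - 1) / maxRpcsPerChannel;
        return Math.max(minChannels, Math.min(maxChannels, needed));
    }

    public static void main(String[] args) {
        int workerHarnessThreads = 500; // Dataflow default cited in the comment

        // Before the change: fixed pool of 2 channels.
        System.out.println("fixed pool ceiling: " + streamCeiling(2));
        // After the change: one blocking stream per harness thread, <= 50 RPCs per channel.
        int pool = channelsFor(workerHarnessThreads, 50, 2, 100);
        System.out.println("autoscaled pool size: " + pool);
        System.out.println("autoscaled ceiling: " + streamCeiling(pool));
        // Hard cap from setMaxChannelCount(100).
        System.out.println("max pool ceiling: " + streamCeiling(100));
    }
}
```

Under these assumptions the old fixed pool tops out at 200 streams, while 500 blocking streams settle at about 10 channels (a ceiling of 1,000 streams), far below the 100-channel cap of 10,000 streams. An idle worker falls back to the 2-channel minimum.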
   
   The 10 attempts below are meant to match the default for other Bigtable
RPCs. We noticed that these calls occasionally threw exceptions on transient
failures that should have been retried, while other methods did not.
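
The retry configuration referred to here sits below the quoted hunk and is not shown. As a rough illustration only, the plain-Java sketch below shows what capping a call at 10 attempts means: transient failures are retried until the call succeeds or the attempts run out. `MaxAttemptsRetry` and `TransientException` are hypothetical names; a real client decides which status codes count as retryable and backs off between attempts, both of which this sketch omits.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a max-attempts retry policy; not the Bigtable client's implementation.
class MaxAttemptsRetry {

    /** Marks a failure as transient, i.e. worth retrying (hypothetical type). */
    static class TransientException extends RuntimeException {
        TransientException(String message) {
            super(message);
        }
    }

    /**
     * Invokes call up to maxAttempts times. A TransientException triggers another
     * attempt; any other exception propagates immediately. Once attempts are
     * exhausted, the last transient failure is rethrown. No backoff is applied.
     */
    static <T> T callWithRetries(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TransientException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (TransientException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // non-null: the loop ran at least once and every pass failed
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails transiently three times, then succeeds on the fourth attempt.
        String result = callWithRetries(() -> {
            attempts[0]++;
            if (attempts[0] < 4) {
                throw new TransientException("UNAVAILABLE");
            }
            return "ok";
        }, 10);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Running `main` prints `ok after 4 attempts`. With a cap of 1, the same call would surface the first transient failure to the caller, which is the kind of behavior the comment describes for these calls.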



-- 
This is an automated message from the Apache Git Service.