jackdingilian commented on code in PR #26355:
URL: https://github.com/apache/beam/pull/26355#discussion_r1189160458


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java:
##########
@@ -100,9 +101,30 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
     dataSettingsBuilder.setInstanceId(instanceId);
     tableAdminSettingsBuilder.setInstanceId(instanceId);
 
-    if (appProfileId != null) {
-      dataSettingsBuilder.setAppProfileId(appProfileId);
-    }
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();

Review Comment:
   It is currently mandatory; we already have the 
`checkArgumentNotNull(bigtableConfig.getAppProfileId())` check above. We are 
planning to update this to fall back to the default app profile in a 
follow-on, though.
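   
   For context, a minimal sketch of what that follow-on defaulting might look 
like (hypothetical: the fallback name `"default"` and the null handling are 
assumptions, not code from this PR):
   
   ```java
   // Hypothetical follow-on sketch: fall back to the instance's default app
   // profile instead of requiring one to be configured.
   String appProfileId =
       bigtableConfig.getAppProfileId() != null
           ? bigtableConfig.getAppProfileId().get()
           : "default";
   dataSettingsBuilder.setAppProfileId(appProfileId);
   ```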



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java:
##########
@@ -160,6 +167,22 @@ public ProcessContinuation run(
     // Process CloseStream if it exists
     CloseStream closeStream = tracker.currentRestriction().getCloseStream();
     if (closeStream != null) {
+      // tracker.currentRestriction().closeStream.getStatus()

Review Comment:
   Yes. Removed, thanks



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java:
##########
@@ -100,9 +101,30 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
     dataSettingsBuilder.setInstanceId(instanceId);
     tableAdminSettingsBuilder.setInstanceId(instanceId);
 
-    if (appProfileId != null) {
-      dataSettingsBuilder.setAppProfileId(appProfileId);
-    }
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();
+    dataSettingsBuilder.setAppProfileId(appProfileId);
+
+    dataSettingsBuilder
+        .stubSettings()
+        .setTransportChannelProvider(
+            EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder()
+                .setAttemptDirectPath(false) // Disable DirectPath
+                .setChannelPoolSettings( // Autoscale Channel Size
+                    ChannelPoolSettings.builder()
+                        // Make sure that there are at least 2 channels regardless of RPCs
+                        .setMinChannelCount(2)
+                        // Limit number of channels to 100 regardless of QPS
+                        .setMaxChannelCount(100)
+                        // Start off with 5
+                        .setInitialChannelCount(5)
+                        // Make sure the channels are primed before use
+                        .setPreemptiveRefreshEnabled(true)
+                        // Evict channels when there are fewer than 10 outstanding RPCs
+                        .setMinRpcsPerChannel(10)
+                        // Add more channels when a channel has 50 outstanding RPCs
+                        .setMaxRpcsPerChannel(50)
+                        .build())

Review Comment:
   The channel pool settings are somewhat arbitrary. The main goal is to 
increase the number of concurrent streams per worker. We expect at most 
Dataflow `workerHarnessThreads` open streams per worker (because we use the 
blocking stream APIs), and that defaults to 500. Before this change the 
channel pool size was 2, and each channel allows at most 100 concurrent 
streams, so we were effectively capping active streams at 200. Added a 
comment.
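   
   A quick worked check of those numbers (a sketch; the constants come from 
this discussion, not from the PR's code):
   
   ```java
   int streamsPerChannel = 100;     // max concurrent streams per gRPC channel
   int oldPoolSize = 2;             // fixed channel pool size before this change
   int workerHarnessThreads = 500;  // Dataflow default; one blocking stream each
   int oldCap = oldPoolSize * streamsPerChannel; // = 200, well under 500 threads
   int newCap = 100 * streamsPerChannel;         // maxChannelCount(100) allows 10,000
   ```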
   
   The 10 attempts below match the default for other Bigtable RPCs. We 
noticed that these calls occasionally threw exceptions for transient 
failures that should have been retried, while other methods did not.
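   
   For reference, a hedged sketch of what aligning on 10 attempts could look 
like (the exact setting changed in this PR is not shown in this hunk; the 
method chain follows the java-bigtable `EnhancedBigtableStubSettings` API, 
but treat it as an assumption here):
   
   ```java
   // Sketch only: allow up to 10 attempts on the change stream RPC so that
   // transient failures are retried the way other Bigtable methods retry them.
   dataSettingsBuilder
       .stubSettings()
       .readChangeStreamSettings()
       .retrySettings()
       .setMaxAttempts(10);
   ```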


