dannycranmer commented on a change in pull request #18733:
URL: https://github.com/apache/flink/pull/18733#discussion_r806704643



##########
File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
##########
@@ -273,7 +274,15 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
 
     public static SdkAsyncHttpClient createAsyncHttpClient(
             final AttributeMap config, final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        return createAsyncHttpClient(config, httpClientBuilder, SdkEventLoopGroup.builder());

Review comment:
       The legacy KPL-based connector does not use the async AWS SDK v2 client. 
The current (non-FLIP-27) source does use async clients for EFO. That source also 
creates [one client per subtask](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L481),
 so this change would create a larger number of threads and could cause a 
regression.
   
   Since the Kinesis EFO source currently uses `2 x num cpus` (assuming it 
uses the shared group), I agree that `N x 2 x num cpus` is a big change.
   
   > It feels that some fairly low number should do
   
   I agree. How about we pick 2 threads? This matches what the formula below 
gives when parallelism is 1 per CPU.
   
   The other option is to calculate it dynamically so that it scales above 2 on 
bigger systems, for instance:
   
   ```
   thread_pool_size = max(2, (num_cpus / sink_parallelism) * 2)
   
   # 2 CPUs at 8 parallelism (under provisioned)
   thread_pool_size = max(2, (2 / 8) * 2) = 2
   
   # 4 CPUs at 4 parallelism (target provisioning)
   thread_pool_size = max(2, (4 / 4) * 2) = 2
   
   # 32 CPUs at 4 parallelism (over provisioned)
   thread_pool_size = max(2, (32 / 4) * 2) = 16
   ```
   
   The maximum thread pool size here would be `2 x num cpus`. However, this does 
not scale for apps with many sinks, or clusters with many jobs.
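
For reference, a minimal Java sketch of the formula above. The class name `EventLoopThreadSizing` and the method `threadPoolSize` are hypothetical and not part of the PR. It also assumes integer arithmetic that rounds down, which the pseudocode does not specify.

```java
public final class EventLoopThreadSizing {

    // Lower bound proposed in this comment.
    private static final int MIN_THREADS = 2;

    private EventLoopThreadSizing() {}

    /**
     * Hypothetical helper: max(2, (num_cpus / sink_parallelism) * 2).
     * Assumption: the result is floored to a whole number of threads.
     */
    static int threadPoolSize(int numCpus, int sinkParallelism) {
        if (numCpus < 1 || sinkParallelism < 1) {
            throw new IllegalArgumentException(
                    "numCpus and sinkParallelism must both be at least 1");
        }
        // Multiply before dividing so that integer division does not
        // truncate ratios such as 6 / 4 before they are doubled.
        int scaled = (numCpus * 2) / sinkParallelism;
        return Math.max(MIN_THREADS, scaled);
    }

    public static void main(String[] args) {
        // The three scenarios from the comment.
        System.out.println(threadPoolSize(2, 8));   // under provisioned
        System.out.println(threadPoolSize(4, 4));   // target provisioning
        System.out.println(threadPoolSize(32, 4));  // over provisioned
    }
}
```

The result could then be passed to `SdkEventLoopGroup.builder().numberOfThreads(...)`, with a user-supplied override taking precedence if one is configured. How `num_cpus` and `sink_parallelism` would be obtained inside the connector is left open here.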
   
   > we should allow overrides from the user
   
   I agree
   





