This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new b0830cedb3f [FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO
b0830cedb3f is described below

commit b0830cedb3f0d30a16fc2a312bc7beb6900b2cd9
Author: Danny Cranmer <dannycran...@apache.org>
AuthorDate: Tue Sep 6 13:14:39 2022 +0100

    [FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO
---
 .../connectors/kinesis/proxy/KinesisProxyV2Factory.java     | 10 ++++------
 .../connectors/kinesis/util/KinesisConfigUtil.java          | 13 +++++++++++++
 .../connectors/kinesis/util/KinesisConfigUtilTest.java      | 11 +++++++++++
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index f8cf6371519..24c3cb3b6d6 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
 import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.Preconditions;
 
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
@@ -61,15 +62,12 @@ public class KinesisProxyV2Factory {
         final FanOutRecordPublisherConfiguration configuration =
                 new FanOutRecordPublisherConfiguration(configProps, 
emptyList());
 
-        Properties legacyConfigProps = new Properties(configProps);
-        legacyConfigProps.setProperty(
-                KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
-                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
-                        
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+        Properties asyncClientProperties =
+                
KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps);
 
         final KinesisAsyncClient client =
                 AWSAsyncSinkUtil.createAwsAsyncClient(
-                        legacyConfigProps,
+                        asyncClientProperties,
                         httpClient,
                         KinesisAsyncClient.builder(),
                         
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9f6965a7957..dbcfcfef4a7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -18,7 +18,9 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
@@ -537,6 +539,17 @@ public class KinesisConfigUtil {
         }
     }
 
+    public static Properties getV2ConsumerAsyncClientProperties(final 
Properties configProps) {
+        Properties asyncClientProperties = new Properties();
+        asyncClientProperties.putAll(configProps);
+        asyncClientProperties.setProperty(
+                KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
+                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+                        
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+
+        return asyncClientProperties;
+    }
+
     private static void validateOptionalPositiveLongProperty(
             Properties config, String key, String message) {
         if (config.containsKey(key)) {
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index c9811d7481a..5eed00a4dce 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -1010,4 +1010,15 @@ public class KinesisConfigUtilTest {
 
         KinesisConfigUtil.validateConsumerConfiguration(testConfig);
     }
+
+    @Test
+    public void testGetV2ConsumerAsyncClientProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("retained", "property");
+
+        
assertThat(KinesisConfigUtil.getV2ConsumerAsyncClientProperties(properties))
+                .containsEntry("retained", "property")
+                .containsKey("aws.kinesis.client.user-agent-prefix")
+                .hasSize(2);
+    }
 }

Reply via email to