This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new b0830cedb3f [FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO b0830cedb3f is described below commit b0830cedb3f0d30a16fc2a312bc7beb6900b2cd9 Author: Danny Cranmer <dannycran...@apache.org> AuthorDate: Tue Sep 6 13:14:39 2022 +0100 [FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO --- .../connectors/kinesis/proxy/KinesisProxyV2Factory.java | 10 ++++------ .../connectors/kinesis/util/KinesisConfigUtil.java | 13 +++++++++++++ .../connectors/kinesis/util/KinesisConfigUtilTest.java | 11 +++++++++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java index f8cf6371519..24c3cb3b6d6 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration; import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.Preconditions; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -61,15 +62,12 @@ public class KinesisProxyV2Factory { final FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(configProps, emptyList()); - Properties legacyConfigProps = new Properties(configProps); - legacyConfigProps.setProperty( - KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX, - AWSAsyncSinkUtil.formatFlinkUserAgentPrefix( - KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); + Properties asyncClientProperties = + KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps); final KinesisAsyncClient client = AWSAsyncSinkUtil.createAwsAsyncClient( - legacyConfigProps, + asyncClientProperties, httpClient, KinesisAsyncClient.builder(), KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 9f6965a7957..dbcfcfef4a7 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -18,7 +18,9 @@ package org.apache.flink.streaming.connectors.kinesis.util; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; @@ -537,6 +539,17 @@ public class KinesisConfigUtil { } } + public static Properties getV2ConsumerAsyncClientProperties(final Properties configProps) { + Properties asyncClientProperties = new Properties(); + asyncClientProperties.putAll(configProps); + asyncClientProperties.setProperty( + KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX, + AWSAsyncSinkUtil.formatFlinkUserAgentPrefix( + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); + + return asyncClientProperties; + } + private static void validateOptionalPositiveLongProperty( Properties config, String key, String message) { if (config.containsKey(key)) { diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java index c9811d7481a..5eed00a4dce 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -1010,4 +1010,15 @@ public class KinesisConfigUtilTest { KinesisConfigUtil.validateConsumerConfiguration(testConfig); } + + @Test + public void testGetV2ConsumerAsyncClientProperties() { + Properties properties = new Properties(); + properties.setProperty("retained", "property"); + + assertThat(KinesisConfigUtil.getV2ConsumerAsyncClientProperties(properties)) + .containsEntry("retained", "property") + .containsKey("aws.kinesis.client.user-agent-prefix") + .hasSize(2); + } }