This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 80902f811da [improve][io] Add support for the complete
KinesisProducerConfiguration in KinesisSinkConfig (#24489)
80902f811da is described below
commit 80902f811da439b104f878c28f09d6d63660fd4c
Author: Malla Sandeep <[email protected]>
AuthorDate: Wed Jul 23 03:09:19 2025 +0530
[improve][io] Add support for the complete KinesisProducerConfiguration in
KinesisSinkConfig (#24489)
---
pulsar-io/kinesis/pom.xml | 7 ++
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 20 ++++--
.../pulsar/io/kinesis/KinesisSinkConfig.java | 76 ++++++++++++++++++++++
.../pulsar/io/kinesis/KinesisSinkConfigTest.java | 11 ++++
4 files changed, 110 insertions(+), 4 deletions(-)
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index e2aabb3f3ca..c8179de26c9 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -108,6 +108,13 @@
<artifactId>aws-java-sdk-core</artifactId>
</dependency>
+ <!-- add utils to fix jackson parsing issue -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>utils</artifactId>
+ <version>2.22.12</version>
+ </dependency>
+
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index b675cecb2b8..76686fbd899 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -150,12 +151,13 @@ public class KinesisSink extends AbstractAwsConnector
implements Sink<GenericObj
}
@Override
- public void open(Map<String, Object> config, SinkContext sinkContext) {
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws IOException {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext);
this.sinkContext = sinkContext;
- KinesisProducerConfiguration kinesisConfig = new
KinesisProducerConfiguration();
+ KinesisProducerConfiguration kinesisConfig = KinesisSinkConfig
+
.loadExtraKinesisProducerConfig(kinesisSinkConfig.getExtraKinesisProducerConfig());
if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) {
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
}
@@ -173,8 +175,18 @@ public class KinesisSink extends AbstractAwsConnector
implements Sink<GenericObj
}
kinesisConfig.setRegion(kinesisSinkConfig.getAwsRegion());
kinesisConfig.setThreadingModel(ThreadingModel.POOLED);
- kinesisConfig.setThreadPoolSize(4);
- kinesisConfig.setCollectionMaxCount(1);
+ if (kinesisConfig.getThreadPoolSize() == 0) {
+ kinesisConfig.setThreadPoolSize(4);
+ }
+
kinesisConfig.setCollectionMaxCount(kinesisSinkConfig.getCollectionMaxCount());
+
kinesisConfig.setCollectionMaxSize(kinesisSinkConfig.getCollectionMaxSize());
+ kinesisConfig.setConnectTimeout(kinesisSinkConfig.getConnectTimeout());
+
kinesisConfig.setCredentialsRefreshDelay(kinesisSinkConfig.getCredentialsRefreshDelay());
+ kinesisConfig.setMaxConnections(kinesisSinkConfig.getMaxConnections());
+ kinesisConfig.setMinConnections(kinesisSinkConfig.getMinConnections());
+ kinesisConfig.setRateLimit(kinesisSinkConfig.getRateLimit());
+ kinesisConfig.setRecordTtl(kinesisSinkConfig.getRecordTtl());
+ kinesisConfig.setRequestTimeout(kinesisSinkConfig.getRequestTimeout());
if (kinesisSinkConfig.getSkipCertificateValidation() != null
&& kinesisSinkConfig.getSkipCertificateValidation()) {
kinesisConfig.setVerifyCertificate(false);
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index 7f27fe33055..9d316e54419 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.io.kinesis;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -178,4 +182,76 @@ public class KinesisSinkConfig extends BaseKinesisConfig
implements Serializable
help = "Enable aggregation. With aggregation, multiple user
records could be packed into a single\n"
+ " KinesisRecord. If disabled, each user record is sent
in its own KinesisRecord.")
private boolean aggregationEnabled = true;
+
+    // NOTE(review): KinesisSink previously hard-coded setCollectionMaxCount(1); with this field
+    // defaulting to 500, deployments that never set it will start packing many records into each
+    // PutRecords request after upgrading. Confirm this default change is intended and document it.
+    @FieldDoc(
+        defaultValue = "500",
+        help = "Maximum number of items to pack into a PutRecords request.")
+    private long collectionMaxCount = 500L;
+
+    // NOTE(review): KinesisSink.open builds its KinesisProducerConfiguration from
+    // extraKinesisProducerConfig first and then copies every field below into it
+    // unconditionally (setCollectionMaxSize, setConnectTimeout, setCredentialsRefreshDelay,
+    // setMaxConnections, setMinConnections, setRateLimit, setRecordTtl, setRequestTimeout).
+    // Each field has a non-null default, so a same-named key in extraKinesisProducerConfig
+    // (e.g. credentialsRefreshDelay) is always overwritten, even if the user never set the
+    // dedicated field. If extra config should win over defaults, consider boxed types that
+    // default to null and applying only non-null values in KinesisSink.open.
+ @FieldDoc(
+ defaultValue = "5242880",
+ help = "Maximum amount of data to send with a PutRecords request.
Records larger than the limit will\n"
+ + "still be sent, but will not be grouped with others.")
+ private long collectionMaxSize = 5242880L;
+
+ @FieldDoc(
+ defaultValue = "6000",
+ help = "Timeout (milliseconds) for establishing TLS connections.")
+ private long connectTimeout = 6000L;
+
+ @FieldDoc(
+ defaultValue = "5000",
+ help = "How often to refresh credentials (in milliseconds). During
a refresh, credentials are retrieved\n"
+ + "from any SDK credentials providers attached to the
wrapper and pushed to the core.")
+ private long credentialsRefreshDelay = 5000L;
+
+ @FieldDoc(
+ defaultValue = "24",
+ help = "Maximum number of connections to open to the backend. HTTP
requests are sent in parallel\n"
+ + "over multiple connections.")
+ private long maxConnections = 24L;
+
+ @FieldDoc(
+ defaultValue = "1",
+ help = "Minimum number of connections to keep open to the
backend.")
+ private long minConnections = 1L;
+
+ @FieldDoc(
+ defaultValue = "150",
+ help = """
+ Limits the maximum allowed put rate for a shard, as a
percentage of the backend limits. The
+ default value of 150% is chosen to allow a single producer
instance to completely saturate the
+ allowance for a shard. This is an aggressive setting. If
you prefer to reduce throttling
+ errors rather than completely saturate the shard, consider
reducing this setting.""")
+ private long rateLimit = 150L;
+
+ @FieldDoc(
+ defaultValue = "30000",
+ help = """
+ Set a time-to-live on records (milliseconds). Records that
do not get successfully put within the
+ limit are failed and retried by KinesisSink. This should
be set lower than the Pulsar source's
+ timeoutMs to minimize the risk of duplicate records and to
control heap memory usage in the Kinesis
+ sink, especially during re-deliveries.""")
+ private long recordTtl = 30000L;
+
+ @FieldDoc(
+ defaultValue = "6000",
+ help = """
+ The maximum total time (milliseconds) elapsed between when
we begin a HTTP request and receiving
+ all of the response. If it goes over, the request will be
timed-out. Note that a timed-out
+ request may actually succeed at the backend. Retrying then
leads to duplicates. Setting the
+ timeout too low will therefore increase the probability of
duplicates.""")
+ private long requestTimeout = 6000L;
+
+    // Keys are bound onto KinesisProducerConfiguration by loadExtraKinesisProducerConfig; the
+    // help text below states which keys KinesisSink.open overrides afterwards.
+    @FieldDoc(
+        defaultValue = "",
+        help = "Extra KinesisProducerConfiguration parameters, given as a map from property name to value. See "
+            + "https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.15.2/index.html?com/amazonaws/"
+            + "services/kinesis/producer/KinesisProducerConfiguration.html for all the available parameters. "
+            + "Some keys are always overridden by the sink, even when the corresponding sink setting is left "
+            + "at its default, including collectionMaxCount, collectionMaxSize, connectTimeout, "
+            + "credentialsRefreshDelay, maxConnections, minConnections, rateLimit, recordTtl, requestTimeout, "
+            + "region and threadingModel.")
+    private Map<String, String> extraKinesisProducerConfig = new HashMap<>();
+
+    /** Shared mapper used only to bind extraKinesisProducerConfig entries; ObjectMapper is thread-safe once configured. */
+    private static final ObjectMapper EXTRA_PRODUCER_CONFIG_MAPPER = new ObjectMapper();
+
+    /**
+     * Builds a {@link KinesisProducerConfiguration} from a map of its property names to values.
+     *
+     * <p>The map is serialized to JSON and bound back onto KinesisProducerConfiguration with
+     * Jackson, so each key must name a settable property of that class and each value must be
+     * convertible to that property's type (for example {@code "6000"} for a long property).
+     *
+     * @param map property name to value; may be {@code null} or empty
+     * @return a new configuration; a default-constructed one when {@code map} is null or empty
+     * @throws IOException if Jackson cannot bind the map (e.g. an unknown property name or a
+     *     value of the wrong type)
+     */
+    public static KinesisProducerConfiguration loadExtraKinesisProducerConfig(Map<String, String> map)
+            throws IOException {
+        // Without this guard a null map serializes to JSON "null", readValue returns null, and
+        // the caller later fails with a NullPointerException instead of using the defaults.
+        if (map == null || map.isEmpty()) {
+            return new KinesisProducerConfiguration();
+        }
+        return EXTRA_PRODUCER_CONFIG_MAPPER.readValue(
+                EXTRA_PRODUCER_CONFIG_MAPPER.writeValueAsString(map), KinesisProducerConfiguration.class);
+    }
}
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
index cbeb350d76b..1c6beee96b8 100644
---
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kinesis;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -37,6 +38,11 @@ public class KinesisSinkConfigTest {
map.put("awsKinesisStreamName", "my-stream");
map.put("awsCredentialPluginParam",
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+ Map<String, String> expectedExtraKinesisProducerConfig = new HashMap<>
();
+ expectedExtraKinesisProducerConfig.put("credentialsRefreshDelay",
"6000");
+ expectedExtraKinesisProducerConfig.put("logLevel", "debug");
+ map.put("extraKinesisProducerConfig",
expectedExtraKinesisProducerConfig);
+
SinkContext sinkContext = Mockito.mock(SinkContext.class);
KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);
@@ -46,6 +52,11 @@ public class KinesisSinkConfigTest {
assertEquals(config.getAwsKinesisStreamName(), "my-stream");
assertEquals(config.getAwsCredentialPluginParam(),
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+ assertEquals(config.getExtraKinesisProducerConfig(),
expectedExtraKinesisProducerConfig,
+ "ExtraKinesisProducerConfiguration Maps should match exactly");
+ KinesisProducerConfiguration kinesisProducerConfiguration =
KinesisSinkConfig
+
.loadExtraKinesisProducerConfig(config.getExtraKinesisProducerConfig());
+
assertEquals(kinesisProducerConfiguration.getCredentialsRefreshDelay(), 6000);
}
@Test