This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 80902f811da [improve][io] Add support for the complete KinesisProducerConfiguration in KinesisSinkConfig (#24489)
80902f811da is described below

commit 80902f811da439b104f878c28f09d6d63660fd4c
Author: Malla Sandeep <[email protected]>
AuthorDate: Wed Jul 23 03:09:19 2025 +0530

    [improve][io] Add support for the complete KinesisProducerConfiguration in KinesisSinkConfig (#24489)
---
 pulsar-io/kinesis/pom.xml                          |  7 ++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 20 ++++--
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 76 ++++++++++++++++++++++
 .../pulsar/io/kinesis/KinesisSinkConfigTest.java   | 11 ++++
 4 files changed, 110 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index e2aabb3f3ca..c8179de26c9 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -108,6 +108,13 @@
       <artifactId>aws-java-sdk-core</artifactId>
     </dependency>
 
+    <!-- add utils to fix jackson parsing issue -->
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>utils</artifactId>
+      <version>2.22.12</version>
+    </dependency>
+
     <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-client</artifactId>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index b675cecb2b8..76686fbd899 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -150,12 +151,13 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObj
     }
 
     @Override
-    public void open(Map<String, Object> config, SinkContext sinkContext) {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws IOException {
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
         kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext);
         this.sinkContext = sinkContext;
 
-        KinesisProducerConfiguration kinesisConfig = new 
KinesisProducerConfiguration();
+        KinesisProducerConfiguration kinesisConfig = KinesisSinkConfig
+                
.loadExtraKinesisProducerConfig(kinesisSinkConfig.getExtraKinesisProducerConfig());
         if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) {
             
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
         }
@@ -173,8 +175,18 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObj
         }
         kinesisConfig.setRegion(kinesisSinkConfig.getAwsRegion());
         kinesisConfig.setThreadingModel(ThreadingModel.POOLED);
-        kinesisConfig.setThreadPoolSize(4);
-        kinesisConfig.setCollectionMaxCount(1);
+        if (kinesisConfig.getThreadPoolSize() == 0) {
+            kinesisConfig.setThreadPoolSize(4);
+        }
+        
kinesisConfig.setCollectionMaxCount(kinesisSinkConfig.getCollectionMaxCount());
+        
kinesisConfig.setCollectionMaxSize(kinesisSinkConfig.getCollectionMaxSize());
+        kinesisConfig.setConnectTimeout(kinesisSinkConfig.getConnectTimeout());
+        
kinesisConfig.setCredentialsRefreshDelay(kinesisSinkConfig.getCredentialsRefreshDelay());
+        kinesisConfig.setMaxConnections(kinesisSinkConfig.getMaxConnections());
+        kinesisConfig.setMinConnections(kinesisSinkConfig.getMinConnections());
+        kinesisConfig.setRateLimit(kinesisSinkConfig.getRateLimit());
+        kinesisConfig.setRecordTtl(kinesisSinkConfig.getRecordTtl());
+        kinesisConfig.setRequestTimeout(kinesisSinkConfig.getRequestTimeout());
         if (kinesisSinkConfig.getSkipCertificateValidation() != null
                 && kinesisSinkConfig.getSkipCertificateValidation()) {
             kinesisConfig.setVerifyCertificate(false);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index 7f27fe33055..9d316e54419 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.io.kinesis;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -178,4 +182,76 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
             help = "Enable aggregation. With aggregation, multiple user 
records could be packed into a single\n"
                     + " KinesisRecord. If disabled, each user record is sent 
in its own KinesisRecord.")
     private boolean aggregationEnabled = true;
+
+    @FieldDoc(
+            defaultValue = "500",
+            help = "Maximum number of items to pack into an PutRecords 
request.")
+    private long collectionMaxCount = 500L;
+
+    @FieldDoc(
+            defaultValue = "5242880",
+            help = "Maximum amount of data to send with a PutRecords request. 
Records larger than the limit will\n"
+                    + "still be sent, but will not be grouped with others.")
+    private long collectionMaxSize = 5242880L;
+
+    @FieldDoc(
+            defaultValue = "6000",
+            help = "Timeout (milliseconds) for establishing TLS connections.")
+    private long connectTimeout = 6000L;
+
+    @FieldDoc(
+            defaultValue = "5000",
+            help = "How often to refresh credentials (in milliseconds). During 
a refresh, credentials are retrieved\n"
+                    + "from any SDK credentials providers attached to the 
wrapper and pushed to the core.")
+    private long credentialsRefreshDelay = 5000L;
+
+    @FieldDoc(
+            defaultValue = "24",
+            help = "Maximum number of connections to open to the backend. HTTP 
requests are sent in parallel\n"
+                    + "over multiple connections.")
+    private long maxConnections = 24L;
+
+    @FieldDoc(
+            defaultValue = "1",
+            help = "Minimum number of connections to keep open to the 
backend.")
+    private long minConnections = 1L;
+
+    @FieldDoc(
+            defaultValue = "150",
+            help = """
+                    Limits the maximum allowed put rate for a shard, as a 
percentage of the backend limits. The
+                    default value of 150% is chosen to allow a single producer 
instance to completely saturate the
+                    allowance for a shard. This is an aggressive setting. If 
you prefer to reduce throttling
+                    errors rather than completely saturate the shard, consider 
reducing this setting.""")
+    private long rateLimit = 150L;
+
+    @FieldDoc(
+            defaultValue = "30000",
+            help = """
+                    Set a time-to-live on records (milliseconds). Records that 
do not get successfully put within the
+                    limit are failed and retried by KinesisSink. This should 
be set lower than the Pulsar source's
+                    timeoutMs to minimize the risk of duplicate records and to 
control heap memory usage in the Kinesis
+                    sink, especially during re-deliveries.""")
+    private long recordTtl = 30000L;
+
+    @FieldDoc(
+            defaultValue = "6000",
+            help = """
+                    The maximum total time (milliseconds) elapsed between when 
we begin a HTTP request and receiving
+                    all of the response. If it goes over, the request will be 
timed-out. Note that a timed-out
+                    request may actually succeed at the backend. Retrying then 
leads to duplicates. Setting the
+                    timeout too low will therefore increase the probability of 
duplicates.""")
+    private long requestTimeout = 6000L;
+
+    @FieldDoc(
+            defaultValue = "",
+            help = "Extra KinesisProducerConfiguration parameters. See 
https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.15.2/index.html?com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html
 for all the available parameters."
+                    + "Parameters that are explicitly set take preference over 
extra config.")
+    private Map<String, String> extraKinesisProducerConfig = new HashMap<>();
+
+    public static KinesisProducerConfiguration 
loadExtraKinesisProducerConfig(Map<String, String> map)
+            throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), 
KinesisProducerConfiguration.class);
+    }
 }
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
index cbeb350d76b..1c6beee96b8 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kinesis;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,6 +38,11 @@ public class KinesisSinkConfigTest {
         map.put("awsKinesisStreamName", "my-stream");
         map.put("awsCredentialPluginParam", 
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
 
+        Map<String, String> expectedExtraKinesisProducerConfig = new HashMap<> 
();
+        expectedExtraKinesisProducerConfig.put("credentialsRefreshDelay", 
"6000");
+        expectedExtraKinesisProducerConfig.put("logLevel", "debug");
+        map.put("extraKinesisProducerConfig", 
expectedExtraKinesisProducerConfig);
+
         SinkContext sinkContext = Mockito.mock(SinkContext.class);
         KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);
 
@@ -46,6 +52,11 @@ public class KinesisSinkConfigTest {
         assertEquals(config.getAwsKinesisStreamName(), "my-stream");
         assertEquals(config.getAwsCredentialPluginParam(),
                 "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        assertEquals(config.getExtraKinesisProducerConfig(), 
expectedExtraKinesisProducerConfig,
+                "ExtraKinesisProducerConfiguration Maps should match exactly");
+        KinesisProducerConfiguration kinesisProducerConfiguration = 
KinesisSinkConfig
+                
.loadExtraKinesisProducerConfig(config.getExtraKinesisProducerConfig());
+        
assertEquals(kinesisProducerConfiguration.getCredentialsRefreshDelay(), 6000);
     }
 
     @Test

Reply via email to