This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 18438bbab50 [improve][io] Add configuration parameter for disabling 
aggregation for Kinesis Producers (#24289)
18438bbab50 is described below

commit 18438bbab505ded332daf856b78b2ed84aaff51c
Author: Malla Sandeep <[email protected]>
AuthorDate: Wed May 14 23:20:43 2025 +0530

    [improve][io] Add configuration parameter for disabling aggregation for 
Kinesis Producers (#24289)
---
 .../src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java     | 1 +
 .../main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java   | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 1db63c90b27..b675cecb2b8 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -185,6 +185,7 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<GenericObj
             .getCredentialProvider();
         kinesisConfig.setCredentialsProvider(credentialsProvider);
         
kinesisConfig.setNativeExecutable(StringUtils.trimToEmpty(kinesisSinkConfig.getNativeExecutable()));
+        
kinesisConfig.setAggregationEnabled(kinesisSinkConfig.isAggregationEnabled());
 
         this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
         this.kinesisProducer = new KinesisProducer(kinesisConfig);
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index a3b87d4886a..7f27fe33055 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -172,4 +172,10 @@ public class KinesisSinkConfig extends BaseKinesisConfig 
implements Serializable
             help = "Custom AWS STS port to connect to"
     )
     private Integer awsStsPort;
+
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Enable aggregation. With aggregation, multiple user 
records could be packed into a single\n"
+                    + " KinesisRecord. If disabled, each user record is sent 
in its own KinesisRecord.")
+    private boolean aggregationEnabled = true;
 }

Reply via email to