This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1aaa48427ea [improve][io] Add configuration parameter for disabling 
aggregation for Kinesis Producers (#24289)
1aaa48427ea is described below

commit 1aaa48427ea5e671171bcabb390767b43aa0d576
Author: Malla Sandeep <[email protected]>
AuthorDate: Wed May 14 23:20:43 2025 +0530

    [improve][io] Add configuration parameter for disabling aggregation for 
Kinesis Producers (#24289)
    
    (cherry picked from commit 18438bbab505ded332daf856b78b2ed84aaff51c)
---
 .../src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java     | 1 +
 .../main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java   | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index fb8eedff82f..34a360c723a 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -182,6 +182,7 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<GenericObj
                 kinesisSinkConfig.getAwsCredentialPluginParam())
             .getCredentialProvider();
         kinesisConfig.setCredentialsProvider(credentialsProvider);
+        
kinesisConfig.setAggregationEnabled(kinesisSinkConfig.isAggregationEnabled());
 
         this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
         this.kinesisProducer = new KinesisProducer(kinesisConfig);
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index c5b26a26d0c..eb08496dbcd 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -145,4 +145,10 @@ public class KinesisSinkConfig extends BaseKinesisConfig 
implements Serializable
         FULL_MESSAGE_IN_JSON_EXPAND_VALUE
     }
 
+
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Enable aggregation. With aggregation, multiple user 
records could be packed into a single\n"
+                    + " KinesisRecord. If disabled, each user record is sent 
in its own KinesisRecord.")
+    private boolean aggregationEnabled = true;
 }

Reply via email to