This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 18438bbab50 [improve][io] Add configuration parameter for disabling aggregation for Kinesis Producers (#24289)
18438bbab50 is described below
commit 18438bbab505ded332daf856b78b2ed84aaff51c
Author: Malla Sandeep <[email protected]>
AuthorDate: Wed May 14 23:20:43 2025 +0530
[improve][io] Add configuration parameter for disabling aggregation for Kinesis Producers (#24289)
---
.../src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 1 +
.../main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java | 6 ++++++
2 files changed, 7 insertions(+)
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 1db63c90b27..b675cecb2b8 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -185,6 +185,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObj
.getCredentialProvider();
kinesisConfig.setCredentialsProvider(credentialsProvider);
kinesisConfig.setNativeExecutable(StringUtils.trimToEmpty(kinesisSinkConfig.getNativeExecutable()));
+ kinesisConfig.setAggregationEnabled(kinesisSinkConfig.isAggregationEnabled());
this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
this.kinesisProducer = new KinesisProducer(kinesisConfig);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index a3b87d4886a..7f27fe33055 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -172,4 +172,10 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
help = "Custom AWS STS port to connect to"
)
private Integer awsStsPort;
+
+ @FieldDoc(
+ defaultValue = "true",
+ help = "Enable aggregation. With aggregation, multiple user records could be packed into a single\n"
+ + " KinesisRecord. If disabled, each user record is sent in its own KinesisRecord.")
+ private boolean aggregationEnabled = true;
}