This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ceef2c1967d4cb6f2bad9a82fce5e00c132eac4c Author: Malla Sandeep <[email protected]> AuthorDate: Wed May 14 23:20:43 2025 +0530 [improve][io] Add configuration parameter for disabling aggregation for Kinesis Producers (#24289) (cherry picked from commit 18438bbab505ded332daf856b78b2ed84aaff51c) --- .../src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 1 + .../main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 1db63c90b27..b675cecb2b8 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -185,6 +185,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObj .getCredentialProvider(); kinesisConfig.setCredentialsProvider(credentialsProvider); kinesisConfig.setNativeExecutable(StringUtils.trimToEmpty(kinesisSinkConfig.getNativeExecutable())); + kinesisConfig.setAggregationEnabled(kinesisSinkConfig.isAggregationEnabled()); this.streamName = kinesisSinkConfig.getAwsKinesisStreamName(); this.kinesisProducer = new KinesisProducer(kinesisConfig); diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index a3b87d4886a..7f27fe33055 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -172,4 +172,10 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable help = "Custom AWS STS port to connect to" ) private Integer awsStsPort; + + @FieldDoc( + defaultValue = "true", + help = "Enable aggregation. With aggregation, multiple user records could be packed into a single\n" + + " KinesisRecord. If disabled, each user record is sent in its own KinesisRecord.") + private boolean aggregationEnabled = true; }
