This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new b52830b560f CAMEL-20794: AWS2 Kinesis producer supports sending batch (#14618) b52830b560f is described below commit b52830b560fa64e7c9188f4105837f87d6aaa3f4 Author: Fan Yang <fan...@hotmail.com> AuthorDate: Mon Jun 24 17:17:56 2024 +0800 CAMEL-20794: AWS2 Kinesis producer supports sending batch (#14618) * Kinesis producer supports sending batch * Format code * Set max batch size 500 --- .../component/aws2/kinesis/Kinesis2Producer.java | 96 +++++++++++++++++++--- .../kinesis/integration/KinesisProducerIT.java | 13 ++- 2 files changed, 97 insertions(+), 12 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java index ce8903aa4ac..c51fe777379 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java @@ -16,6 +16,11 @@ */ package org.apache.camel.component.aws2.kinesis; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.support.DefaultProducer; @@ -23,9 +28,15 @@ import org.apache.camel.util.ObjectHelper; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; public class Kinesis2Producer extends DefaultProducer { + // Maximum number of records that can be sent in a single PutRecords request + private static final int MAX_BATCH_SIZE = 500; + private KinesisConnection connection; public Kinesis2Producer(Kinesis2Endpoint endpoint) { @@ -45,11 +56,80 @@ public class Kinesis2Producer extends DefaultProducer { return (Kinesis2Endpoint) super.getEndpoint(); } + @Override + protected void doStart() throws Exception { + super.doStart(); + + ObjectHelper.notNull(connection, "connection", this); + } + @Override public void process(Exchange exchange) throws Exception { + Object body = exchange.getIn().getBody(); + if (body instanceof Iterable) { + sendBatchRecords(exchange); + } else { + sendSingleRecord(exchange); + } + } + + private void sendBatchRecords(Exchange exchange) { + Object partitionKey = exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY); + ensurePartitionKeyNotNull(partitionKey); + List<List<PutRecordsRequestEntry>> requestBatchList = createRequestBatchList(exchange, partitionKey); + for (List<PutRecordsRequestEntry> requestBatch : requestBatchList) { + PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder() + .streamName(getEndpoint().getConfiguration().getStreamName()) + .records(requestBatch) + .build(); + PutRecordsResponse putRecordsResponse = connection.getClient(getEndpoint()).putRecords(putRecordsRequest); + if (putRecordsResponse.failedRecordCount() > 0) { + throw new RuntimeException( + "Failed to send records " + putRecordsResponse.failedRecordCount() + " of " + + putRecordsResponse.records().size()); + } + } + } + + private List<List<PutRecordsRequestEntry>> createRequestBatchList(Exchange exchange, Object partitionKey) { + List<List<PutRecordsRequestEntry>> requestBatchList = new ArrayList<>(); + List<PutRecordsRequestEntry> requestBatch = new ArrayList<>(MAX_BATCH_SIZE); + for (Object record : exchange.getIn().getBody(Iterable.class)) { + SdkBytes sdkBytes; + if (record instanceof byte[] bytes) { + sdkBytes = SdkBytes.fromByteArray(bytes); + } else if (record instanceof ByteBuffer bf) { + sdkBytes = SdkBytes.fromByteBuffer(bf); + } else if (record instanceof InputStream is) { + sdkBytes = SdkBytes.fromInputStream(is); + } else if (record instanceof String str) { + sdkBytes = SdkBytes.fromUtf8String(str); + } else { + throw new IllegalArgumentException( + "Record type not supported. Must be byte[], ByteBuffer, InputStream or UTF-8 String"); + } + + PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder() + .data(sdkBytes) + .partitionKey(partitionKey.toString()) + .build(); + requestBatch.add(putRecordsRequestEntry); + if (requestBatch.size() == MAX_BATCH_SIZE) { + requestBatchList.add(requestBatch); + requestBatch = new ArrayList<>(MAX_BATCH_SIZE); + } + } + if (!requestBatch.isEmpty()) { + requestBatchList.add(requestBatch); + } + + return requestBatchList; + } + + private void sendSingleRecord(Exchange exchange) { PutRecordRequest request = createRequest(exchange); PutRecordResponse putRecordResult = connection.getClient(getEndpoint()).putRecord(request); - Message message = getMessageForResponse(exchange); + Message message = exchange.getMessage(); message.setHeader(Kinesis2Constants.SEQUENCE_NUMBER, putRecordResult.sequenceNumber()); message.setHeader(Kinesis2Constants.SHARD_ID, putRecordResult.shardId()); } @@ -62,6 +142,7 @@ public class Kinesis2Producer extends DefaultProducer { PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder(); putRecordRequest.data(SdkBytes.fromByteArray(body)); putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName()); + ensurePartitionKeyNotNull(partitionKey); putRecordRequest.partitionKey(partitionKey.toString()); if (sequenceNumber != null) { putRecordRequest.sequenceNumberForOrdering(sequenceNumber.toString()); @@ -69,14 +150,9 @@ public class Kinesis2Producer extends DefaultProducer { return putRecordRequest.build(); } - public static Message getMessageForResponse(final Exchange exchange) { - return exchange.getMessage(); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - - ObjectHelper.notNull(connection, "connection", this); + private void ensurePartitionKeyNotNull(Object partitionKey) { + if (partitionKey == null) { + throw new IllegalArgumentException("Partition key must be specified"); + } } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java index 4b43ff03f40..8b303017571 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws2.kinesis.integration; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -105,14 +106,22 @@ public class KinesisProducerIT extends CamelTestSupport { exchange.getIn().setBody("Kinesis Event 2."); }); - List<Record> records; + template.send("direct:start", ExchangePattern.InOut, exchange -> { + exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, "partition-1"); + exchange.getIn().setBody(Arrays.asList("Kinesis Event 3.", "Kinesis Event 4.".getBytes(StandardCharsets.UTF_8))); + }); + Awaitility.await().atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals(2, consumeMessages())); + .untilAsserted(() -> assertEquals(4, consumeMessages())); assertEquals("Kinesis Event 1.", recordList.get(0).data().asString(StandardCharsets.UTF_8)); assertEquals("partition-1", recordList.get(0).partitionKey()); assertEquals("Kinesis Event 2.", recordList.get(1).data().asString(StandardCharsets.UTF_8)); assertEquals("partition-1", recordList.get(1).partitionKey()); + assertEquals("Kinesis Event 3.", recordList.get(2).data().asString(StandardCharsets.UTF_8)); + assertEquals("partition-1", recordList.get(2).partitionKey()); + assertEquals("Kinesis Event 4.", recordList.get(3).data().asString(StandardCharsets.UTF_8)); + assertEquals("partition-1", recordList.get(3).partitionKey()); } @Override