This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b52830b560f CAMEL-20794: AWS2 Kinesis producer supports sending batch 
(#14618)
b52830b560f is described below

commit b52830b560fa64e7c9188f4105837f87d6aaa3f4
Author: Fan Yang <fan...@hotmail.com>
AuthorDate: Mon Jun 24 17:17:56 2024 +0800

    CAMEL-20794: AWS2 Kinesis producer supports sending batch (#14618)
    
    * Kinesis producer supports sending batch
    
    * Format code
    
    * Set max batch size 500
---
 .../component/aws2/kinesis/Kinesis2Producer.java   | 96 +++++++++++++++++++---
 .../kinesis/integration/KinesisProducerIT.java     | 13 ++-
 2 files changed, 97 insertions(+), 12 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index ce8903aa4ac..c51fe777379 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
@@ -23,9 +28,15 @@ import org.apache.camel.util.ObjectHelper;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
 
 public class Kinesis2Producer extends DefaultProducer {
 
+    // Maximum number of records that can be sent in a single PutRecords 
request
+    private static final int MAX_BATCH_SIZE = 500;
+
     private KinesisConnection connection;
 
     public Kinesis2Producer(Kinesis2Endpoint endpoint) {
@@ -45,11 +56,80 @@ public class Kinesis2Producer extends DefaultProducer {
         return (Kinesis2Endpoint) super.getEndpoint();
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        ObjectHelper.notNull(connection, "connection", this);
+    }
+
     @Override
     public void process(Exchange exchange) throws Exception {
+        Object body = exchange.getIn().getBody();
+        if (body instanceof Iterable) {
+            sendBatchRecords(exchange);
+        } else {
+            sendSingleRecord(exchange);
+        }
+    }
+
+    private void sendBatchRecords(Exchange exchange) {
+        Object partitionKey = 
exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY);
+        ensurePartitionKeyNotNull(partitionKey);
+        List<List<PutRecordsRequestEntry>> requestBatchList = 
createRequestBatchList(exchange, partitionKey);
+        for (List<PutRecordsRequestEntry> requestBatch : requestBatchList) {
+            PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
+                    
.streamName(getEndpoint().getConfiguration().getStreamName())
+                    .records(requestBatch)
+                    .build();
+            PutRecordsResponse putRecordsResponse = 
connection.getClient(getEndpoint()).putRecords(putRecordsRequest);
+            if (putRecordsResponse.failedRecordCount() > 0) {
+                throw new RuntimeException(
+                        "Failed to send records " + 
putRecordsResponse.failedRecordCount() + " of "
+                                           + 
putRecordsResponse.records().size());
+            }
+        }
+    }
+
+    private List<List<PutRecordsRequestEntry>> createRequestBatchList(Exchange 
exchange, Object partitionKey) {
+        List<List<PutRecordsRequestEntry>> requestBatchList = new 
ArrayList<>();
+        List<PutRecordsRequestEntry> requestBatch = new 
ArrayList<>(MAX_BATCH_SIZE);
+        for (Object record : exchange.getIn().getBody(Iterable.class)) {
+            SdkBytes sdkBytes;
+            if (record instanceof byte[] bytes) {
+                sdkBytes = SdkBytes.fromByteArray(bytes);
+            } else if (record instanceof ByteBuffer bf) {
+                sdkBytes = SdkBytes.fromByteBuffer(bf);
+            } else if (record instanceof InputStream is) {
+                sdkBytes = SdkBytes.fromInputStream(is);
+            } else if (record instanceof String str) {
+                sdkBytes = SdkBytes.fromUtf8String(str);
+            } else {
+                throw new IllegalArgumentException(
+                        "Record type not supported. Must be byte[], 
ByteBuffer, InputStream or UTF-8 String");
+            }
+
+            PutRecordsRequestEntry putRecordsRequestEntry = 
PutRecordsRequestEntry.builder()
+                    .data(sdkBytes)
+                    .partitionKey(partitionKey.toString())
+                    .build();
+            requestBatch.add(putRecordsRequestEntry);
+            if (requestBatch.size() == MAX_BATCH_SIZE) {
+                requestBatchList.add(requestBatch);
+                requestBatch = new ArrayList<>(MAX_BATCH_SIZE);
+            }
+        }
+        if (!requestBatch.isEmpty()) {
+            requestBatchList.add(requestBatch);
+        }
+
+        return requestBatchList;
+    }
+
+    private void sendSingleRecord(Exchange exchange) {
         PutRecordRequest request = createRequest(exchange);
         PutRecordResponse putRecordResult = 
connection.getClient(getEndpoint()).putRecord(request);
-        Message message = getMessageForResponse(exchange);
+        Message message = exchange.getMessage();
         message.setHeader(Kinesis2Constants.SEQUENCE_NUMBER, 
putRecordResult.sequenceNumber());
         message.setHeader(Kinesis2Constants.SHARD_ID, 
putRecordResult.shardId());
     }
@@ -62,6 +142,7 @@ public class Kinesis2Producer extends DefaultProducer {
         PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder();
         putRecordRequest.data(SdkBytes.fromByteArray(body));
         
putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName());
+        ensurePartitionKeyNotNull(partitionKey);
         putRecordRequest.partitionKey(partitionKey.toString());
         if (sequenceNumber != null) {
             
putRecordRequest.sequenceNumberForOrdering(sequenceNumber.toString());
@@ -69,14 +150,9 @@ public class Kinesis2Producer extends DefaultProducer {
         return putRecordRequest.build();
     }
 
-    public static Message getMessageForResponse(final Exchange exchange) {
-        return exchange.getMessage();
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-
-        ObjectHelper.notNull(connection, "connection", this);
+    private void ensurePartitionKeyNotNull(Object partitionKey) {
+        if (partitionKey == null) {
+            throw new IllegalArgumentException("Partition key must be 
specified");
+        }
     }
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 4b43ff03f40..8b303017571 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws2.kinesis.integration;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -105,14 +106,22 @@ public class KinesisProducerIT extends CamelTestSupport {
             exchange.getIn().setBody("Kinesis Event 2.");
         });
 
-        List<Record> records;
+        template.send("direct:start", ExchangePattern.InOut, exchange -> {
+            exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, 
"partition-1");
+            exchange.getIn().setBody(Arrays.asList("Kinesis Event 3.", 
"Kinesis Event 4.".getBytes(StandardCharsets.UTF_8)));
+        });
+
         Awaitility.await().atMost(5, TimeUnit.SECONDS)
-                .untilAsserted(() -> assertEquals(2, consumeMessages()));
+                .untilAsserted(() -> assertEquals(4, consumeMessages()));
 
         assertEquals("Kinesis Event 1.", 
recordList.get(0).data().asString(StandardCharsets.UTF_8));
         assertEquals("partition-1", recordList.get(0).partitionKey());
         assertEquals("Kinesis Event 2.", 
recordList.get(1).data().asString(StandardCharsets.UTF_8));
         assertEquals("partition-1", recordList.get(1).partitionKey());
+        assertEquals("Kinesis Event 3.", 
recordList.get(2).data().asString(StandardCharsets.UTF_8));
+        assertEquals("partition-1", recordList.get(2).partitionKey());
+        assertEquals("Kinesis Event 4.", 
recordList.get(3).data().asString(StandardCharsets.UTF_8));
+        assertEquals("partition-1", recordList.get(3).partitionKey());
     }
 
     @Override

Reply via email to