This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new ff2caf3c1a7 integrate multi-shard consumer with async client (#10733) ff2caf3c1a7 is described below commit ff2caf3c1a79e2cd978fa6c864311ba16d282a98 Author: Hamed Hatami <javaee.hat...@gmail.com> AuthorDate: Wed Jul 19 22:41:03 2023 +0200 integrate multi-shard consumer with async client (#10733) * integrate multi-shard consumer with async client * integrate multi-shard consumer with async client * integrate multi-shard consumer with async client * integrate multi-shard consumer with async client --------- Co-authored-by: Hamed Hatami <hamed.hat...@postnord.com> --- components/camel-aws/camel-aws2-kinesis/pom.xml | 6 +- .../component/aws2/kinesis/Kinesis2Consumer.java | 87 ++++++++++++++-------- .../KinesisConsumerClosedShardWithSilentTest.java | 6 +- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml index 07811f6045a..b85766e980a 100644 --- a/components/camel-aws/camel-aws2-kinesis/pom.xml +++ b/components/camel-aws/camel-aws2-kinesis/pom.xml @@ -43,9 +43,9 @@ <version>${aws-java-sdk2-version}</version> </dependency> <dependency> - <groupId>software.amazon.kinesis</groupId> - <artifactId>amazon-kinesis-client</artifactId> - <version>${amazon-kinesis-common-version}</version> + <groupId>software.amazon.awssdk</groupId> + <artifactId>netty-nio-client</artifactId> + <version>${aws-java-sdk2-version}</version> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index b00bdfb4e89..323b9c961d4 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -16,12 +16,6 @@ */ package org.apache.camel.component.aws2.kinesis; -import java.util.ArrayDeque; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -35,6 +29,14 @@ import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -46,10 +48,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> { - private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class); - - private String currentShardIterator; private boolean isShardClosed; private ResumeStrategy resumeStrategy; @@ -88,7 +87,20 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .getConfiguration() .getMaxResultsPerRequest()) .build(); - GetRecordsResponse result = getClient().getRecords(req); + + GetRecordsResponse result = null; + if (getEndpoint().getConfiguration().isAsyncClient()) { + try { + result = getAsyncClient() + .getRecords(req) + .get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + result = getClient().getRecords(req); + } + try { Queue<Exchange> exchanges = createExchanges(result.records()); @@ -102,7 +114,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R // we left off, however, I don't know what happens to subsequent // exchanges when an earlier exchange fails. - currentShardIterator = result.nextShardIterator(); + var currentShardIterator = result.nextShardIterator(); if (isShardClosed) { switch (getEndpoint().getConfiguration().getShardClosed()) { case ignore: @@ -145,36 +157,33 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R return getEndpoint().getClient(); } + private KinesisAsyncClient getAsyncClient() { + return getEndpoint().getAsyncClient(); + } + @Override public Kinesis2Endpoint getEndpoint() { return (Kinesis2Endpoint) super.getEndpoint(); } private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException { - // either return a cached one or get a new one via a GetShardIterator - // request. - if (currentShardIterator == null) { - var shardId = shard.shardId(); - isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null; + var shardId = shard.shardId(); + isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null; + LOG.debug("ShardId is: {}", shardId); - LOG.debug("ShardId is: {}", shardId); + GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder() + .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) + .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); - GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder() - .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) - .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); - - if (hasSequenceNumber()) { - request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); - } - - resume(request); - GetShardIteratorResponse result = getClient().getShardIterator(request.build()); - currentShardIterator = result.shardIterator(); + if (hasSequenceNumber()) { + request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); } - LOG.debug("Shard Iterator is: {}", currentShardIterator); - return currentShardIterator; + resume(request); + GetShardIteratorResponse result = getClient().getShardIterator(request.build()); + + return result.shardIterator(); } private void resume(GetShardIteratorRequest.Builder req) { @@ -228,7 +237,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private boolean hasSequenceNumber() { return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty() && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); + || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); } @Override @@ -261,7 +270,21 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .streamName(getEndpoint().getConfiguration().getStreamName()) .build(); - return getClient().listShards(request).shards(); + List<Shard> shardList = null; + if (getEndpoint().getConfiguration().isAsyncClient()) { + try { + shardList = getAsyncClient() + .listShards(request) + .get() + .shards(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + shardList = getClient().listShards(request).shards(); + } + + return shardList; } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java index 76a5599e8f1..b7899b95efd 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -79,7 +79,7 @@ public class KinesisConsumerClosedShardWithSilentTest { shardList.add(shard); when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder() - .nextShardIterator("nextShardIterator") + .nextShardIterator("shardIterator") .records( Record.builder().sequenceNumber("1").data(SdkBytes.fromString("Hello", Charset.defaultCharset())) .build(), @@ -160,9 +160,9 @@ public class KinesisConsumerClosedShardWithSilentTest { final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class); - verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class)); + verify(kinesisClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class)); verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture()); assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator")); - assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("nextShardIterator")); + assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("shardIterator")); } }