This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 4afac70fa11 add the capability of consume only a single shard or more (#10753) 4afac70fa11 is described below commit 4afac70fa1108042dc1a21790aee95a48cdab1be Author: Hamed Hatami <javaee.hat...@gmail.com> AuthorDate: Fri Jul 21 10:49:09 2023 +0200 add the capability of consume only a single shard or more (#10753) * add the capability of consume only a single shard or more * add the capability of consume only a single shard or more --------- Co-authored-by: Hamed Hatami <hamed.hat...@postnord.com> --- .../component/aws2/kinesis/Kinesis2Consumer.java | 178 +++++++++++++-------- .../KinesisConsumerClosedShardWithSilentTest.java | 16 +- 2 files changed, 124 insertions(+), 70 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 696b56e165a..6a9638641a2 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -37,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; @@ -63,80 +65,118 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R var processedExchangeCount = new AtomicInteger(0); - getShardList() - .parallelStream() - .forEach(shard -> { - - String shardIterator = null; - try { - shardIterator = getShardIterator(shard); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - - if (shardIterator == null) { - // probably closed. Returning 0 as nothing was processed - processedExchangeCount.set(0); - } - - GetRecordsRequest req = GetRecordsRequest - .builder() - .shardIterator(shardIterator) - .limit(getEndpoint() + if (!getEndpoint().getConfiguration().getShardId().isEmpty()) { + var request = DescribeStreamRequest + .builder() + .streamName(getEndpoint().getConfiguration().getStreamName()) + .build(); + DescribeStreamResponse response = null; + if (getEndpoint().getConfiguration().isAsyncClient()) { + try { + response = getAsyncClient() + .describeStream(request) + .get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + response = getClient().describeStream(request); + } + + var shard = response + .streamDescription() + .shards() + .stream() + .filter(shardItem -> shardItem + .shardId() + .equalsIgnoreCase(getEndpoint() .getConfiguration() - .getMaxResultsPerRequest()) - .build(); - - GetRecordsResponse result = null; - if (getEndpoint().getConfiguration().isAsyncClient()) { - try { - result = getAsyncClient() - .getRecords(req) - .get(); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - } else { - result = getClient().getRecords(req); - } - - try { - Queue<Exchange> exchanges = createExchanges(result.records()); - processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges))); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // May cache the last successful sequence number, and pass it to the - // getRecords request. That way, on the next poll, we start from where - // we left off, however, I don't know what happens to subsequent - // exchanges when an earlier exchange fails. - - var currentShardIterator = result.nextShardIterator(); - if (isShardClosed) { - switch (getEndpoint().getConfiguration().getShardClosed()) { - case ignore: - LOG.warn("The shard {} is in closed state", currentShardIterator); - break; - case silent: - break; - case fail: - LOG.info("Shard Iterator reaches CLOSE status:{} {}", - getEndpoint().getConfiguration().getStreamName(), - getEndpoint().getConfiguration().getShardId()); - throw new IllegalStateException( - new ReachedClosedStatusException( - getEndpoint().getConfiguration().getStreamName(), shard.shardId())); - default: - throw new IllegalArgumentException("Unsupported shard closed strategy"); - } - } - - }); + .getShardId())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("The shard can't be found")); + + fetchAndPrepareRecordsForCamel(shard, processedExchangeCount); + + } else { + getShardList() + .parallelStream() + .forEach(shard -> { + fetchAndPrepareRecordsForCamel(shard, processedExchangeCount); + }); + } return processedExchangeCount.get(); } + private void fetchAndPrepareRecordsForCamel( + final Shard shard, + AtomicInteger processedExchangeCount) { + String shardIterator = null; + try { + shardIterator = getShardIterator(shard); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + + if (shardIterator == null) { + // probably closed. Returning 0 as nothing was processed + processedExchangeCount.set(0); + } + + GetRecordsRequest req = GetRecordsRequest + .builder() + .shardIterator(shardIterator) + .limit(getEndpoint() + .getConfiguration() + .getMaxResultsPerRequest()) + .build(); + + GetRecordsResponse result = null; + if (getEndpoint().getConfiguration().isAsyncClient()) { + try { + result = getAsyncClient() + .getRecords(req) + .get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + result = getClient().getRecords(req); + } + + try { + Queue<Exchange> exchanges = createExchanges(result.records()); + processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges))); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // May cache the last successful sequence number, and pass it to the + // getRecords request. That way, on the next poll, we start from where + // we left off, however, I don't know what happens to subsequent + // exchanges when an earlier exchange fails. + + var currentShardIterator = result.nextShardIterator(); + if (isShardClosed) { + switch (getEndpoint().getConfiguration().getShardClosed()) { + case ignore: + LOG.warn("The shard {} is in closed state", currentShardIterator); + break; + case silent: + break; + case fail: + LOG.info("Shard Iterator reaches CLOSE status:{} {}", + getEndpoint().getConfiguration().getStreamName(), + getEndpoint().getConfiguration().getShardId()); + throw new IllegalStateException( + new ReachedClosedStatusException( + getEndpoint().getConfiguration().getStreamName(), shard.shardId())); + default: + throw new IllegalArgumentException("Unsupported shard closed strategy"); + } + } + } + @Override public int processBatch(Queue<Object> exchanges) throws Exception { int processedExchanges = 0; diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java index b7899b95efd..2b34ca315ed 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -28,9 +28,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; @@ -41,6 +44,7 @@ import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamDescription; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,8 +54,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class KinesisConsumerClosedShardWithSilentTest { - @Mock private KinesisClient kinesisClient; @Mock @@ -111,6 +115,16 @@ public class KinesisConsumerClosedShardWithSilentTest { @Test public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception { + + SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build(); + Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build(); + ArrayList<Shard> shardList = new ArrayList<>(); + shardList.add(shard); + + when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) + .thenReturn(DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder().shards(shardList).build()).build()); + underTest.getEndpoint().getConfiguration().setShardId("shardId"); underTest.poll();