[
https://issues.apache.org/jira/browse/FLINK-39540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ryan Pelaez updated FLINK-39540:
--------------------------------
Summary: Kinesis Connector -Address end of subscription behavior bug for
Kinesis EFO (was: Kinesis Connector - Removed resubscription for EFO
subscriptions when they are completed)
> Kinesis Connector -Address end of subscription behavior bug for Kinesis EFO
> ---------------------------------------------------------------------------
>
> Key: FLINK-39540
> URL: https://issues.apache.org/jira/browse/FLINK-39540
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: aws-connector-5.0.0, aws-connector-6.0.0
> Reporter: Ryan Pelaez
> Priority: Major
> Labels: pull-request-available
>
> *Overview*
> Resharding a Kinesis stream while a Flink application is running causes an
> *IllegalArgumentException* to be thrown repeatedly. The connector attempts
> to restart an EFO subscription after that subscription has completed.
>
> *Error logs*
>
> {code:java}
> [aws-java-sdk-NettyEventLoop-31-1] INFO
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
> - Subscription complete - shardId-000000002053
> (arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121)
> [aws-java-sdk-NettyEventLoop-31-1] INFO
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
> - Activating subscription to shard shardId-000000002053 with starting
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER,
> startingMarker=null} for consumer
> arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121.
> [aws-java-sdk-NettyEventLoop-31-1] ERROR
> software.amazon.awssdk.utils.async.FlatteningSubscriber - Unexpected
> exception encountered that violates the reactive streams specification.
> Attempting to terminate gracefully.
> java.lang.IllegalArgumentException: Invalid StartingPosition. When
> ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER,
> startingMarker must be a String.
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at
> org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition(StartingPositionUtil.java:65)
> at
> org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy.subscribeToShard(KinesisAsyncStreamProxy.java:56)
> at
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription.activateSubscription(FanOutKinesisShardSubscription.java:137)
> at
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription$FanOutShardSubscriber.onComplete(FanOutKinesisShardSubscription.java:334)
> at
> software.amazon.awssdk.utils.async.DelegatingSubscriber.onComplete(DelegatingSubscriber.java:47)
> ...{code}
> *Steps to reproduce*
> # Create an AWS Kinesis data stream.
> # Set up the source connector to read from the Kinesis data stream.
> # Put records to the stream and verify that the connector is fetching them.
> # Reshard the Kinesis stream.
>
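The failure in the quoted log can be sketched in isolation. The following is a minimal, self-contained model, not the connector's code: `EfoResubscribeSketch`, `Position`, `Subscription`, and the `resubscribeOnComplete` switch are hypothetical names introduced only for illustration. It takes the logged starting position (AFTER_SEQUENCE_NUMBER with a null marker) as given, without explaining why the marker is null, and shows that reactivating from the completion callback trips the same precondition, while skipping the reactivation (the direction suggested by the issue's earlier summary) does not.

```java
/** Hypothetical, simplified model of the logged behavior; not the connector's real classes. */
final class EfoResubscribeSketch {

    enum IteratorType { TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER }

    /** Stand-in for a starting position: an iterator type plus an optional marker. */
    static final class Position {
        final IteratorType type;
        final Object marker;

        Position(IteratorType type, Object marker) {
            this.type = type;
            this.marker = marker;
        }
    }

    /** Mirrors the precondition whose message appears in the stack trace. */
    static void validate(Position p) {
        boolean needsSequenceNumber =
                p.type == IteratorType.AT_SEQUENCE_NUMBER
                        || p.type == IteratorType.AFTER_SEQUENCE_NUMBER;
        if (needsSequenceNumber && !(p.marker instanceof String)) {
            throw new IllegalArgumentException(
                    "Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER"
                            + " or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.");
        }
    }

    /** Stand-in for a shard subscription; resubscribeOnComplete is an assumed switch. */
    static final class Subscription {
        private final Position position;
        private final boolean resubscribeOnComplete;
        private int activations;

        Subscription(Position position, boolean resubscribeOnComplete) {
            this.position = position;
            this.resubscribeOnComplete = resubscribeOnComplete;
        }

        /** Validates the starting position, then counts the activation. */
        void activate() {
            validate(position);
            activations++;
        }

        /** Called when the event stream for the shard ends. */
        void onComplete() {
            if (resubscribeOnComplete) {
                activate(); // throws if the position lacks a sequence number
            }
        }

        int activations() {
            return activations;
        }
    }

    public static void main(String[] args) {
        // Position as logged: AFTER_SEQUENCE_NUMBER with a null marker.
        Position logged = new Position(IteratorType.AFTER_SEQUENCE_NUMBER, null);

        try {
            new Subscription(logged, true).onComplete();
        } catch (IllegalArgumentException e) {
            System.out.println("resubscribe on complete: " + e.getMessage());
        }

        Subscription guarded = new Subscription(logged, false);
        guarded.onComplete();
        System.out.println("no resubscribe: activations=" + guarded.activations());
    }
}
```

In this model the exception comes from the completion callback alone, which matches the `onComplete` frame in the stack trace. Whether the actual fix removes the resubscription entirely or handles completed shards differently is not stated in this message.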
--
This message was sent by Atlassian Jira
(v8.20.10#820010)