[ 
https://issues.apache.org/jira/browse/FLINK-39540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Pelaez updated FLINK-39540:
--------------------------------
    Summary: Kinesis Connector -Address end of subscription behavior bug for 
Kinesis EFO  (was: Kinesis Connector - Removed resubscription for EFO 
subscriptions when they are completed)

> Kinesis Connector -Address end of subscription behavior bug for Kinesis EFO
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-39540
>                 URL: https://issues.apache.org/jira/browse/FLINK-39540
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-5.0.0, aws-connector-6.0.0
>            Reporter: Ryan Pelaez
>            Priority: Major
>              Labels: pull-request-available
>
> *Overview*
> Resharding a Kinesis stream while a Flink application is reading from it 
> causes an *IllegalArgumentException* to be thrown repeatedly. The connector 
> tries to reactivate a shard subscription after that subscription has already 
> completed, and the new subscription is requested with a null startingMarker.
>  
> *Error logs*
>  
> {code:java}
> [aws-java-sdk-NettyEventLoop-31-1] INFO 
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  - Subscription complete - shardId-000000002053 
> (arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121)
> [aws-java-sdk-NettyEventLoop-31-1] INFO 
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  - Activating subscription to shard shardId-000000002053 with starting 
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=null} for consumer 
> arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121.
> [aws-java-sdk-NettyEventLoop-31-1] ERROR 
> software.amazon.awssdk.utils.async.FlatteningSubscriber - Unexpected 
> exception encountered that violates the reactive streams specification. 
> Attempting to terminate gracefully.
> java.lang.IllegalArgumentException: Invalid StartingPosition. When 
> ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, 
> startingMarker must be a String.
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition(StartingPositionUtil.java:65)
> at 
> org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy.subscribeToShard(KinesisAsyncStreamProxy.java:56)
> at 
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription.activateSubscription(FanOutKinesisShardSubscription.java:137)
> at 
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription$FanOutShardSubscriber.onComplete(FanOutKinesisShardSubscription.java:334)
> at 
> software.amazon.awssdk.utils.async.DelegatingSubscriber.onComplete(DelegatingSubscriber.java:47)
> ...{code}
> *Steps to reproduce*
>  # Create an AWS Kinesis data stream.
>  # Set up the Kinesis source connector (EFO) to read from the stream.
>  # Put records to the stream and verify that the connector is fetching them.
>  # Reshard the Kinesis stream.
>  
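The failure path in the stack trace can be modelled without the connector or the AWS SDK. The sketch below is only an illustration, not the actual patch: `EfoCompletionSketch`, its simplified `StartingPosition`, and the `isResumable` guard are hypothetical names, and it assumes that a null marker at completion means there is no position left to resume from. Only the exception message is taken from the log above.

```java
public class EfoCompletionSketch {

    /** The two marker-based iterator types from the exception message, plus two that need no marker. */
    enum ShardIteratorType { TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER }

    /** Hypothetical, simplified stand-in for the connector's StartingPosition. */
    static final class StartingPosition {
        final ShardIteratorType type;
        final Object startingMarker;

        StartingPosition(ShardIteratorType type, Object startingMarker) {
            this.type = type;
            this.startingMarker = startingMarker;
        }
    }

    /** True when the position carries everything needed to open a subscription. */
    static boolean isResumable(StartingPosition position) {
        boolean needsSequenceNumber =
                position.type == ShardIteratorType.AT_SEQUENCE_NUMBER
                        || position.type == ShardIteratorType.AFTER_SEQUENCE_NUMBER;
        return !needsSequenceNumber || position.startingMarker instanceof String;
    }

    /** Models the precondition that fails in the stack trace. */
    static void checkStartingPosition(StartingPosition position) {
        if (!isResumable(position)) {
            throw new IllegalArgumentException(
                    "Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER "
                            + "or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.");
        }
    }

    public static void main(String[] args) {
        // Position logged at completion after resharding: no marker to resume from.
        StartingPosition afterCompletion =
                new StartingPosition(ShardIteratorType.AFTER_SEQUENCE_NUMBER, null);
        // Position with a placeholder (not real) sequence number.
        StartingPosition midShard =
                new StartingPosition(ShardIteratorType.AFTER_SEQUENCE_NUMBER, "placeholder-sequence-number");

        System.out.println(isResumable(afterCompletion)); // prints false
        System.out.println(isResumable(midShard));        // prints true
    }
}
```

In this model, a completion handler that consulted `isResumable` before reactivating would skip the call that currently fails inside `checkStartingPosition`. Whether the connector should skip resubscription or hand the shard off as finished is a design decision for the actual fix.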



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
