Ryan Pelaez created FLINK-39540:
-----------------------------------

             Summary: Kinesis Connector - EFO Issue when resharding - IllegalArgumentException
                 Key: FLINK-39540
                 URL: https://issues.apache.org/jira/browse/FLINK-39540
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: aws-connector-6.0.0, aws-connector-5.0.0
            Reporter: Ryan Pelaez


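Resharding a Kinesis stream while a Flink application is running causes an 
``IllegalArgumentException`` to be thrown repeatedly. The cause is that the 
connector attempts to restart the shard subscription after it has completed. 
As the logs show, the restart uses an AFTER_SEQUENCE_NUMBER starting position 
whose startingMarker is null.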

Error logs:

```
[aws-java-sdk-NettyEventLoop-31-1] INFO org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription - Subscription complete - shardId-000000002053 (arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121)
[aws-java-sdk-NettyEventLoop-31-1] INFO org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription - Activating subscription to shard shardId-000000002053 with starting position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, startingMarker=null} for consumer arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121.
[aws-java-sdk-NettyEventLoop-31-1] ERROR software.amazon.awssdk.utils.async.FlatteningSubscriber - Unexpected exception encountered that violates the reactive streams specification. Attempting to terminate gracefully.
java.lang.IllegalArgumentException: Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition(StartingPositionUtil.java:65)
        at org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy.subscribeToShard(KinesisAsyncStreamProxy.java:56)
        at org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription.activateSubscription(FanOutKinesisShardSubscription.java:137)
        at org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription$FanOutShardSubscriber.onComplete(FanOutKinesisShardSubscription.java:334)
        at software.amazon.awssdk.utils.async.DelegatingSubscriber.onComplete(DelegatingSubscriber.java:47)
...
```
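
To illustrate the failure mode, here is a minimal, self-contained sketch. It does not use the connector's classes: `ResubscribeGuardSketch`, its nested `StartingPosition` and `ShardIteratorType`, and the `canResubscribe` guard are hypothetical stand-ins. Only the precondition message is taken from the stack trace above. The guard is one possible way to avoid re-activating a subscription with an invalid starting position; it is not necessarily how the connector should be fixed.

```java
/** Sketch only: simplified stand-ins, not the connector's real classes. */
final class ResubscribeGuardSketch {

    enum ShardIteratorType {
        TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
    }

    /** Hypothetical stand-in for the connector's StartingPosition. */
    static final class StartingPosition {
        final ShardIteratorType type;
        final Object startingMarker; // null in the failing log line

        StartingPosition(ShardIteratorType type, Object startingMarker) {
            this.type = type;
            this.startingMarker = startingMarker;
        }
    }

    /** True when the iterator type needs a sequence number as its marker. */
    static boolean requiresSequenceNumber(StartingPosition position) {
        return position.type == ShardIteratorType.AT_SEQUENCE_NUMBER
                || position.type == ShardIteratorType.AFTER_SEQUENCE_NUMBER;
    }

    /** Mirrors the precondition whose message appears in the stack trace. */
    static void toSdkStartingPosition(StartingPosition position) {
        if (requiresSequenceNumber(position) && !(position.startingMarker instanceof String)) {
            throw new IllegalArgumentException(
                    "Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER "
                            + "or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.");
        }
        // A real implementation would build the SDK request here.
    }

    /** Hypothetical guard to call from onComplete before re-activating. */
    static boolean canResubscribe(StartingPosition position) {
        return !requiresSequenceNumber(position) || position.startingMarker instanceof String;
    }

    public static void main(String[] args) {
        // Same shape as the position in the log: AFTER_SEQUENCE_NUMBER with a null marker.
        StartingPosition afterCompletion =
                new StartingPosition(ShardIteratorType.AFTER_SEQUENCE_NUMBER, null);

        if (canResubscribe(afterCompletion)) {
            toSdkStartingPosition(afterCompletion);
        } else {
            System.out.println("Skipping resubscribe: starting position has no sequence number");
        }
    }
}
```

Without the `canResubscribe` check, `main` would call `toSdkStartingPosition` with the null marker and hit the same `IllegalArgumentException` on every completion, which matches the repeated exception reported here.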



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
