Ryan Pelaez created FLINK-39540:
-----------------------------------
Summary: Kinesis Connector - EFO Issue when resharding -
IllegalArgumentException
Key: FLINK-39540
URL: https://issues.apache.org/jira/browse/FLINK-39540
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: aws-connector-6.0.0, aws-connector-5.0.0
Reporter: Ryan Pelaez
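Resharding a Kinesis stream while a Flink application is running causes an
``IllegalArgumentException`` to be thrown repeatedly. This is caused by the
subscription being restarted after it has already completed: the stack trace
shows ``onComplete`` calling ``activateSubscription`` with a starting position
of AFTER_SEQUENCE_NUMBER and a null startingMarker.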
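To make the failing condition concrete, here is a minimal, self-contained sketch of the rule quoted in the exception message. It is not the connector's code: `StartingPositionSketch`, `Position`, `validate` and `canResubscribe` are hypothetical names used only for illustration, and the guard is one possible way to avoid the call, not a confirmed fix.

```java
import java.util.Objects;

final class StartingPositionSketch {

    enum IteratorType { TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER }

    /** Hypothetical stand-in for the connector's StartingPosition; not the real class. */
    static final class Position {
        final IteratorType type;
        final Object marker; // null in the failing log entry

        Position(IteratorType type, Object marker) {
            this.type = Objects.requireNonNull(type);
            this.marker = marker;
        }
    }

    /** True for the two iterator types that require a sequence number. */
    static boolean needsSequenceNumber(IteratorType type) {
        return type == IteratorType.AT_SEQUENCE_NUMBER
                || type == IteratorType.AFTER_SEQUENCE_NUMBER;
    }

    /** Mirrors the rule stated in the exception message from the report. */
    static void validate(Position p) {
        if (needsSequenceNumber(p.type) && !(p.marker instanceof String)) {
            throw new IllegalArgumentException(
                    "Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER "
                            + "or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.");
        }
    }

    /** Hypothetical guard (an assumption): skip re-activation when the position is unusable. */
    static boolean canResubscribe(Position p) {
        return !needsSequenceNumber(p.type) || p.marker instanceof String;
    }

    public static void main(String[] args) {
        // Same shape as the position in the report: AFTER_SEQUENCE_NUMBER with no marker.
        Position afterReshard = new Position(IteratorType.AFTER_SEQUENCE_NUMBER, null);
        System.out.println("canResubscribe: " + canResubscribe(afterReshard));
        try {
            validate(afterReshard);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A position like the one in the report fails `validate`, so any re-activation after `onComplete` would need either a real sequence number or a check along the lines of `canResubscribe` before subscribing again.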
Error logs:
```
[aws-java-sdk-NettyEventLoop-31-1] INFO org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription - Subscription complete - shardId-000000002053 (arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121)
[aws-java-sdk-NettyEventLoop-31-1] INFO org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription - Activating subscription to shard shardId-000000002053 with starting position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, startingMarker=null} for consumer arn:aws:kinesis:us-east-1:123456:stream/test-source-stream/consumer/my-flink-efo-consumer:1774401121.
[aws-java-sdk-NettyEventLoop-31-1] ERROR software.amazon.awssdk.utils.async.FlatteningSubscriber - Unexpected exception encountered that violates the reactive streams specification. Attempting to terminate gracefully.
java.lang.IllegalArgumentException: Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition(StartingPositionUtil.java:65)
    at org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy.subscribeToShard(KinesisAsyncStreamProxy.java:56)
    at org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription.activateSubscription(FanOutKinesisShardSubscription.java:137)
    at org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription$FanOutShardSubscriber.onComplete(FanOutKinesisShardSubscription.java:334)
    at software.amazon.awssdk.utils.async.DelegatingSubscriber.onComplete(DelegatingSubscriber.java:47)
    ...
```