[ https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo updated FLINK-29395: --------------------------------- Fix Version/s: 1.17.0 1.16.1 (was: 1.16.0) > [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard > --------------------------------------------------------------------- > > Key: FLINK-29395 > URL: https://issues.apache.org/jira/browse/FLINK-29395 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis > Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2 > Reporter: Hong Liang Teoh > Assignee: Hong Liang Teoh > Priority: Major > Fix For: 1.17.0, 1.15.3, 1.16.1 > > > *Background* > The consumer fails when an EFO record publisher uses a timestamp sentinel > starting position and the first record batch is empty. This is because the > consumer tries to recalculate the start position from the timestamp sentinel, > this operation is not supported. > This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088 > *Reproduction Steps* > Setup an application consuming from Kinesis with following properties and > consume from an empty shard: > {code:java} > String format = "yyyy-MM-dd'T'HH:mm:ss"; > String date = new SimpleDateFormat(format).format(new Date()); > consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, > date); > consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, > format); > consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, > "AT_TIMESTAMP"); > consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, > "EFO"); {code} > *Error* > {code:java} > java.lang.IllegalArgumentException: Unexpected sentinel type: > AT_TIMESTAMP_SEQUENCE_NUM > at > org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115) > at > org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91) > at > org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) {code} > > *Solution* > This is fixed by reusing the existing timestamp starting position in this > condition. -- This message was sent by Atlassian Jira (v8.20.10#820010)