[ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-29395:
---------------------------------
    Fix Version/s: 1.17.0
                   1.16.1
                       (was: 1.16.0)

> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
>                 Key: FLINK-29395
>                 URL: https://issues.apache.org/jira/browse/FLINK-29395
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
>            Reporter: Hong Liang Teoh
>            Assignee: Hong Liang Teoh
>            Priority: Major
>             Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> *Background*
> The consumer fails when an EFO record publisher uses a timestamp sentinel
> starting position and the first record batch is empty. In that case the
> consumer tries to recalculate the start position from the timestamp sentinel,
> an operation that is not supported.
> This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088
> *Reproduction Steps*
> Set up an application consuming from Kinesis with the following properties and
> consume from an empty shard:
> {code:java}
> String format = "yyyy-MM-dd'T'HH:mm:ss";
> String date = new SimpleDateFormat(format).format(new Date());
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
> consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
> {code}
> *Error*
> {code:java}
> java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
>       at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
>       at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
>       at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
>       at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>  
> *Solution*
> This is fixed by reusing the existing timestamp starting position when the
> first record batch is empty, instead of recalculating it from the sentinel.
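
The failure and the fix described above can be modelled with a small, self-contained sketch. This is not the connector's code: `StartingPositionSketch`, `Sentinel`, `Position`, `fromSentinel` and `nextPosition` are hypothetical names invented for illustration, and the real logic lives in the connector's `StartingPosition` and `FanOutRecordPublisher` classes. The sketch assumes only what the issue states: recalculating a position from a timestamp sentinel is unsupported, and the fix keeps the existing timestamp position when no record has been consumed yet.

```java
import java.time.Instant;

/** Simplified, hypothetical model of the start-position logic; not the connector's real classes. */
final class StartingPositionSketch {

    /** Stand-in for the sentinel markers a shard can carry before any record is read. */
    enum Sentinel { EARLIEST, LATEST, AT_TIMESTAMP }

    /** Hypothetical value type: an iterator type plus an optional marker (timestamp or sequence number). */
    static final class Position {
        final String iteratorType;
        final Object marker;

        Position(String iteratorType, Object marker) {
            this.iteratorType = iteratorType;
            this.marker = marker;
        }
    }

    /** Models the failing path: a timestamp sentinel carries no timestamp to rebuild from. */
    static Position fromSentinel(Sentinel sentinel) {
        switch (sentinel) {
            case EARLIEST:
                return new Position("TRIM_HORIZON", null);
            case LATEST:
                return new Position("LATEST", null);
            default:
                throw new IllegalArgumentException("Unexpected sentinel type: " + sentinel);
        }
    }

    /** Models the fix: keep the current timestamp position while the shard has yielded no records. */
    static Position nextPosition(Position current, Sentinel lastSeen, String lastSequenceNumber) {
        if (lastSequenceNumber != null) {
            // At least one record was consumed, so continue after it.
            return new Position("AFTER_SEQUENCE_NUMBER", lastSequenceNumber);
        }
        if (lastSeen == Sentinel.AT_TIMESTAMP) {
            // Empty first batch: reuse the existing timestamp-based position unchanged.
            return current;
        }
        return fromSentinel(lastSeen);
    }

    public static void main(String[] args) {
        Position start = new Position("AT_TIMESTAMP", Instant.parse("2022-09-23T00:00:00Z"));
        Position next = nextPosition(start, Sentinel.AT_TIMESTAMP, null);
        System.out.println(next.iteratorType + " @ " + next.marker);
    }
}
```

In this model, calling `fromSentinel(Sentinel.AT_TIMESTAMP)` directly reproduces the kind of `IllegalArgumentException` shown in the stack trace, while `nextPosition` avoids that call for an empty shard.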



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
