[ 
https://issues.apache.org/jira/browse/FLINK-32116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-32116:
----------------------------------
    Affects Version/s: aws-connector-4.3.0

> FlinkKinesisConsumer cannot stop-with-savepoint when configured with 
> watermark assigner and watermark tracker
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32116
>                 URL: https://issues.apache.org/jira/browse/FLINK-32116
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.16.1, 1.15.4, aws-connector-4.2.0, aws-connector-4.3.0
>            Reporter: Hong Liang Teoh
>            Assignee: Aleksandr Pilipenko
>            Priority: Major
>             Fix For: aws-connector-4.4.0
>
>
> Problem:
> When FlinkKinesisConsumer is configured with legacy watermarking system, it 
> is unable to take a savepoint during stop-with-savepoint, and will get stuck 
> indefinitely.
>  
>  
> {code:java}
> FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new 
> SimpleStringSchema(), consumerConfig);
> // Set up watermark assigner on Kinesis source
> src.setPeriodicWatermarkAssigner(...);
> // Set up watermark tracker on Kinesis source
> src.setWatermarkTracker(...);{code}
>  
>  
> *Why does it get stuck?*
> When watermarks are setup, the `shardConsumer` and `recordEmitter` thread 
> communicate using asynchronous queue.
> On stop-with-savepoint, shardConsumer waits for queue to empty before 
> continuing. recordEmitter is terminated before queue is empty. As such, queue 
> is never going to be empty, and app gets stuck indefinitely.
>  
> *Workarounds*
> Use the new watermark framework
> {code:java}
> FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new 
> SimpleStringSchema(), consumerConfig);
> env.addSource(src)
> // Set up watermark strategy with both watermark assigner and watermark 
> tracker
>     
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()){code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to