[ https://issues.apache.org/jira/browse/FLINK-32116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer updated FLINK-32116:
----------------------------------
    Fix Version/s: aws-connector-4.3.0
                       (was: aws-connector-4.2.0)

> FlinkKinesisConsumer cannot stop-with-savepoint when configured with
> watermark assigner and watermark tracker
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32116
>                 URL: https://issues.apache.org/jira/browse/FLINK-32116
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.16.1, 1.15.4, aws-connector-4.1.0
>            Reporter: Hong Liang Teoh
>            Assignee: Aleksandr Pilipenko
>            Priority: Major
>             Fix For: aws-connector-4.3.0
>
>
> Problem:
> When FlinkKinesisConsumer is configured with the legacy watermarking system,
> it is unable to take a savepoint during stop-with-savepoint and gets stuck
> indefinitely.
>
> {code:java}
> FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>(
>         "YourStreamHere", new SimpleStringSchema(), consumerConfig);
> // Set up watermark assigner on Kinesis source
> src.setPeriodicWatermarkAssigner(...);
> // Set up watermark tracker on Kinesis source
> src.setWatermarkTracker(...);
> {code}
>
> *Why does it get stuck?*
> When watermarks are set up, the `shardConsumer` and `recordEmitter` threads
> communicate using an asynchronous queue.
> On stop-with-savepoint, the shardConsumer waits for the queue to empty before
> continuing. The recordEmitter is terminated before the queue is empty. As a
> result, the queue never empties and the application gets stuck indefinitely.
>
> *Workarounds*
> Use the new watermark framework:
> {code:java}
> FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>(
>         "YourStreamHere", new SimpleStringSchema(), consumerConfig);
> env.addSource(src)
>         // Set up watermark strategy with both watermark assigner and
>         // watermark tracker
>         .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
> {code}
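
The hang described under "Why does it get stuck?" can be reproduced in miniature with plain java.util.concurrent, without Flink. The sketch below is not the connector's code: the class name DrainOnStopSketch, the method queueDrainsOnStop, and the record strings are hypothetical, and a timeout stands in for the indefinite wait so that the example terminates. One thread plays the recordEmitter (it only takes from the queue), and the calling thread plays the shardConsumer (it enqueues records, then waits for the queue to empty).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names exist in the Kinesis connector.
class DrainOnStopSketch {

    /**
     * Returns true if the queue emptied within the timeout, false if the
     * waiting side would still be blocked (the "stuck" case from the issue).
     */
    public static boolean queueDrainsOnStop(boolean emitterTerminatedFirst, long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Stand-in for the recordEmitter thread: drains the queue until interrupted.
        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                // Terminated: anything enqueued from now on is never consumed.
            }
        });
        emitter.setDaemon(true);
        emitter.start();

        try {
            if (emitterTerminatedFirst) {
                // Shutdown order from the issue: the emitter goes away first.
                emitter.interrupt();
                emitter.join();
            }

            // Stand-in for the shardConsumer: hand over records ...
            for (int i = 0; i < 10; i++) {
                queue.put("record-" + i);
            }

            // ... then wait for the queue to empty before continuing.
            // The real wait has no deadline; the timeout exists only for the demo.
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (!queue.isEmpty()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false;
                }
                Thread.sleep(5);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the queue to drain", e);
        } finally {
            emitter.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("emitter alive:      drained=" + queueDrainsOnStop(false, 2000));
        System.out.println("emitter terminated: drained=" + queueDrainsOnStop(true, 200));
    }
}
```

With the emitter still running, the wait finishes almost immediately. With the emitter terminated first, nothing consumes the queued records, so an unbounded wait would never return. This is why the shutdown order of the two threads matters when the producer blocks on an empty queue.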