[ https://issues.apache.org/jira/browse/FLINK-18053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121485#comment-17121485 ]
Jiayi Liao commented on FLINK-18053:
------------------------------------

Yes. Flink doesn't save the watermark in savepoints. Duplicate of FLINK-5601.

> Savepoints do not preserve watermarks
> -------------------------------------
>
>                 Key: FLINK-18053
>                 URL: https://issues.apache.org/jira/browse/FLINK-18053
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>   Affects Versions: 1.10.1
>            Reporter: Sergii Mikhtoniuk
>            Priority: Major
>         Attachments: 1.csv, 2.csv, MyApp.scala
>
>
> Flink produces an invalid result when a streaming SQL aggregation is stopped
> and resumed from a savepoint.
>
> *Steps to reproduce:*
> 1) Create an assembly from the attached file.
> This job reads CSV files as a stream. The files contain fake stock tickers,
> which are aggregated with the following tumbling window query:
> {code:java}
> SELECT
>   TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
>   symbol as symbol,
>   min(price) as `min`,
>   max(price) as `max`
> FROM Tickers
> GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
> {code}
> The stream uses punctuated watermarks with a max lateness of 1 day.
> 2) Create two CSV files with fake stock tickers:
> {{1.csv}}:
> {code:java}
> 2000-01-01 01:00:00.0,A,10
> 2000-01-01 01:00:00.0,B,20
> 2000-01-01 02:00:00.0,A,10
> 2000-01-01 02:00:00.0,B,21
> 2000-01-02 01:00:00.0,A,12
> 2000-01-02 01:00:00.0,B,22
> 2000-01-02 02:00:00.0,A,13
> 2000-01-02 02:00:00.0,B,23
> 2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
> 2000-01-03 01:00:00.0,A,14
> 2000-01-03 01:00:00.0,B,24
> 2000-01-03 02:00:00.0,A,15
> 2000-01-03 02:00:00.0,B,25
> {code}
> {{2.csv}}:
> {code:java}
> 2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
> 2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result, they only push watermark up
> 2000-01-04 01:00:00.0,B,26
> 2000-01-04 02:00:00.0,A,17
> 2000-01-04 02:00:00.0,B,27
> 2000-01-05 01:00:00.0,A,18
> 2000-01-05 01:00:00.0,B,28
> {code}
> 3) Run the job on the folder containing both files. The observed result is as
> expected:
> {code:java}
> 2000-01-01,A,10,11
> 2000-01-01,B,20,21
> 2000-01-02,A,12,13
> 2000-01-02,B,22,23
> 2000-01-03,A,14,15
> 2000-01-03,B,24,25
> {code}
> 4) Now run the job with only {{1.csv}} in the directory. The output is still
> correct:
> {code:java}
> 2000-01-01,A,10,11
> 2000-01-01,B,20,21
> {code}
> 5) Cancel the job with a savepoint, move {{2.csv}} into the directory, and
> restart the job from the savepoint. This produces an incorrect result:
> {code:java}
> 2000-01-01,A,12,12
> 2000-01-02,A,12,13
> 2000-01-02,B,22,23
> 2000-01-03,A,14,15
> 2000-01-03,B,24,25
> {code}
>
> *Expectation:*
> We were not supposed to see the {{2000-01-01,A,12,12}} record, as it should
> not have passed the watermark check. This tells me that Flink did not save the
> watermark in the savepoint.
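
To make the reported behaviour easier to reason about, here is a minimal, dependency-free sketch that replays the two CSV files through a hand-written model of the job. It is not Flink code and does not use any Flink API. The class name WatermarkReplay and its methods are hypothetical, and the model rests on assumptions that are not confirmed by the attached MyApp.scala: the watermark is taken to be the largest event time seen so far minus one day, it advances after every record, a record is dropped when its window's last timestamp is at or below the current watermark, and restoring from a savepoint keeps the window state but resets the watermark to Long.MIN_VALUE.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone model of the job in the issue; NOT Flink API.
class WatermarkReplay {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Contents of 1.csv and 2.csv from the issue, without the inline comments.
    static final List<String> FILE_1 = Arrays.asList(
        "2000-01-01 01:00:00.0,A,10", "2000-01-01 01:00:00.0,B,20",
        "2000-01-01 02:00:00.0,A,10", "2000-01-01 02:00:00.0,B,21",
        "2000-01-02 01:00:00.0,A,12", "2000-01-02 01:00:00.0,B,22",
        "2000-01-02 02:00:00.0,A,13", "2000-01-02 02:00:00.0,B,23",
        "2000-01-01 03:00:00.0,A,11",
        "2000-01-03 01:00:00.0,A,14", "2000-01-03 01:00:00.0,B,24",
        "2000-01-03 02:00:00.0,A,15", "2000-01-03 02:00:00.0,B,25");
    static final List<String> FILE_2 = Arrays.asList(
        "2000-01-01 04:00:00.0,A,12",
        "2000-01-04 01:00:00.0,A,16", "2000-01-04 01:00:00.0,B,26",
        "2000-01-04 02:00:00.0,A,17", "2000-01-04 02:00:00.0,B,27",
        "2000-01-05 01:00:00.0,A,18", "2000-01-05 01:00:00.0,B,28");

    // Window state (assumed to survive a savepoint): "date,symbol" -> {windowEnd, min, max}.
    private final TreeMap<String, long[]> windows = new TreeMap<>();
    // Event-time progress (assumed, per the issue, NOT to survive a savepoint).
    private long watermark = Long.MIN_VALUE;

    // Assumption: a restore keeps the windows but forgets the watermark.
    void simulateRestoreFromSavepoint() {
        watermark = Long.MIN_VALUE;
    }

    // Processes records in order and returns the rows emitted while doing so.
    List<String> feed(List<String> csvLines) {
        List<String> emitted = new ArrayList<>();
        for (String line : csvLines) {
            String[] f = line.split(",");
            long ts = LocalDateTime.parse(f[0].replace(' ', 'T'))
                                   .toInstant(ZoneOffset.UTC).toEpochMilli();
            long price = Long.parseLong(f[2]);
            long end = Math.floorDiv(ts, DAY_MS) * DAY_MS + DAY_MS; // exclusive end of the 1-day window

            // Lateness check: drop the record if its window is already behind the watermark.
            if (end - 1 > watermark) {
                long[] w = windows.computeIfAbsent(
                    f[0].substring(0, 10) + "," + f[1], k -> new long[] {end, price, price});
                w[1] = Math.min(w[1], price);
                w[2] = Math.max(w[2], price);
            }

            // Punctuated watermark: largest event time seen so far minus one day.
            watermark = Math.max(watermark, ts - DAY_MS);

            // Fire and purge every window whose last timestamp the watermark has passed.
            Iterator<Map.Entry<String, long[]>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, long[]> e = it.next();
                if (e.getValue()[0] - 1 <= watermark) {
                    emitted.add(e.getKey() + "," + e.getValue()[1] + "," + e.getValue()[2]);
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkReplay job = new WatermarkReplay();
        System.out.println("1.csv only:    " + job.feed(FILE_1));
        job.simulateRestoreFromSavepoint();
        System.out.println("after restore: " + job.feed(FILE_2));
    }
}
```

Under these assumptions, feeding 1.csv and then 2.csv into the same instance without the reset yields the six rows from step 3, while calling simulateRestoreFromSavepoint() between the two files yields the five rows from step 5, including the unexpected 2000-01-01,A,12,12. The late record opens a fresh window because nothing remembers that the day-one window had already closed.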