James created FLINK-28357: ----------------------------- Summary: Watermark issue when recovering Finished sources Key: FLINK-28357 URL: https://issues.apache.org/jira/browse/FLINK-28357 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.15.0 Environment: This can be reproduced in an IDE with the attached sample program. Reporter: James Attachments: WatermarkDemoMain.java, image-2022-07-01-16-18-14-768.png, longExample.txt
Copied mostly from email trail on the flink user mailing list: I done a lot of experimentation and I’m convinced there is a problem with Flink handling Finished sources and recovery. The program consists of: * Two sources: ** One “Long Running Source” – stays alive and emits a watermark of DateTime.now() every 10 seconds. *** Prints the console a message saying the watermark has been emitted. *** *Throws an exception every 5 or 10 iterations to force a recovery.* ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a message to the console and returns. * The “Short Live Source” feeds into a map() and then it joins with the “Long Running Source” with a KeyedCoProcessFunction. Moves to “FINISHED” state by Flink. The problem here is that the “Join” receives no Long.MAX_VALUE watermark from the map() in some situations after a recovery. The dashboard goes from showing this: !https://attachment.outlook.live.net/owa/MSA%3Ajas_sl%40hotmail.com/service.svc/s/GetAttachmentThumbnail?id=AQMkADAwATEyMTk3LTZiMDQtODBkMi0wMAItMDAKAEYAAAOeUdiydD9QS6CQDK1Dg0olBwACkmHn2W1HRKQHhbPYmGe%2BAASF%2B488AAAAApJh59ltR0SkB4Wz2JhnvgAFXJ9puQAAAAESABAAyemY6ar4b0GAFLHn3hpyCw%3D%3D&thumbnailType=2&isc=1&token=eyJhbGciOiJSUzI1NiIsImtpZCI6IkZBRDY1NDI2MkM2QUYyOTYxQUExRThDQUI3OEZGMUIyNzBFNzA3RTkiLCJ0eXAiOiJKV1QiLCJ4NXQiOiItdFpVSml4cThwWWFvZWpLdDRfeHNuRG5CLWsifQ.eyJvcmlnaW4iOiJodHRwczovL291dGxvb2subGl2ZS5jb20iLCJ1YyI6IjMwMDU4MTIzODAzMzRlMmZhNzE5ZGUxOTNjNjA4NjQ3IiwidmVyIjoiRXhjaGFuZ2UuQ2FsbGJhY2suVjEiLCJhcHBjdHhzZW5kZXIiOiJPd2FEb3dubG9hZEA4NGRmOWU3Zi1lOWY2LTQwYWYtYjQzNS1hYWFhYWFhYWFhYWEiLCJpc3NyaW5nIjoiV1ciLCJhcHBjdHgiOiJ7XCJtc2V4Y2hwcm90XCI6XCJvd2FcIixcInB1aWRcIjpcIjMxODQwOTE5NTk0NjE5NFwiLFwic2NvcGVcIjpcIk93YURvd25sb2FkXCIsXCJvaWRcIjpcIjAwMDEyMTk3LTZiMDQtODBkMi0wMDAwLTAwMDAwMDAwMDAwMFwiLFwicHJpbWFyeXNpZFwiOlwiUy0xLTI4MjctNzQxMzUtMTc5NTQ1NzIzNFwifSIsIm5iZiI6MTY1NjY4ODI3OCwiZXhwIjoxNjU2Njg4ODc4LCJpc3MiOiIwMDAwMDAwMi0wMDAwLTBmZjEtY2UwMC0wMDAwMDAwMDAwMDBAODRkZjllN2YtZTlmNi00MGFmLWI0MzUtYWFhYWFhYWFhYWFhIiwiYXVkIjoiMDAwMDAwMDItMDAwMC0wZmYxLWNlMDAtMDAwMDAwMDAwMDAwL2F0dGFjaG1lbnQub3V0bG9vay5saXZlLm5ldEA4NGRmOWU3Zi1lOWY2LTQwYWYtYjQzNS1hYWFhYWFhYWFhYWEiLCJoYXBwIjoib3dhIn0.KI4I55ycdP1duIwxyYZstLCtnNOwEkyTxfEwK_5a35-ZLMrKd8zHCB5Elw-9-A9UHIxFGSYOlwnHXRvDT0xa6FqFIlO8cnebBRLKv9DhxHwfZqdKWIeF2EcUqwH0ejeA3RvD3-dR95iHPTf52-tuKi27nclPUUEJgbfRWQY3wHMDAFLLaLvKM6AV5S1IhGjBmy3MF_1oulTXbqRZx0ar3L8YQiHEGnfKGjFO2zSxQcTZXAp_rch4HIrVv9GSEcQnD7nBhWPBuuzuvXOvJiUzg0u_e9CUuf1-OcQwhUV3cf7cvme8JadfliY6ywkOne1OZsclQeDFc8EnGZke3l2V_Q&X-OWA-CANARY=Es9QgEoDXEyksG3kZxXeMGC1LvlzW9oYJ-lyWNl-xblWQjmqz5FH_a2-eHuR6Zr51XNjigQpQDs.&owa=outlook.live.com&scriptVer=20220617005.11&animation=true! To the below after a recovery (with the currentInput1/2Watermark metrics showing input 2 having not received a watermark from the map, saying –Long.MAX_VALUE): !image-2022-07-01-16-18-14-768.png! The program is currently set to checkpoint every 5 seconds. By experimenting with 70 seconds, it seems that if only one checkpoint has been taken with the “Short Lived Source” in a FINISHED state since the last recovery then everything works fine and the restarted “Short Lived Source” emits its watermark and I see the “ShortedLivedEmptySource emitting Long.MAX_VALUE watermark” message on the console meaning the run() definitely executed. However, I found that if 2 or more checkpoints are taken since the last recovery with the source in a FINISHED state then the console message does not appear and the watermark is not emitted. To repeat – the Join does not get a Long.MAX_VALUE watermark from my source or Flink if I see two or more checkpoints logged in between recoveries. If zero or checkpoints are made, everything is fine – the join gets the watermark and I see my console message. You can play with the checkpointing frequency as per the code comments: // Useful checkpoint interval options: // 5 - see the problem after the first recovery // 70 - useful to see bad behaviour kick in after a recovery or two // 120 - won't see the problem as we don't have 2 checkpoints within a single recovery session If I merge the Triggering/Completed checkpoint messages in the log with my console output I see something like this clearly showing the “Short Lived Source” run() method is not executed after 2 checkpoints with the operators marked as FINISHED: 2022-06-29T11:52:31.268Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark. 2022-06-29T11:52:31.293Z: LongRunningSource emitting initial watermark=1656503551268 2022-06-29T11:52:41.302Z: LongRunningSource emitting loop watermark=1656503561302 2022-06-29T11:52:51.302Z: LongRunningSource emitting loop watermark=1656503571302 2022-06-29T11:53:01.303Z: LongRunningSource emitting loop watermark=1656503581303 2022-06-29 11:53:02.772 INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 1 (type=CheckpointType\{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) 2022-06-29 11:53:02.870 INFO [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator Completed checkpoint 1 for job 877656d7752bc1304c2cb92790e6aefb 2022-06-29T11:53:11.303Z: LongRunningSource emitting loop watermark=1656503591303 2022-06-29T11:53:21.304Z: LongRunningSource emitting loop watermark=1656503601304 2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------ 2022-06-29T11:53:22.405Z: LongRunningSource emitting initial watermark=1656503602405 2022-06-29T11:53:22.408Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark. 2022-06-29T11:53:32.406Z: LongRunningSource emitting loop watermark=1656503612406 2022-06-29T11:53:42.406Z: LongRunningSource emitting loop watermark=1656503622406 2022-06-29 11:53:51.048 INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 2 (type=CheckpointType\{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) 2022-06-29 11:53:51.067 INFO [jobmanager-io-thread-4] o.a.f.r.c.CheckpointCoordinator Completed checkpoint 2 for job 877656d7752bc1304c2cb92790e6aefb 2022-06-29T11:53:52.407Z: LongRunningSource emitting loop watermark=1656503632407 2022-06-29T11:54:02.407Z: LongRunningSource emitting loop watermark=1656503642407 2022-06-29T11:54:12.408Z: LongRunningSource emitting loop watermark=1656503652408 2022-06-29T11:54:22.408Z: LongRunningSource emitting loop watermark=1656503662408 2022-06-29T11:54:32.409Z: LongRunningSource emitting loop watermark=1656503672409 2022-06-29T11:54:42.409Z: LongRunningSource emitting loop watermark=1656503682409 2022-06-29T11:54:52.410Z: LongRunningSource emitting loop watermark=1656503692410 2022-06-29 11:55:01.048 INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 3 (type=CheckpointType\{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) 2022-06-29 11:55:01.057 INFO [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator Completed checkpoint 3 for job 877656d7752bc1304c2cb92790e6aefb 2022-06-29T11:55:02.410Z: LongRunningSource emitting loop watermark=1656503702410 2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------ 2022-06-29T11:55:03.445Z: LongRunningSource emitting initial watermark=1656503703444 <<<<< NO “ShortLivedEmptySource” message after recovery 2022-06-29T11:55:13.446Z: LongRunningSource emitting loop watermark=1656503713445 2022-06-29T11:55:23.446Z: LongRunningSource emitting loop watermark=1656503723446 2022-06-29T11:55:33.446Z: LongRunningSource emitting loop watermark=1656503733446 I have also attached a longer example with shows everything working fine after 5 recoveries, and then breaking after the 6{^}th{^}. I am guessing here it has something to do with the checkpointing and recovery of a FINISHED source. Finally, here are some ways that allows the code to work: * Change the code so the “Short Lived Source” doesn’t return from run() and stays RUNNING (uncomment the Thread.sleep) * As I mentioned before, if I remove the map() operator the problem in the join also goes away. (I don’t see the console output but the join is happy) * Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have two checkpoints with FINISHED state per recovery. The fact these changes prevent the issue means I really think there’s some bug or inconsistency here – if somebody could explain I would really appreciate it. -- This message was sent by Atlassian Jira (v8.20.10#820010)