[ https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski reassigned FLINK-28357: -------------------------------------- Assignee: Piotr Nowojski > Watermark issue when recovering Finished sources > ------------------------------------------------ > > Key: FLINK-28357 > URL: https://issues.apache.org/jira/browse/FLINK-28357 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing > Affects Versions: 1.15.0 > Environment: This can be reproduced in an IDE with the attached > sample program. > Reporter: James > Assignee: Piotr Nowojski > Priority: Major > Attachments: WatermarkDemoMain.java, > image-2022-07-01-16-18-14-768.png, longExample.txt > > > Copied mostly from email trail on the flink user mailing list: > I done a lot of experimentation and I’m convinced there is a problem with > Flink handling Finished sources and recovery. > The program consists of: > * Two sources: > ** One “Long Running Source” – stays alive and emits a watermark of > DateTime.now() every 10 seconds. > *** Prints the console a message saying the watermark has been emitted. > *** *Throws an exception every 5 or 10 iterations to force a recovery.* > ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a > message to the console and returns. > * The “Short Live Source” feeds into a map() and then it joins with the > “Long Running Source” with a KeyedCoProcessFunction. Moves to “FINISHED” > state by Flink. > The problem here is that the “Join” receives no Long.MAX_VALUE watermark from > the map() in some situations after a recovery. The dashboard goes from > showing this: > !https://attachment.outlook.live.net/owa/MSA%3Ajas_sl%40hotmail.com/service.svc/s/GetAttachmentThumbnail?id=AQMkADAwATEyMTk3LTZiMDQtODBkMi0wMAItMDAKAEYAAAOeUdiydD9QS6CQDK1Dg0olBwACkmHn2W1HRKQHhbPYmGe%2BAASF%2B488AAAAApJh59ltR0SkB4Wz2JhnvgAFXJ9puQAAAAESABAAyemY6ar4b0GAFLHn3hpyCw%3D%3D&thumbnailType=2&isc=1&token=eyJhbGciOiJSUzI1NiIsImtpZCI6IkZBRDY1NDI2MkM2QUYyOTYxQUExRThDQUI3OEZGMUIyNzBFNzA3RTkiLCJ0eXAiOiJKV1QiLCJ4NXQiOiItdFpVSml4cThwWWFvZWpLdDRfeHNuRG5CLWsifQ.eyJvcmlnaW4iOiJodHRwczovL291dGxvb2subGl2ZS5jb20iLCJ1YyI6IjMwMDU4MTIzODAzMzRlMmZhNzE5ZGUxOTNjNjA4NjQ3IiwidmVyIjoiRXhjaGFuZ2UuQ2FsbGJhY2suVjEiLCJhcHBjdHhzZW5kZXIiOiJPd2FEb3dubG9hZEA4NGRmOWU3Zi1lOWY2LTQwYWYtYjQzNS1hYWFhYWFhYWFhYWEiLCJpc3NyaW5nIjoiV1ciLCJhcHBjdHgiOiJ7XCJtc2V4Y2hwcm90XCI6XCJvd2FcIixcInB1aWRcIjpcIjMxODQwOTE5NTk0NjE5NFwiLFwic2NvcGVcIjpcIk93YURvd25sb2FkXCIsXCJvaWRcIjpcIjAwMDEyMTk3LTZiMDQtODBkMi0wMDAwLTAwMDAwMDAwMDAwMFwiLFwicHJpbWFyeXNpZFwiOlwiUy0xLTI4MjctNzQxMzUtMTc5NTQ1NzIzNFwifSIsIm5iZiI6MTY1NjY4ODI3OCwiZXhwIjoxNjU2Njg4ODc4LCJpc3MiOiIwMDAwMDAwMi0wMDAwLTBmZjEtY2UwMC0wMDAwMDAwMDAwMDBAODRkZjllN2YtZTlmNi00MGFmLWI0MzUtYWFhYWFhYWFhYWFhIiwiYXVkIjoiMDAwMDAwMDItMDAwMC0wZmYxLWNlMDAtMDAwMDAwMDAwMDAwL2F0dGFjaG1lbnQub3V0bG9vay5saXZlLm5ldEA4NGRmOWU3Zi1lOWY2LTQwYWYtYjQzNS1hYWFhYWFhYWFhYWEiLCJoYXBwIjoib3dhIn0.KI4I55ycdP1duIwxyYZstLCtnNOwEkyTxfEwK_5a35-ZLMrKd8zHCB5Elw-9-A9UHIxFGSYOlwnHXRvDT0xa6FqFIlO8cnebBRLKv9DhxHwfZqdKWIeF2EcUqwH0ejeA3RvD3-dR95iHPTf52-tuKi27nclPUUEJgbfRWQY3wHMDAFLLaLvKM6AV5S1IhGjBmy3MF_1oulTXbqRZx0ar3L8YQiHEGnfKGjFO2zSxQcTZXAp_rch4HIrVv9GSEcQnD7nBhWPBuuzuvXOvJiUzg0u_e9CUuf1-OcQwhUV3cf7cvme8JadfliY6ywkOne1OZsclQeDFc8EnGZke3l2V_Q&X-OWA-CANARY=Es9QgEoDXEyksG3kZxXeMGC1LvlzW9oYJ-lyWNl-xblWQjmqz5FH_a2-eHuR6Zr51XNjigQpQDs.&owa=outlook.live.com&scriptVer=20220617005.11&animation=true! > To the below after a recovery (with the currentInput1/2Watermark metrics > showing input 2 having not received a watermark from the map, saying > –Long.MAX_VALUE): > !image-2022-07-01-16-18-14-768.png! > The program is currently set to checkpoint every 5 seconds. By experimenting > with 70 seconds, it seems that if only one checkpoint has been taken with the > “Short Lived Source” in a FINISHED state since the last recovery then > everything works fine and the restarted “Short Lived Source” emits its > watermark and I see the “ShortedLivedEmptySource emitting Long.MAX_VALUE > watermark” message on the console meaning the run() definitely executed. > However, I found that if 2 or more checkpoints are taken since the last > recovery with the source in a FINISHED state then the console message does > not appear and the watermark is not emitted. > To repeat – the Join does not get a Long.MAX_VALUE watermark from my source > or Flink if I see two or more checkpoints logged in between recoveries. If > zero or checkpoints are made, everything is fine – the join gets the > watermark and I see my console message. You can play with the checkpointing > frequency as per the code comments: > // Useful checkpoint interval options: > // 5 - see the problem after the first recovery > // 70 - useful to see bad behaviour kick in after a recovery or two > // 120 - won't see the problem as we don't have 2 checkpoints within > a single recovery session > If I merge the Triggering/Completed checkpoint messages in the log with my > console output I see something like this clearly showing the “Short Lived > Source” run() method is not executed after 2 checkpoints with the operators > marked as FINISHED: > > 2022-06-29T11:52:31.268Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE > watermark. > 2022-06-29T11:52:31.293Z: LongRunningSource emitting initial > watermark=1656503551268 > 2022-06-29T11:52:41.302Z: LongRunningSource emitting loop > watermark=1656503561302 > 2022-06-29T11:52:51.302Z: LongRunningSource emitting loop > watermark=1656503571302 > 2022-06-29T11:53:01.303Z: LongRunningSource emitting loop > watermark=1656503581303 > 2022-06-29 11:53:02.772 INFO [Checkpoint Timer] > o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 1 > (type=CheckpointType\{name='Checkpoint', > sharingFilesStrategy=FORWARD_BACKWARD}) > 2022-06-29 11:53:02.870 INFO [jobmanager-io-thread-10] > o.a.f.r.c.CheckpointCoordinator Completed checkpoint 1 for job > 877656d7752bc1304c2cb92790e6aefb > 2022-06-29T11:53:11.303Z: LongRunningSource emitting loop > watermark=1656503591303 > 2022-06-29T11:53:21.304Z: LongRunningSource emitting loop > watermark=1656503601304 > 2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------ > 2022-06-29T11:53:22.405Z: LongRunningSource emitting initial > watermark=1656503602405 > 2022-06-29T11:53:22.408Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE > watermark. > 2022-06-29T11:53:32.406Z: LongRunningSource emitting loop > watermark=1656503612406 > 2022-06-29T11:53:42.406Z: LongRunningSource emitting loop > watermark=1656503622406 > 2022-06-29 11:53:51.048 INFO [Checkpoint Timer] > o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 2 > (type=CheckpointType\{name='Checkpoint', > sharingFilesStrategy=FORWARD_BACKWARD}) > 2022-06-29 11:53:51.067 INFO [jobmanager-io-thread-4] > o.a.f.r.c.CheckpointCoordinator Completed checkpoint 2 for job > 877656d7752bc1304c2cb92790e6aefb > 2022-06-29T11:53:52.407Z: LongRunningSource emitting loop > watermark=1656503632407 > 2022-06-29T11:54:02.407Z: LongRunningSource emitting loop > watermark=1656503642407 > 2022-06-29T11:54:12.408Z: LongRunningSource emitting loop > watermark=1656503652408 > 2022-06-29T11:54:22.408Z: LongRunningSource emitting loop > watermark=1656503662408 > 2022-06-29T11:54:32.409Z: LongRunningSource emitting loop > watermark=1656503672409 > 2022-06-29T11:54:42.409Z: LongRunningSource emitting loop > watermark=1656503682409 > 2022-06-29T11:54:52.410Z: LongRunningSource emitting loop > watermark=1656503692410 > 2022-06-29 11:55:01.048 INFO [Checkpoint Timer] > o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 3 > (type=CheckpointType\{name='Checkpoint', > sharingFilesStrategy=FORWARD_BACKWARD}) > 2022-06-29 11:55:01.057 INFO [jobmanager-io-thread-10] > o.a.f.r.c.CheckpointCoordinator Completed checkpoint 3 for job > 877656d7752bc1304c2cb92790e6aefb > 2022-06-29T11:55:02.410Z: LongRunningSource emitting loop > watermark=1656503702410 > 2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------ > 2022-06-29T11:55:03.445Z: LongRunningSource emitting initial > watermark=1656503703444 <<<<< NO “ShortLivedEmptySource” message after > recovery > 2022-06-29T11:55:13.446Z: LongRunningSource emitting loop > watermark=1656503713445 > 2022-06-29T11:55:23.446Z: LongRunningSource emitting loop > watermark=1656503723446 > 2022-06-29T11:55:33.446Z: LongRunningSource emitting loop > watermark=1656503733446 > > I have also attached a longer example with shows everything working fine > after 5 recoveries, and then breaking after the 6{^}th{^}. > I am guessing here it has something to do with the checkpointing and recovery > of a FINISHED source. > Finally, here are some ways that allows the code to work: > * Change the code so the “Short Lived Source” doesn’t return from run() and > stays RUNNING (uncomment the Thread.sleep) > * As I mentioned before, if I remove the map() operator the problem in the > join also goes away. (I don’t see the console output but the join is happy) > * Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have > two checkpoints with FINISHED state per recovery. > The fact these changes prevent the issue means I really think there’s some > bug or inconsistency here – if somebody could explain I would really > appreciate it. > -- This message was sent by Atlassian Jira (v8.20.10#820010)