[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-28357:
-----------------------------------
    Fix Version/s: 1.16.0

> Watermark issue when recovering Finished sources
> ------------------------------------------------
>
>                 Key: FLINK-28357
>                 URL: https://issues.apache.org/jira/browse/FLINK-28357
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>         Environment: This can be reproduced in an IDE with the attached 
> sample program.
>            Reporter: James
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.15.2, 1.14.6
>
>         Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I done a lot of experimentation and I’m convinced there is a problem with 
> Flink handling Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints the console a message saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Live Source” feeds into a map() and then it joins with the 
> “Long Running Source” with a KeyedCoProcessFunction. Moves to “FINISHED” 
> state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> !https://attachment.outlook.live.net/owa/MSA%3Ajas_sl%40hotmail.com/service.svc/s/GetAttachmentThumbnail?id=AQMkADAwATEyMTk3LTZiMDQtODBkMi0wMAItMDAKAEYAAAOeUdiydD9QS6CQDK1Dg0olBwACkmHn2W1HRKQHhbPYmGe%2BAASF%2B488AAAAApJh59ltR0SkB4Wz2JhnvgAFXJ9puQAAAAESABAAyemY6ar4b0GAFLHn3hpyCw%3D%3D&thumbnailType=2&isc=1&token=eyJhbGciOiJSUzI1NiIsImtpZCI6IkZBRDY1NDI2MkM2QUYyOTYxQUExRThDQUI3OEZGMUIyNzBFNzA3RTkiLCJ0eXAiOiJKV1QiLCJ4NXQiOiItdFpVSml4cThwWWFvZWpLdDRfeHNuRG5CLWsifQ.eyJvcmlnaW4iOiJodHRwczovL291dGxvb2subGl2ZS5jb20iLCJ1YyI6IjMwMDU4MTIzODAzMzRlMmZhNzE5ZGUxOTNjNjA4NjQ3IiwidmVyIjoiRXhjaGFuZ2UuQ2FsbGJhY2suVjEiLCJhcHBjdHhzZW5kZXIiOiJPd2FEb3dubG9hZEA4NGRmOWU3Zi1lOWY2LTQwYWYtYjQzNS1hYWFhYWFhYWFhYWEiLCJpc3NyaW5nIjoiV1ciLCJhcHBjdHgiOiJ7XCJtc2V4Y2hwcm90XCI6XCJvd2FcIixcInB1aWRcIjpcIjMxODQwOTE5NTk0NjE5NFwiLFwic2NvcGVcIjpcIk93YURvd25sb2FkXCIsXCJvaWRcIjpcIjAwMDEyMTk3LTZiMDQtODBkMi0wMDAwLTAwMDAwMDAwMDAwMFwiLFwicHJpbWFyeXNpZFwiOlwiUy0xLTI4MjctNzQxMzUtMTc5NTQ1NzIzNFwifSIsIm5iZiI6MTY1NjY4ODI3OCwiZXhwIjoxNjU2Njg4ODc4LCJpc3MiOiIwMDAwMDAwMi0wMDAwLTBmZjEtY2UwMC0wMDAwMDAwMDAwMDBAODRkZjllN2YtZTlmNi00MGFmLWI0MzUtYWFhYWFhYWFhYWFhIiwiYXVkIjoiMDAwMDAwMDItMDAwMC0wZmYxLWNlMDAtMDAwMDAwMDAwMDAwL2F0dGFjaG1lbnQub3V0bG9vay5saXZlLm5ldEA4NGRmOWU3Zi1lOWY2LTQwYWYtYjQzNS1hYWFhYWFhYWFhYWEiLCJoYXBwIjoib3dhIn0.KI4I55ycdP1duIwxyYZstLCtnNOwEkyTxfEwK_5a35-ZLMrKd8zHCB5Elw-9-A9UHIxFGSYOlwnHXRvDT0xa6FqFIlO8cnebBRLKv9DhxHwfZqdKWIeF2EcUqwH0ejeA3RvD3-dR95iHPTf52-tuKi27nclPUUEJgbfRWQY3wHMDAFLLaLvKM6AV5S1IhGjBmy3MF_1oulTXbqRZx0ar3L8YQiHEGnfKGjFO2zSxQcTZXAp_rch4HIrVv9GSEcQnD7nBhWPBuuzuvXOvJiUzg0u_e9CUuf1-OcQwhUV3cf7cvme8JadfliY6ywkOne1OZsclQeDFc8EnGZke3l2V_Q&X-OWA-CANARY=Es9QgEoDXEyksG3kZxXeMGC1LvlzW9oYJ-lyWNl-xblWQjmqz5FH_a2-eHuR6Zr51XNjigQpQDs.&owa=outlook.live.com&scriptVer=20220617005.11&animation=true!
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortedLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source in a FINISHED state then the console message does 
> not appear and the watermark is not emitted.
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my source 
> or Flink if I see two or more checkpoints logged in between recoveries. If 
> zero or checkpoints are made, everything is fine – the join gets the 
> watermark and I see my console message. You can play with the checkpointing 
> frequency as per the code comments:
>         // Useful checkpoint interval options:
>         //    5 - see the problem after the first recovery
>         //   70 - useful to see bad behaviour kick in after a recovery or two
>         //  120 - won't see the problem as we don't have 2 checkpoints within 
> a single recovery session
> If I merge the Triggering/Completed checkpoint messages in the log with my 
> console output I see something like this clearly showing the “Short Lived 
> Source” run() method is not executed after 2 checkpoints with the operators 
> marked as FINISHED:
>  
> 2022-06-29T11:52:31.268Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE 
> watermark.
> 2022-06-29T11:52:31.293Z: LongRunningSource emitting initial 
> watermark=1656503551268
> 2022-06-29T11:52:41.302Z: LongRunningSource emitting loop 
> watermark=1656503561302
> 2022-06-29T11:52:51.302Z: LongRunningSource emitting loop 
> watermark=1656503571302
> 2022-06-29T11:53:01.303Z: LongRunningSource emitting loop 
> watermark=1656503581303
> 2022-06-29 11:53:02.772 INFO  [Checkpoint Timer] 
> o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 1 
> (type=CheckpointType\{name='Checkpoint', 
> sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:53:02.870 INFO  [jobmanager-io-thread-10] 
> o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 1 for job 
> 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:53:11.303Z: LongRunningSource emitting loop 
> watermark=1656503591303
> 2022-06-29T11:53:21.304Z: LongRunningSource emitting loop 
> watermark=1656503601304
> 2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------
> 2022-06-29T11:53:22.405Z: LongRunningSource emitting initial 
> watermark=1656503602405
> 2022-06-29T11:53:22.408Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE 
> watermark.
> 2022-06-29T11:53:32.406Z: LongRunningSource emitting loop 
> watermark=1656503612406
> 2022-06-29T11:53:42.406Z: LongRunningSource emitting loop 
> watermark=1656503622406
> 2022-06-29 11:53:51.048 INFO  [Checkpoint Timer] 
> o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 2 
> (type=CheckpointType\{name='Checkpoint', 
> sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:53:51.067 INFO  [jobmanager-io-thread-4] 
> o.a.f.r.c.CheckpointCoordinator     Completed checkpoint 2 for job 
> 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:53:52.407Z: LongRunningSource emitting loop 
> watermark=1656503632407
> 2022-06-29T11:54:02.407Z: LongRunningSource emitting loop 
> watermark=1656503642407
> 2022-06-29T11:54:12.408Z: LongRunningSource emitting loop 
> watermark=1656503652408
> 2022-06-29T11:54:22.408Z: LongRunningSource emitting loop 
> watermark=1656503662408
> 2022-06-29T11:54:32.409Z: LongRunningSource emitting loop 
> watermark=1656503672409
> 2022-06-29T11:54:42.409Z: LongRunningSource emitting loop 
> watermark=1656503682409
> 2022-06-29T11:54:52.410Z: LongRunningSource emitting loop 
> watermark=1656503692410
> 2022-06-29 11:55:01.048 INFO  [Checkpoint Timer] 
> o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 3 
> (type=CheckpointType\{name='Checkpoint', 
> sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:55:01.057 INFO  [jobmanager-io-thread-10] 
> o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 3 for job 
> 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:55:02.410Z: LongRunningSource emitting loop 
> watermark=1656503702410
> 2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------
> 2022-06-29T11:55:03.445Z: LongRunningSource emitting initial 
> watermark=1656503703444       <<<<< NO “ShortLivedEmptySource” message after 
> recovery
> 2022-06-29T11:55:13.446Z: LongRunningSource emitting loop 
> watermark=1656503713445
> 2022-06-29T11:55:23.446Z: LongRunningSource emitting loop 
> watermark=1656503723446
> 2022-06-29T11:55:33.446Z: LongRunningSource emitting loop 
> watermark=1656503733446
>  
> I have also attached a longer example with shows everything working fine 
> after 5 recoveries, and then breaking after the 6{^}th{^}.
> I am guessing here it has something to do with the checkpointing and recovery 
> of a FINISHED source.
> Finally, here are some ways that allows the code to work:
>  * Change the code so the “Short Lived Source” doesn’t return from run() and 
> stays RUNNING (uncomment the Thread.sleep)
>  * As I mentioned before, if I remove the map() operator the problem in the 
> join also goes away. (I don’t see the console output but the join is happy)
>  * Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have 
> two checkpoints with FINISHED state per recovery.
> The fact these changes prevent the issue means I really think there’s some 
> bug or inconsistency here – if somebody could explain I would really 
> appreciate it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to