Scratch that: your WatermarkStrategy DOES work (when I implement it correctly!). Well, almost. As you can see below (code pushed to repo), the Timer events still appear somewhat late in the stream, 4 events late in this case. That may be good enough for my purposes, though it will make building test cases painful, so if you have any ideas how I could fix it, they would be much appreciated.
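One likely reason for the remaining lag is the watermark formula itself. Flink's BoundedOutOfOrdernessWatermarks emits the highest timestamp seen so far, minus the out-of-orderness bound, minus 1. An event-time timer fires only once the watermark is greater than or equal to the timer's timestamp. The plain-Java model below is a sketch, not Flink API: `WatermarkLagModel`, `watermark` and `firingIndex` are hypothetical names. It assumes one watermark after every event and parallelism 1, and reports after which input event a given timer would fire.

```java
/**
 * Plain-Java model (hypothetical, not Flink API) of when an event-time timer
 * fires if a bounded-out-of-orderness watermark follows every event.
 */
class WatermarkLagModel {

    /** Watermark after seeing maxTimestamp: max - bound - 1. */
    static long watermark(long maxTimestamp, long boundMillis) {
        return maxTimestamp - boundMillis - 1;
    }

    /**
     * Index of the event after which a timer for timerTimestamp fires, or -1 if
     * the watermark never reaches it before the input ends (in Flink such timers
     * fire on the final watermark at the end of a bounded input).
     */
    static int firingIndex(long[] eventTimestamps, long boundMillis, long timerTimestamp) {
        long maxTimestamp = Long.MIN_VALUE;
        for (int i = 0; i < eventTimestamps.length; i++) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamps[i]);
            // An event-time timer fires once the watermark is >= its timestamp.
            if (watermark(maxTimestamp, boundMillis) >= timerTimestamp) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Input timestamps in the order they appear in the output (timer records excluded).
        long[] events = {0, 0, 0, 1000, 1000, 2000, 2000, 3000, 3000, 4000, 4000,
                5000, 5000, 6000, 6000, 7000, 7000, 7000,
                8000, 8000, 8000, 9000, 9000, 9000, 10000, 10000, 10000};
        for (long bound : new long[] {1000, 0}) {
            int i = firingIndex(events, bound, 1000);
            String where = i < 0 ? "never during the input"
                    : "after event #" + i + " (ts " + events[i] + ")";
            System.out.println("bound " + bound + " ms: timer at 1000 fires " + where);
        }
    }
}
```

With a 1000 ms bound, like the `Duration.ofSeconds(1)` in Chesnay's snippet further down, the timer for device 2 at ts 1000 fires after the first ts=3000 event, and device 1's timer at 5000 fires after the first ts=7000 event. That is consistent with where the two offline records sit in the output below. With a bound of 0 (the original forBoundedOutOfOrderness setup with a duration of 0, or `forMonotonousTimestamps()`, if the input really is ordered), the first timer moves up to the first ts=2000 event. It cannot fire any earlier: after the first ts=1000 event, more ts=1000 events may still arrive, so the watermark can only reach 999. With parallelism above 1, the keyed operator uses the minimum of its input channels' watermarks, which can add further delay.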
{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW4fQ47l4fGCmnW3Fbt5S3H4THtF3F6jFSWsSg1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
+44 7961 125282
See our latest features
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3Q_1QY1JxXxvW3SYLMH3T0vWRW1JxwY51LDhHjW3K1M0S1GFyF-3b732&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
and book me
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3T1k3k1JxXxvW3SYLMH3T0vWRW1JxwY51LBcj1W4fJfX_4cgB3QW3ZW3jf3_qrT_4Wt5h1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
for a video call.

On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart <pilgrim.be...@devicepilot.com> wrote:

> Chesnay,
> I cannot reproduce this - I've tried the approaches you suggest, but
> nothing I've done makes the timers fire at the correct time in the stream -
> they only fire when the stream has ended. If you have an EventTime example
> where they fire at the right time in the stream, I'd love to see it. Or any
> ideas for other things to try? Could it perhaps be related to using a file
> source?
>
> Thanks,
>
> -Pilgrim
>
> On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler <ches...@apache.org> wrote:
>
>> Actually, if the parallelism is 1 then it works as it should. sigh....
>>
>> On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
>>
>> Note that while this does fix the issue of timers not firing while the
>> job is running, it seems to be firing too many timers...
>>
>> On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
>>
>> My bad, I was still using the custom WatermarkStrategy that emits a
>> watermark for each event.
>>
>> .assignTimestampsAndWatermarks(
>>     new WatermarkStrategy<T>() {
>>         @Override
>>         public WatermarkGenerator<T> createWatermarkGenerator(
>>                 WatermarkGeneratorSupplier.Context context) {
>>             return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
>>                 @Override
>>                 public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>>                     super.onEvent(event, eventTimestamp, output);
>>                     // Emit a watermark after every event instead of waiting for the periodic timer.
>>                     super.onPeriodicEmit(output);
>>                 }
>>             };
>>         }
>>     }.withTimestampAssigner(...))
>>
>> @Aljoscha This is about Flink 1.11. Since the periodic watermarks are
>> dependent on processing time, am I correct in assuming that, if the job
>> finishes quickly, watermarks may never be emitted (except for the one at
>> the end of the job)?
>> Is there any way to emit periodic watermarks based on event time?
>> Is there any way to enable punctuated watermarks for the existing
>> watermark strategies without having to implement a custom one?
>>
>> On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
>>
>> Chesnay,
>> Thanks for this - I've made the change you suggested
>> (setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
>> still get processed only on stream end.
>> I have pushed a new version, with this change, and also emitting some
>> information in a .log field.
>> If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
>> see the relevant changes.
>>
>> In DPTimeoutFunction you'll see that if I add code to say "cancel the
>> timer only if it wouldn't have gone off" then the output is now correct -
>> individual devices do time out. However, this output only appears at the end
>> of the stream (i.e. time jumps backwards as all the timers are processed)
>> so I still appear not to be seeing timer processing at the correct event
>> time. If there were no end of stream, I would never get any timeouts.
>>
>> Below is the output I get when I run. This output is correct but:
>> a) only because I am manually cancelling timers in DPTimeoutFunction
>> (search for "!!!")
>> b) the timer events are timestamped correctly, but are not emitted into
>> the stream at the right time - and if the stream didn't end then no
>> timeouts would ever occur (which in particular means that devices that
>> never come back online will never get marked as offline).
>>
>> Perhaps I do need to implement an onPeriodicEmit function? Does that
>> require a custom watermark strategy? I can see how to define a custom
>> WatermarkGenerator at the link below, but it's unclear to me how to install it.
>>
>> https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
>>
>> {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
>> {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
>> {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
>> {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
>> {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
>> {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
>> {"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
>> {"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
>> {"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
>> {"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
>> {"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
>> {"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
>> {"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
>> {"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
>> {"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
>> {"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
>> {"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
>> {"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
>> {"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
>> {"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
>> {"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
>> {"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
>> {"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
>> {"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
>> {"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
>> {"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
>> {"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
>> {"ts":1001,"id":"2","is_online":false} // These are the "going offline"
>> events that we want to see. But they are emitted only once the stream has
>> ended.
>> {"ts":5001,"id":"1","is_online":false}
>> {"ts":11001,"id":"1","is_online":false}
>> {"ts":11001,"id":"0","is_online":false}
>> {"ts":11001,"id":"2","is_online":false}
>>
>> Thanks,
>>
>> -Pilgrim
>>
>> On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> You were right that it is an issue with the watermarks; except for
>>> when the job was stopped, they were never emitted downstream, so no timer
>>> was ever triggered.
>>>
>>> It appears that you need to call setAutoWatermarkInterval on the
>>> ExecutionConfig via
>>>
>>> env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
>>>
>>> to have them periodically emitted. Alternatively you could override
>>> BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for
>>> each event (for example, by calling #onPeriodicEmit).
>>>
>>> Put another way, if you use any of the built-in WatermarkGenerators and
>>> use event-time, then it appears that you *must* set this interval.
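The difference between the two suggestions (periodic emission versus emitting from onEvent) can be sketched in plain Java. `RecordingOutput`, `PeriodicBoundedGenerator`, `PunctuatedBoundedGenerator` and `EmissionDemo` are hypothetical names that only mirror the method names of Flink's WatermarkGenerator (`onEvent`, `onPeriodicEmit`); they are not Flink classes. The periodic model emits only when something calls `onPeriodicEmit`, which in Flink is driven by the auto-watermark interval in processing time, so a short job may never see a tick. The punctuated variant emits from `onEvent`, like the override quoted above, with an extra check (not in the quoted snippet) that skips watermarks that have not moved forward.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's WatermarkOutput: records every emitted watermark. */
class RecordingOutput {
    final List<Long> watermarks = new ArrayList<>();

    void emitWatermark(long watermark) {
        watermarks.add(watermark);
    }
}

/** Model of a bounded-out-of-orderness generator that only emits from onPeriodicEmit. */
class PeriodicBoundedGenerator {
    protected final long boundMillis;
    protected long maxTimestamp;

    PeriodicBoundedGenerator(long boundMillis) {
        this.boundMillis = boundMillis;
        // Chosen so that the initial watermark is Long.MIN_VALUE without overflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    void onEvent(long eventTimestamp, RecordingOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp); // track only, no emission
    }

    /** In Flink this call is driven by the auto-watermark interval (processing time). */
    void onPeriodicEmit(RecordingOutput output) {
        output.emitWatermark(currentWatermark());
    }
}

/** Punctuated variant: emits from onEvent, but only when the watermark moves forward. */
class PunctuatedBoundedGenerator extends PeriodicBoundedGenerator {
    private long lastEmitted = Long.MIN_VALUE;

    PunctuatedBoundedGenerator(long boundMillis) {
        super(boundMillis);
    }

    @Override
    void onEvent(long eventTimestamp, RecordingOutput output) {
        super.onEvent(eventTimestamp, output);
        long candidate = currentWatermark();
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            output.emitWatermark(candidate);
        }
    }
}

class EmissionDemo {
    /** Feeds the events and never calls onPeriodicEmit, like a job that ends before the first tick. */
    static List<Long> run(PeriodicBoundedGenerator generator, long[] timestamps) {
        RecordingOutput output = new RecordingOutput();
        for (long ts : timestamps) {
            generator.onEvent(ts, output);
        }
        return output.watermarks;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 0, 0, 1000, 1000, 2000};
        System.out.println("periodic only: " + run(new PeriodicBoundedGenerator(0), timestamps));
        System.out.println("punctuated:    " + run(new PunctuatedBoundedGenerator(0), timestamps));
    }
}
```

Without any periodic tick, the periodic model returns an empty list for those six timestamps, while the punctuated one returns [-1, 999, 1999]: one watermark per distinct advance instead of one per event.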
>>>
>>> This behavior is...less than ideal I must admit, and it does not appear
>>> to be properly documented.
>>>
>>> On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
>>>
>>> Based on your description you aren't doing anything obviously wrong.
>>> Would it be possible for you to share the code with us?
>>>
>>> On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
>>>
>>> A newbie question:
>>>
>>> I've created a basic Flink DataStream job for an IoT use-case, with file
>>> source and sink for testing.
>>> I key by device ID, then in a ProcessFunction set an EventTime Timer to
>>> fire if a device falls silent, i.e. a timeout, which I cancel if another
>>> message arrives from that device within the timeout.
>>>
>>> My test source generates 3 devices, one of which falls silent for more
>>> than the timeout period during the stream, then resumes again. So I expect
>>> the Timer to fire for that device during the stream, and then for all the
>>> Timers to fire after the end of the stream.
>>>
>>> The timers do indeed fire at the end of the stream (e.g. with a timeout
>>> of 1000, the timers all fire 1000 after the end of the stream, which is
>>> correct). But no timer fires for the device which falls silent during the
>>> stream (even though other devices are still talking, advancing event time).
>>> I've verified that I am keying correctly by ID.
>>>
>>> I suspect this is something to do with Watermarks. I'm using
>>> forBoundedOutOfOrderness watermarking with a duration of 0.
>>>
>>> All suggestions welcome, thanks.
>>>
>>> -Pilgrim
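For building test cases, it may help to check the expected ordering outside Flink first. The plain-Java model below is a sketch, not Flink API: `DeviceTimeoutModel` and `replay` are hypothetical names. It mimics the keyed pattern described in the original question: every event replaces its device's pending timer with one at ts + timeout, and timers fire once the watermark reaches them. It assumes parallelism 1, one watermark after every event, a 1000 ms timeout (matching the "timestamp is 1000" log for device 2, last seen at 0), and the (id, ts) sequence from the output at the top of the thread. A final `Long.MAX_VALUE` watermark stands in for the end-of-input watermark that fires the remaining timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (hypothetical, not Flink API) of a keyed "device went silent" timer. */
class DeviceTimeoutModel {
    private final long timeoutMillis;
    private final long boundMillis;
    private final Map<String, Long> pendingTimer = new HashMap<>(); // at most one timer per device
    private long maxTimestamp = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    DeviceTimeoutModel(long timeoutMillis, long boundMillis) {
        this.timeoutMillis = timeoutMillis;
        this.boundMillis = boundMillis;
    }

    /** Processes one event, then the watermark that (by assumption) follows every event. */
    void onEvent(String id, long ts) {
        pendingTimer.put(id, ts + timeoutMillis); // replaces, i.e. cancels, the previous timer
        output.add(id + "@" + ts);
        maxTimestamp = Math.max(maxTimestamp, ts);
        advanceWatermark(maxTimestamp - boundMillis - 1);
    }

    /** Models the final watermark at the end of a bounded input, which fires all timers. */
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    private void advanceWatermark(long watermark) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : pendingTimer.entrySet()) {
            if (e.getValue() <= watermark) {
                due.add(e.getKey());
            }
        }
        // Earliest timer first; ties broken by id only to keep the model deterministic.
        due.sort((a, b) -> {
            int byTime = Long.compare(pendingTimer.get(a), pendingTimer.get(b));
            return byTime != 0 ? byTime : a.compareTo(b);
        });
        for (String id : due) {
            output.add(id + " offline@" + pendingTimer.remove(id));
        }
    }

    /** Replays the (id, ts) input sequence taken from the thread's output. */
    static List<String> replay(long timeoutMillis, long boundMillis) {
        String[] ids = {"0", "1", "2", "0", "1", "0", "1", "0", "1", "0", "1", "0", "2", "0", "2",
                "0", "1", "2", "0", "1", "2", "0", "1", "2", "0", "1", "2"};
        long[] ts = {0, 0, 0, 1000, 1000, 2000, 2000, 3000, 3000, 4000, 4000, 5000, 5000, 6000, 6000,
                7000, 7000, 7000, 8000, 8000, 8000, 9000, 9000, 9000, 10000, 10000, 10000};
        DeviceTimeoutModel model = new DeviceTimeoutModel(timeoutMillis, boundMillis);
        for (int i = 0; i < ids.length; i++) {
            model.onEvent(ids[i], ts[i]);
        }
        model.endOfInput();
        return model.output;
    }

    public static void main(String[] args) {
        replay(1000, 0).forEach(System.out::println);
    }
}
```

Comparing `replay(1000, 1000)` with `replay(1000, 0)` shows how the bound shifts each offline record, and a list like this could serve as the expected output for a test. In this model, plain cancel-and-re-register is enough once watermarks advance during the stream, so the "cancel the timer only if it wouldn't have gone off" workaround is not needed.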