Scratch that - your WatermarkStrategy DOES work (when I implement it
correctly!).
Well, almost: as you can see below (code pushed to repo), the Timer events
still appear somewhat late in the stream - 4 events late in this
case. That may be just good enough for my purposes, though it will make
building test cases painful, so any ideas on how to fix it would be much
appreciated.

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
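
One possible reading of the lag in the log above, sketched in plain Java with no Flink on the classpath. It assumes the strategy still uses a 1-second bound (Duration.ofSeconds(1), as in Chesnay's snippet), that a watermark is emitted after every event with value max timestamp - bound - 1, and that an event-time timer fires once the watermark reaches its timestamp. The class and method names are made up for illustration.

```java
/** Plain-Java model (no Flink dependency) of when an event-time timer can fire. */
class WatermarkLagSketch {

    /**
     * Returns the index of the first event after which a timer registered for
     * timerTs can fire, or -1 if the watermark never reaches it.
     * Models a bounded-out-of-orderness generator that emits after every event:
     * watermark = maxTimestampSeen - boundMs - 1.
     */
    static int firstFiringIndex(long[] eventTs, long timerTs, long boundMs) {
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < eventTs.length; i++) {
            maxTs = Math.max(maxTs, eventTs[i]);
            long watermark = maxTs - boundMs - 1;
            if (watermark >= timerTs) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Event timestamps in arrival order, copied from the start of the log above.
        long[] ts = {0, 0, 0, 1000, 1000, 2000, 2000, 3000, 3000, 4000, 4000};
        long timerTs = 1000; // timer for id "2", logged as "timestamp is 1000"
        for (long bound : new long[] {1000, 0}) {
            int i = firstFiringIndex(ts, timerTs, bound);
            System.out.println("bound=" + bound + "ms -> fires after event #" + i
                    + " (ts=" + ts[i] + ")");
        }
    }
}
```

Under these assumptions the 1000 timer cannot fire before the first ts 3000 event, and the 5000 timer for id "1" cannot fire before the first ts 7000 event, which is where both appear in the log. With a bound of 0 the same model fires one timestamp step earlier (after the first ts 2000 event), so part of the delay may simply be the out-of-orderness bound rather than a timer bug.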
-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW4fQ47l4fGCmnW3Fbt5S3H4THtF3F6jFSWsSg1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
 +44 7961 125282
See our latest features
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3Q_1QY1JxXxvW3SYLMH3T0vWRW1JxwY51LDhHjW3K1M0S1GFyF-3b732&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
and book me
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3T1k3k1JxXxvW3SYLMH3T0vWRW1JxwY51LBcj1W4fJfX_4cgB3QW3ZW3jf3_qrT_4Wt5h1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
for
a video call.



On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart <pilgrim.be...@devicepilot.com>
wrote:

> Chesnay,
> I cannot reproduce this - I've tried the approaches you suggest, but
> nothing I've done makes the timers fire at the correct time in the stream -
> they only fire when the stream has ended. If you have an EventTime example
> where they fire at the right time in the stream, I'd love to see it. Or any
> ideas for other things to try? Could it perhaps be related to using a file
> source?
>
> Thanks,
>
> -Pilgrim
>
>
>
> On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler <ches...@apache.org> wrote:
>
>> Actually, if the parallelism is 1 then it works as it should. sigh....
>>
>> On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
>>
>> Note that while this does fix the issue of timers not firing while the
>> job is running, it seems to be firing too many timers...
>>
>> On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
>>
>> My bad, I was still using the custom WatermarkStrategy that emits a
>> watermark for each event.
>>
>> // Classes below are from org.apache.flink.api.common.eventtime
>> .assignTimestampsAndWatermarks(
>>     new WatermarkStrategy<T>() {
>>         @Override
>>         public WatermarkGenerator<T> createWatermarkGenerator(
>>                 WatermarkGeneratorSupplier.Context context) {
>>             return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
>>                 @Override
>>                 public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>>                     // Track the max timestamp, then emit a watermark right away
>>                     // instead of waiting for the periodic (processing-time) tick.
>>                     super.onEvent(event, eventTimestamp, output);
>>                     super.onPeriodicEmit(output);
>>                 }
>>             };
>>         }
>>     }.withTimestampAssigner(...))
>>
>>
>> @Aljoscha This is about Flink 1.11. Since the periodic watermarks are
>> dependent on processing time, am I correct in assuming that if the job
>> finishes quickly, watermarks may never be emitted (except for those at the
>> end of the job)?
>> Is there any way to emit periodic watermarks based on event time?
>> Is there any way to enable punctuated watermarks for the existing
>> watermark strategies without having to implement a custom one?
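
The difference between the two emission styles discussed above can be modeled without Flink. This is only a sketch: the classes below are hypothetical stand-ins, not Flink's WatermarkGenerator types, and "no periodic tick" is a simplifying assumption for a job that consumes its whole input before any processing-time interval elapses.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in types only; these are NOT Flink's classes. */
class PunctuatedWatermarkSketch {

    /** Tracks the max timestamp and emits only when asked, like a periodic generator. */
    static class PeriodicGenerator {
        private final long boundMs;
        private long maxTs = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        PeriodicGenerator(long boundMs) {
            this.boundMs = boundMs;
        }

        void onEvent(long eventTs) {
            maxTs = Math.max(maxTs, eventTs);
        }

        /** In a real job this would be driven by a processing-time interval. */
        void onPeriodicEmit() {
            if (maxTs != Long.MIN_VALUE) {
                emitted.add(maxTs - boundMs - 1);
            }
        }
    }

    /** Same bookkeeping, but also emits after every event. */
    static class PerEventGenerator extends PeriodicGenerator {
        PerEventGenerator(long boundMs) {
            super(boundMs);
        }

        @Override
        void onEvent(long eventTs) {
            super.onEvent(eventTs);
            onPeriodicEmit();
        }
    }

    /** Feeds all events with no periodic tick in between, as in a job that finishes quickly. */
    static List<Long> runWithoutTicks(PeriodicGenerator gen, long[] eventTs) {
        for (long ts : eventTs) {
            gen.onEvent(ts);
        }
        return gen.emitted;
    }

    public static void main(String[] args) {
        long[] ts = {0, 1000, 2000, 3000};
        System.out.println(runWithoutTicks(new PeriodicGenerator(1000), ts));  // []
        System.out.println(runWithoutTicks(new PerEventGenerator(1000), ts)); // [-1001, -1, 999, 1999]
    }
}
```

In this model the periodic-only generator emits nothing while the input is being read, so no event-time timer could fire until the end of the input, whereas the per-event variant advances the watermark with every record.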
>>
>> On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
>>
>> Chesnay,
>> Thanks for this - I've made the change you suggested
>> (setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
>> still get processed only on stream end.
>> I have pushed a new version, with this change, and also emitting some
>> information in a .log field.
>> If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
>> see the relevant changes.
>>
>> In DPTimeoutFunction you'll see that if I add code to say "cancel the
>> timer only if it wouldn't have gone off" then the output is now correct -
>> individual devices do timeout. However, this output only appears at the end
>> of the stream (i.e. time jumps backwards as all the timers are processed)
>> so I still appear not to be seeing timer processing at the correct event
>> time. If there was no end of stream, I would never get any timeouts.
>>
>> Below is the output I get when I run. This output is correct but:
>> a) only because I am manually cancelling timers in DPTimeoutFunction
>> (search for "!!!")
>> b) the timer events are timestamped correctly, but are not emitted into
>> the stream at the right time - and if the stream didn't end then no
>> timeouts would ever occur (which in particular means that devices that
>> never come back online will never get marked as offline).
>>
>> Perhaps I do need to implement an onPeriodicEmit function? Does that
>> require a custom watermark strategy? I can see how to define a custom
>> watermark at the link below, but it is unclear to me how to install it.
>>
>> https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
>>
>> {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
>> {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
>> {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
>> {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
>> {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
>> {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
>> {"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
>> {"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
>> {"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
>> {"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
>> {"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
>> {"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
>> {"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
>> {"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
>> {"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
>> {"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
>> {"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
>> {"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
>> {"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
>> {"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
>> {"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
>> {"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
>> {"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
>> {"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
>> {"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
>> {"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
>> {"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
>> {"ts":1001,"id":"2","is_online":false} // These are the "going offline"
>> events that we want to see. But they are emitted only once the stream has
>> ended.
>> {"ts":5001,"id":"1","is_online":false}
>> {"ts":11001,"id":"1","is_online":false}
>> {"ts":11001,"id":"0","is_online":false}
>> {"ts":11001,"id":"2","is_online":false}
>>
>> Thanks,
>>
>> -Pilgrim
>>
>>
>>
>> On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> You were right that it is an issue with the watermarks; other than
>>> when the job was stopped, they were never emitted downstream, so no timer
>>> was ever triggered.
>>>
>>> It appears that you need to set the auto-watermark interval in the
>>> ExecutionConfig via
>>>
>>>
>>> env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
>>>
>>>
>>> to have them periodically emitted. Alternatively you could override
>>> BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for
>>> each event (for example, by calling #onPeriodicEmit).
>>>
>>> Put another way, if you use any of the built-in WatermarkGenerators and
>>> use event-time, then it appears that you *must* set this interval.
>>>
>>> This behavior is...less than ideal I must admit, and it does not appear
>>> to be properly documented.
>>>
>>> On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
>>>
>>> Based on your description you aren't doing anything obviously wrong.
>>> Would it be possible for you to share the code with us?
>>>
>>> On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
>>>
>>> A newbie question:
>>>
>>> I've created a basic Flink DataStream job for an IoT use-case, with file
>>> source and sink for testing.
>>> I key by device ID, then in a ProcessFunction set an EventTime Timer to
>>> fire if a device falls silent, i.e. a timeout, which I cancel if another
>>> message arrives from that device within the timeout.
>>>
>>> My test source generates 3 devices, one of which falls silent for more
>>> than the timeout period during the stream, then resumes again. So I expect
>>> the Timer to fire for that device during the stream, and then for all the
>>> Timers to fire after the end of the stream.
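
The expected behaviour described above can be modeled in plain Java, without Flink. This is a sketch under stated assumptions, not the actual ProcessFunction: it assumes one pending timer per device, a watermark of (event timestamp - 1) emitted after every message (zero out-of-orderness), and a final maximum watermark at the end of a bounded input. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of per-device event-time timeouts; these are not Flink classes. */
class DeviceTimeoutSketch {
    private final long timeoutMs;
    // device id -> timestamp of its single pending timer (TreeMap for a stable firing order)
    private final Map<String, Long> pendingTimers = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    DeviceTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Handles one message, then advances the watermark as a per-event generator would. */
    void onMessage(String id, long ts) {
        pendingTimers.put(id, ts + timeoutMs); // overwriting cancels the previous timer
        output.add("online " + id + " @" + ts);
        advanceWatermark(ts - 1); // assumption: zero out-of-orderness
    }

    /** Fires every timer whose timestamp is at or below the watermark. */
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = pendingTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                output.add("offline " + timer.getKey() + " @" + timer.getValue());
                it.remove();
            }
        }
    }

    /** End of a bounded input: the watermark jumps to the maximum, firing all timers. */
    void endOfStream() {
        advanceWatermark(Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        DeviceTimeoutSketch job = new DeviceTimeoutSketch(1000);
        job.onMessage("0", 0);
        job.onMessage("2", 0);
        job.onMessage("0", 1000);
        job.onMessage("0", 2000); // watermark 1999 passes device 2's timer at 1000
        job.onMessage("0", 3000);
        job.onMessage("2", 3000); // device 2 resumes
        job.endOfStream();
        job.output.forEach(System.out::println);
    }
}
```

In this model device 2 is reported offline mid-stream, right after the ts 2000 message from device 0, and the remaining timers fire only at the end of the input. If the watermark were never advanced during the run, every "offline" line would appear at the end instead.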
>>>
>>> The timers do indeed fire at the end of the stream (e.g. with a timeout
>>> of 1000, the timers all fire 1000 after the end of the stream, which is
>>> correct). But no timer fires for the device which falls silent during the
>>> stream (even though other devices are still talking, advancing event time).
>>> I've verified that I am keying correctly by ID.
>>>
>>> I suspect this is something to do with Watermarks. I'm using
>>> forBoundedOutOfOrderness watermarking with a duration of 0.
>>>
>>> All suggestions welcome, thanks.
>>>
>>> -Pilgrim
>>>
>>>
>>>
>>>
>>
>>
>>
