Chesnay,
I cannot reproduce this - I've tried the approaches you suggest, but
nothing I've done makes the timers fire at the correct time in the stream -
they only fire when the stream has ended. If you have an EventTime example
where they fire at the right time in the stream, I'd love to see it. Or any
ideas for other things to try? Could it perhaps be related to using a file
source?
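
To check that we mean the same thing by "the right time", here is my understanding as a toy model in plain Java. It is not Flink code and not my actual DPTimeoutFunction: TimerModel, Event, run and sample are names I made up for illustration, and the "max timestamp minus 1" watermark is my assumption for a zero out-of-orderness bound. The only point is that a timer fires when a watermark reaches it, so the moment at which watermarks are emitted decides whether a timeout shows up mid-stream or only at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time timeouts in plain Java. This is NOT Flink's API. */
class TimerModel {

    /** One input message: event timestamp (ms) and device id. */
    static final class Event {
        final long ts;
        final String id;

        Event(long ts, String id) {
            this.ts = ts;
            this.id = id;
        }
    }

    /**
     * Replays the events in order. Each message cancels the device's pending
     * timer and registers a new one at ts + timeout. Timers fire only when
     * the watermark reaches them.
     */
    static List<String> run(List<Event> events, long timeout, boolean watermarkPerEvent) {
        TreeMap<String, Long> timers = new TreeMap<>(); // device id -> timer timestamp
        List<String> out = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        for (Event e : events) {
            timers.put(e.id, e.ts + timeout); // replaces (cancels) any pending timer
            out.add("msg " + e.ts + " " + e.id);
            maxTs = Math.max(maxTs, e.ts);
            if (watermarkPerEvent) {
                // Assumed watermark for zero out-of-orderness: max timestamp minus 1.
                fireDue(timers, maxTs - 1, out);
            }
        }
        // End of a bounded input: the watermark jumps to the maximum value.
        fireDue(timers, Long.MAX_VALUE, out);
        return out;
    }

    /** Fires and removes every timer at or below the watermark, oldest first. */
    private static void fireDue(TreeMap<String, Long> timers, long watermark, List<String> out) {
        List<Map.Entry<String, Long>> due = new ArrayList<>();
        for (Map.Entry<String, Long> t : timers.entrySet()) {
            if (t.getValue() <= watermark) {
                due.add(Map.entry(t.getKey(), t.getValue())); // copy before removing
            }
        }
        due.sort(Map.Entry.comparingByValue()); // stable, so ties stay in id order
        for (Map.Entry<String, Long> t : due) {
            out.add("offline " + t.getValue() + " " + t.getKey());
            timers.remove(t.getKey());
        }
    }

    /** Two devices; "b" falls silent between ts 0 and ts 3000. */
    static List<Event> sample() {
        List<Event> events = new ArrayList<>();
        events.add(new Event(0, "a"));
        events.add(new Event(0, "b"));
        events.add(new Event(1000, "a"));
        events.add(new Event(2000, "a"));
        events.add(new Event(3000, "a"));
        events.add(new Event(3000, "b"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("watermark per event: " + run(sample(), 1000L, true));
        System.out.println("watermark at end:    " + run(sample(), 1000L, false));
    }
}
```

With a watermark after every event, the model reports device "b" offline right after the message at ts 2000, which is the behaviour I expect. With a watermark only at the end of the input, b's first timer is cancelled by its own message at ts 3000 and never fires, which matches what I originally saw.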

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW4fQ47l4fGCmnW3Fbt5S3H4THtF3F6jFSWsSg1&si=5987503666495488&pi=a563531c-b630-4c52-feeb-e32ba4302f87>
 +44 7961 125282
See our latest features
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3Q_1QY1JxXxvW3SYLMH3T0vWRW1JxwY51LDhHjW3K1M0S1GFyF-3b732&si=5987503666495488&pi=a563531c-b630-4c52-feeb-e32ba4302f87>
and book me
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3T1k3k1JxXxvW3SYLMH3T0vWRW1JxwY51LBcj1W4fJfX_4cgB3QW3ZW3jf3_qrT_4Wt5h1&si=5987503666495488&pi=a563531c-b630-4c52-feeb-e32ba4302f87>
for
a video call.



On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler <ches...@apache.org> wrote:

> Actually, if the parallelism is 1 then it works as it should. sigh....
>
> On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
>
> Note that while this does fix the issue of timers not firing while the job
> is running, it seems to be firing too many timers...
>
> On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
>
> My bad, I was still using the custom WatermarkStrategy that emits a
> watermark for each event.
>
> .assignTimestampsAndWatermarks(
>     new WatermarkStrategy<T>() {
>         @Override
>         public WatermarkGenerator<T> createWatermarkGenerator(
>                 WatermarkGeneratorSupplier.Context context) {
>             return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
>                 @Override
>                 public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>                     super.onEvent(event, eventTimestamp, output);
>                     // Emit a watermark after every event instead of waiting
>                     // for the periodic (processing-time) emit.
>                     super.onPeriodicEmit(output);
>                 }
>             };
>         }
>     }.withTimestampAssigner(...))
>
>
> @Aljoscha This is about Flink 1.11. Since the periodic watermarks are
> dependent on processing time, am I correct in assuming that, if the job
> finishes quickly, watermarks may never be emitted (except for those at the
> end of the job)?
> Is there any way to emit periodic watermarks based on event time?
> Is there any way to enable punctuated watermarks for the existing
> watermark strategies without having to implement a custom one?
>
> On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
>
> Chesnay,
> Thanks for this - I've made the change you suggested
> (setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
> still get processed only on stream end.
> I have pushed a new version, with this change, and also emitting some
> information in a .log field.
> If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
> see the relevant changes.
>
> In DPTimeoutFunction you'll see that if I add code to say "cancel the
> timer only if it wouldn't have gone off" then the output is now correct -
> individual devices do timeout. However, this output only appears at the end
> of the stream (i.e. time jumps backwards as all the timers are processed)
> so I still appear not to be seeing timer processing at the correct event
> time. If there was no end of stream, I would never get any timeouts.
>
> Below is the output I get when I run. This output is correct but:
> a) only because I am manually cancelling timers in DPTimeoutFunction
> (search for "!!!")
> b) the timer events are timestamped correctly, but are not emitted into
> the stream at the right time - and if the stream didn't end then no
> timeouts would ever occur (which in particular means that devices that
> never come back online will never get marked as offline).
>
> Perhaps I do need to implement an onPeriodicEmit function? Does that
> require a custom watermark strategy? I can see how to define a custom
> watermark generator at the link below, but it's unclear to me how to
> install it.
>
> https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
>
> {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
> {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
> {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
> {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
> msg_in.ts 1000 Cancelling previous timer. "}
> {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
> msg_in.ts 1000 Cancelling previous timer. "}
> {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
> msg_in.ts 2000 Cancelling previous timer. "}
> {"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000
> msg_in.ts 2000 Cancelling previous timer. "}
> {"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000
> msg_in.ts 3000 Cancelling previous timer. "}
> {"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000
> msg_in.ts 3000 Cancelling previous timer. "}
> {"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000
> msg_in.ts 4000 Cancelling previous timer. "}
> {"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000
> msg_in.ts 4000 Cancelling previous timer. "}
> {"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000
> msg_in.ts 5000 Cancelling previous timer. "}
> {"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0
> msg_in.ts 5000 "}
> {"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000
> msg_in.ts 6000 Cancelling previous timer. "}
> {"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000
> msg_in.ts 6000 Cancelling previous timer. "}
> {"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000
> msg_in.ts 7000 Cancelling previous timer. "}
> {"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000
> msg_in.ts 7000 "}
> {"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000
> msg_in.ts 7000 Cancelling previous timer. "}
> {"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000
> msg_in.ts 8000 Cancelling previous timer. "}
> {"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000
> msg_in.ts 8000 Cancelling previous timer. "}
> {"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000
> msg_in.ts 8000 Cancelling previous timer. "}
> {"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000
> msg_in.ts 9000 Cancelling previous timer. "}
> {"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000
> msg_in.ts 9000 Cancelling previous timer. "}
> {"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000
> msg_in.ts 9000 Cancelling previous timer. "}
> {"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000
> msg_in.ts 10000 Cancelling previous timer. "}
> {"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000
> msg_in.ts 10000 Cancelling previous timer. "}
> {"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000
> msg_in.ts 10000 Cancelling previous timer. "}
> {"ts":1001,"id":"2","is_online":false} // These are the "going offline"
> events that we want to see. But they are emitted only once the stream has
> ended.
> {"ts":5001,"id":"1","is_online":false}
> {"ts":11001,"id":"1","is_online":false}
> {"ts":11001,"id":"0","is_online":false}
> {"ts":11001,"id":"2","is_online":false}
>
> Thanks,
>
> -Pilgrim
>
>
>
> On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler <ches...@apache.org> wrote:
>
>> You were right that it is an issue with the watermarks; except when the
>> job was stopped, they were never emitted downstream, so no timer was ever
>> triggered.
>>
>> It appears that you need to set the setAutoWatermarkInterval in the
>> ExecutionConfig via
>>
>>
>> env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
>>
>>
>> to have them periodically emitted. Alternatively you could override
>> BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for
>> each event (for example, by calling #onPeriodicEmit).
>>
>> Put another way, if you use any of the built-in WatermarkGenerators and
>> use event-time, then it appears that you *must* set this interval.
>>
>> This behavior is...less than ideal I must admit, and it does not appear
>> to be properly documented.
>>
>> On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
>>
>> Based on your description you aren't doing anything obviously wrong.
>> Would it be possible for you to share the code with us?
>>
>> On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
>>
>> A newbie question:
>>
>> I've created a basic Flink DataStream job for an IoT use-case, with file
>> source and sink for testing.
>> I key by device ID, then in a ProcessFunction set an EventTime Timer to
>> fire if a device falls silent, i.e. a timeout, which I cancel if another
>> message arrives from that device within the timeout.
>>
>> My test source generates 3 devices, one of which falls silent for more
>> than the timeout period during the stream, then resumes again. So I expect
>> the Timer to fire for that device during the stream, and then for all the
>> Timers to fire after the end of the stream.
>>
>> The timers do indeed fire at the end of the stream (e.g. with a timeout
>> of 1000, the timers all fire 1000 after the end of the stream, which is
>> correct). But no timer fires for the device which falls silent during the
>> stream (even though other devices are still talking, advancing event time).
>> I've verified that I am keying correctly by ID.
>>
>> I suspect this is something to do with Watermarks. I'm using
>> forBoundedOutOfOrderness watermarking with a duration of 0.
>>
>> All suggestions welcome, thanks.
>>
>> -Pilgrim
>>
>>
>>
>>
>
>
>
