I'm not sure I see the problem in your output.

For any given key the timestamps are in order, and the events where devices are offline seem to occur at the right time.

Is it just that you'd like the following line to occur earlier in the output?

{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}

If so, then I'd just partition the output by key and evaluate each key's events individually.
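A minimal sketch of such a per-key check, in plain Java with no Flink dependency. The class and method names (PerKeyOrderCheck, timestampsByKey, isNonDecreasing) are made up for illustration, and the regexes assume output lines shaped like the ones quoted below, with a numeric "ts" field and a string "id" field:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PerKeyOrderCheck {
    // Assumed record shape: {"ts":<number>,"id":"<string>",...}
    private static final Pattern TS = Pattern.compile("\"ts\":(\\d+)");
    private static final Pattern ID = Pattern.compile("\"id\":\"([^\"]*)\"");

    /** Groups the timestamps by id, keeping the order in which lines appear. */
    static Map<String, List<Long>> timestampsByKey(List<String> lines) {
        Map<String, List<Long>> byKey = new TreeMap<>();
        for (String line : lines) {
            Matcher ts = TS.matcher(line);
            Matcher id = ID.matcher(line);
            if (ts.find() && id.find()) {
                byKey.computeIfAbsent(id.group(1), k -> new ArrayList<>())
                     .add(Long.parseLong(ts.group(1)));
            }
        }
        return byKey;
    }

    /** True if no timestamp is smaller than the one before it. */
    static boolean isNonDecreasing(List<Long> ts) {
        for (int i = 1; i < ts.size(); i++) {
            if (ts.get(i) < ts.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A few records from the output in this thread, trimmed to ts and id.
        List<String> lines = List.of(
            "{\"ts\":0,\"id\":\"0\"}",
            "{\"ts\":0,\"id\":\"2\"}",
            "{\"ts\":1000,\"id\":\"0\"}",
            "{\"ts\":2000,\"id\":\"0\"}",
            "{\"ts\":3000,\"id\":\"0\"}",
            "{\"ts\":1000,\"id\":\"2\",\"is_online\":false}",
            "{\"ts\":5000,\"id\":\"2\"}");
        timestampsByKey(lines).forEach((id, ts) ->
            System.out.println("id " + id + " " + ts + " ordered=" + isNonDecreasing(ts)));
    }
}
```

In this sample both keys come out ordered even though the interleaved output is not (ts 1000 for id 2 follows ts 3000 for id 0).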

On 1/28/2021 9:53 AM, Pilgrim Beart wrote:
Scratch that - your WatermarkStrategy DOES work (when I implement it correctly!). Well, almost: as you can see below (code pushed to repo), the Timer events are still appearing somewhat late in the stream - 4 events late in this case. It may be just good enough for my purposes, though it will make building test cases painful, so any ideas on how I could fix that would be much appreciated.

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
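
One possible reading of the delay, offered as a sketch and not as a statement about the actual job: if the strategy in use is the BoundedOutOfOrdernessWatermarks variant with Duration.ofSeconds(1) quoted further down in this thread, each watermark is maxTimestamp - 1000 - 1, so a timer registered for ts 1000 can only fire once an event with ts 3000 has been seen. The plain-Java model below has no Flink dependency; WatermarkLagModel and its methods are hypothetical names, and the timeout of 1000 is assumed from the thread. It replays the first few events under that assumption:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WatermarkLagModel {
    /**
     * events: rows of {eventTimestamp, deviceId}, in arrival order.
     * Returns processed events and fired timers in emission order.
     */
    static List<String> run(long[][] events, long timeoutMs, long outOfOrdernessMs) {
        Map<Long, Long> timers = new TreeMap<>(); // deviceId -> pending timer timestamp
        List<String> out = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        for (long[] e : events) {
            long ts = e[0];
            long id = e[1];
            out.add("event ts=" + ts + " id=" + id);
            // Replacing the entry models "cancel the old timer, register a new one".
            timers.put(id, ts + timeoutMs);
            // A watermark is emitted after every event, trailing the max timestamp.
            maxTs = Math.max(maxTs, ts);
            fireDue(timers, maxTs - outOfOrdernessMs - 1, out);
        }
        // End of input: a final maximal watermark flushes every remaining timer.
        fireDue(timers, Long.MAX_VALUE, out);
        return out;
    }

    /** Fires and removes every timer whose timestamp is at or below the watermark. */
    private static void fireDue(Map<Long, Long> timers, long watermark, List<String> out) {
        List<long[]> due = new ArrayList<>();
        for (Map.Entry<Long, Long> t : timers.entrySet()) {
            if (t.getValue() <= watermark) {
                due.add(new long[] {t.getValue(), t.getKey()});
            }
        }
        due.sort(Comparator.comparingLong((long[] d) -> d[0]).thenComparingLong(d -> d[1]));
        for (long[] d : due) {
            out.add("timer ts=" + d[0] + " id=" + d[1]);
            timers.remove(d[1]);
        }
    }

    public static void main(String[] args) {
        long[][] events = {
            {0, 0}, {0, 1}, {0, 2},
            {1000, 0}, {1000, 1},
            {2000, 0}, {2000, 1},
            {3000, 0}, {3000, 1}
        };
        // Timeout 1000, out-of-orderness bound 1000 (both assumed).
        run(events, 1000, 1000).forEach(System.out::println);
    }
}
```

In this model the line "timer ts=1000 id=2" is printed directly after "event ts=3000 id=0", which matches the position seen in the output above. With a bound of 0 the same timer fires after the first ts=2000 event instead, since the watermark is then maxTimestamp - 1.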
-Pilgrim
--
Learn more at https://devicepilot.com <https://devicepilot.com> @devicepilot <https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW4fQ47l4fGCmnW3Fbt5S3H4THtF3F6jFSWsSg1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>  +44 7961 125282 See our latest features <https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3Q_1QY1JxXxvW3SYLMH3T0vWRW1JxwY51LDhHjW3K1M0S1GFyF-3b732&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33> and book me <https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3T1k3k1JxXxvW3SYLMH3T0vWRW1JxwY51LBcj1W4fJfX_4cgB3QW3ZW3jf3_qrT_4Wt5h1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33> for a video call.



On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart <pilgrim.be...@devicepilot.com <mailto:pilgrim.be...@devicepilot.com>> wrote:

    Chesnay,
    I cannot reproduce this - I've tried the approaches you suggest,
    but nothing I've done makes the timers fire at the correct time in
    the stream - they only fire when the stream has ended. If you have
    an EventTime example where they fire at the right time in the
    stream, I'd love to see it. Or any ideas for other things to try?
    Could it perhaps be related to using a file source?

    Thanks,

    -Pilgrim



    On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler <ches...@apache.org
    <mailto:ches...@apache.org>> wrote:

        Actually, if the parallelism is 1 then it works as it should.
        sigh....

        On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
        Note that while this does fix the issue of timers not firing
        while the job is running, it seems to be firing too many
        timers...

        On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
        My bad, I was still using the custom WatermarkStrategy that
        emits a watermark for each event.
        .assignTimestampsAndWatermarks(
            new WatermarkStrategy<T>() {
                @Override
                public WatermarkGenerator<T> createWatermarkGenerator(
                        WatermarkGeneratorSupplier.Context context) {
                    return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                        @Override
                        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                            // Track the max timestamp, then emit a watermark right away.
                            super.onEvent(event, eventTimestamp, output);
                            super.onPeriodicEmit(output);
                        }
                    };
                }
            }
            .withTimestampAssigner(...))

        @Aljoscha This is about Flink 1.11. Since the periodic
        watermarks depend on processing time, am I right to assume
        that if the job finishes quickly, watermarks may never be
        emitted (except for those at the end of the job)? Is there
        any way to emit periodic watermarks based on event time?
        Is there any way to enable punctuated watermarks for the
        existing watermark strategies without having to implement a
        custom one?

        On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
        Chesnay,
        Thanks for this - I've made the change you suggested
        (setAutoWatermarkInterval) but it hasn't changed the
        behaviour - timers still get processed only on stream end.
        I have pushed a new version, with this change, and also
        emitting some information in a .log field.
        If you search for "!!!" in Ingest.java
        and DPTimeoutFunction.java you'll see the relevant changes.

        In DPTimeoutFunction you'll see that if I add code to say
        "cancel the timer only if it wouldn't have gone off" then
        the output is now correct - individual devices do timeout.
        However, this output only appears at the end of the stream
        (i.e. time jumps backwards as all the timers are processed)
        so I still appear not to be seeing timer processing at the
        correct event time. If there was no end of stream, I would
        never get any timeouts.

        Below is the output I get when I run. This output is
        correct but:
        a) only because I am manually cancelling timers in
        DPTimeoutFunction (search for "!!!")
        b) the timer events are timestamped correctly, but are not
        emitted into the stream at the right time - and if the
        stream didn't end then no timeouts would ever occur (which
        in particular means that devices that never come back
        online will never get marked as offline).

        Perhaps I do need to implement an onPeriodicEmit function?
        Does that require a custom watermark strategy? I can see
        how to define a custom watermark at the link below, but it
        is unclear how to install it:
        https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy

        {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
        {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
        {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
        {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
        {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
        {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
        {"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
        {"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
        {"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
        {"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
        {"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
        {"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
        {"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
        {"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
        {"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
        {"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
        {"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
        {"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
        {"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
        {"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
        {"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
        {"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
        {"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
        {"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
        {"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
        {"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
        {"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
        {"ts":1001,"id":"2","is_online":false} // These are the "going offline" events that we want to see. But they are emitted only once the stream has ended.
        {"ts":5001,"id":"1","is_online":false}
        {"ts":11001,"id":"1","is_online":false}
        {"ts":11001,"id":"0","is_online":false}
        {"ts":11001,"id":"2","is_online":false}

        Thanks,

        -Pilgrim



        On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler
        <ches...@apache.org <mailto:ches...@apache.org>> wrote:

            You were right that it is an issue with the watermarks;
            except when the job was stopped, they were never
            emitted downstream, so no timer was ever triggered.

            It appears that you need to set the auto-watermark
            interval in the ExecutionConfig via

                env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());

            to have them emitted periodically. Alternatively, you
            could override BoundedOutOfOrdernessWatermarks#onEvent
            to also emit a watermark for each event (for example, by
            calling #onPeriodicEmit).

            Put another way, if you use any of the built-in
            WatermarkGenerators and use event-time, then it appears
            that you *must* set this interval.

            This behavior is...less than ideal I must admit, and it
            does not appear to be properly documented.
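
To illustrate why a short-lived job may never see a periodic watermark, here is a toy model in plain Java. It is not Flink code: PeriodicEmitModel, its methods, and all numbers are assumptions for illustration. It treats periodic emission as a tick on a simulated processing-time clock and records which watermarks would go out before the input ends:

```java
import java.util.ArrayList;
import java.util.List;

class PeriodicEmitModel {
    /**
     * events: rows of {eventTimestamp, processingTimeMsWhenRead}, in read order.
     * Returns the watermarks emitted by periodic ticks before the input ends.
     */
    static List<Long> watermarksBeforeEnd(long[][] events, long intervalMs, long outOfOrdernessMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        List<Long> emitted = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        long lastEmitted = Long.MIN_VALUE;
        long nextTickAt = intervalMs; // first tick on the simulated processing-time clock
        for (long[] e : events) {
            long eventTs = e[0];
            long readAt = e[1];
            // Run every tick that elapsed before this event was read.
            while (nextTickAt <= readAt) {
                if (maxTs != Long.MIN_VALUE) {
                    long watermark = maxTs - outOfOrdernessMs - 1;
                    if (watermark > lastEmitted) { // watermarks only move forward
                        emitted.add(watermark);
                        lastEmitted = watermark;
                    }
                }
                nextTickAt += intervalMs;
            }
            maxTs = Math.max(maxTs, eventTs);
        }
        return emitted;
    }

    /** Builds n events with timestamps 0, 1000, 2000, ... read readGapMs apart. */
    static long[][] events(int n, long readGapMs) {
        long[][] result = new long[n][];
        for (int i = 0; i < n; i++) {
            result[i] = new long[] {i * 1000L, i * readGapMs};
        }
        return result;
    }

    public static void main(String[] args) {
        // File-like source: 11 events read 1 ms apart, tick every 500 ms.
        System.out.println("fast job: " + watermarksBeforeEnd(events(11, 1), 500, 0));
        // Same events read 300 ms apart: ticks now land between events.
        System.out.println("slow job: " + watermarksBeforeEnd(events(11, 300), 500, 0));
    }
}
```

In the fast case the whole input is consumed before the first tick, so the list is empty and any event-time timers would have to wait for the end of the input. Shortening the interval or emitting on every event changes that in this model.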

            On 1/27/2021 1:56 PM, Chesnay Schepler wrote:

            Based on your description you aren't doing anything
            obviously wrong.

            Would it be possible for you to share the code with us?

            On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
            A newbie question:

            I've created a basic Flink DataStream job for an IoT
            use-case, with file source and sink for testing.
            I key by device ID, then in a ProcessFunction set an
            EventTime Timer to fire if a device falls silent,
            i.e. a timeout, which I cancel if another message
            arrives from that device within the timeout.

            My test source generates 3 devices, one of which
            falls silent for more than the timeout period during
            the stream, then resumes again. So I expect the Timer
            to fire for that device during the stream, and then
            for all the Timers to fire after the end of the stream.

            The timers do indeed fire at the end of the stream
            (e.g. with a timeout of 1000, the timers all fire
            1000 after the end of the stream, which is correct).
            But no timer fires for the device which falls silent
            during the stream (even though other devices are
            still talking, advancing event time). I've verified
            that I am keying correctly by ID.

            I suspect this is something to do with Watermarks.
            I'm using forBoundedOutOfOrderness watermarking with
            a duration of 0.

            All suggestions welcome, thanks.

            -Pilgrim






