Hello,

I tried setting the watermark to System.currentTimeMillis() - 5000L, with
event timestamps set to System.currentTimeMillis(). I still do not observe
the expected behaviour: the PatternTimeoutFunction does not fire once the
watermark moves past the timeout "anchored" by a pattern match.
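
As a minimal sketch of what "moves past the timeout" means here (plain Java,
no Flink dependency; the class and method names are hypothetical, and the
exact comparison Flink's CEP operator uses may differ), the expectation is
roughly:

```java
// Flink-free sketch: decides when a partial match counts as timed out in event time.
// All names are hypothetical; Flink's own comparison may differ in detail.
final class TimeoutSketch {

    // Timestamp of the event that started the partial match (e.g. Event A).
    private final long startTimestamp;
    // Length of the within(...) window in milliseconds.
    private final long windowMillis;

    TimeoutSketch(long startTimestamp, long windowMillis) {
        this.startTimestamp = startTimestamp;
        this.windowMillis = windowMillis;
    }

    // True once the watermark has reached the end of the window.
    boolean isTimedOut(long watermark) {
        return watermark >= startTimestamp + windowMillis;
    }

    public static void main(String[] args) {
        // Hypothetical values: match started at t = 10 s, window of 4 s.
        TimeoutSketch match = new TimeoutSketch(10_000L, 4_000L);
        System.out.println(match.isTimedOut(13_999L)); // false
        System.out.println(match.isTimedOut(14_000L)); // true
    }
}
```

The point of the sketch is that only the watermark drives the decision, so no
further event should be needed once the watermark itself keeps advancing.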

Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in
case someone is interested. The timestamp/watermark assigner looks like
this:

DataStream<Event> withTimestampsAndWatermarks = tuples
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {

            long waterMarkTmst;

            @Override
            public long extractTimestamp(Event element, long previousElementTimestamp) {
                return element.tmst;
            }

            @Override
            public Watermark getCurrentWatermark() {
                waterMarkTmst = System.currentTimeMillis() - 5000L;
                System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
                return new Watermark(waterMarkTmst);
            }
        }).keyBy("key");

withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);

// Apply pattern filtering on stream.
PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
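
The watermark arithmetic of the assigner above can also be checked in
isolation. The following is a sketch only, assuming an injectable clock in
place of System.currentTimeMillis() and a hypothetical 4 second within(...)
window; LaggingClockWatermark and its methods are illustration names, not
Flink API:

```java
// Flink-free sketch of the "wall clock minus fixed lag" watermark used above.
// The clock is injected so the behaviour can be checked without waiting.
final class LaggingClockWatermark {

    // Stands in for System.currentTimeMillis().
    private final java.util.function.LongSupplier clock;
    // Fixed lag behind the wall clock (5000L in the assigner above).
    private final long lagMillis;

    LaggingClockWatermark(java.util.function.LongSupplier clock, long lagMillis) {
        this.clock = clock;
        this.lagMillis = lagMillis;
    }

    // Same arithmetic as getCurrentWatermark(): wall clock minus the lag.
    long currentWatermark() {
        return clock.getAsLong() - lagMillis;
    }

    // Wall-clock time at which the watermark reaches eventTimestamp + timeoutMillis.
    long wallClockWhenWatermarkReaches(long eventTimestamp, long timeoutMillis) {
        return eventTimestamp + timeoutMillis + lagMillis;
    }

    public static void main(String[] args) {
        long[] now = {100_000L};                 // hypothetical fake clock
        LaggingClockWatermark wm = new LaggingClockWatermark(() -> now[0], 5_000L);

        long eventTimestamp = now[0];            // event stamped with the current time
        long timeout = 4_000L;                   // hypothetical within(...) window

        System.out.println(wm.currentWatermark());                              // 95000
        now[0] = wm.wallClockWhenWatermarkReaches(eventTimestamp, timeout);
        System.out.println(wm.currentWatermark() >= eventTimestamp + timeout);  // true
    }
}
```

Under these assumptions the watermark keeps advancing without any new event
and should reach the end of the window about lag + window after the first
event, plus up to one auto-watermark interval until the next periodic
watermark is emitted.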

Any idea what's wrong?

David


On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:

> Assuming an element arrives whose timestamp is later than the last emitted
> watermark, would it just be dropped because the PatternStream does not have
> a max allowed lateness method? In that case it appears that CEP cannot yet
> handle late events out of the box.
>
> If we do want to support late events, can we chain
> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
> again before handing the stream to the CEP operator? This way the patterns
> may fire multiple times, but an event is allowed to be late and out of
> order. It looks like it will work, but is there a less convoluted way?
>
> Thanks,
> Sameer
>
> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
>> But then the sources must not issue any element later than the last
>> emitted watermark. If that is the case, then this solution should work.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>
>>> Hi,
>>>
>>> If you know that the events are arriving in order and with a consistent
>>> lag, why not just increment the watermark time every time the
>>> getCurrentWatermark() method is invoked, based on the
>>> autoWatermarkInterval (or less, to be conservative)?
>>>
>>> You can check whether the watermark has changed since the arrival of the
>>> last event and, if not, increment it in the getCurrentWatermark() method.
>>> Otherwise the watermark will never increase until an element arrives, and
>>> if the stream partition stalls for some reason the whole pipeline freezes.
>>>
>>> Sameer
>>>
>>>
>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> the problem is still that there is no corresponding watermark saying
>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>> periodically emitted but the same watermark will be emitted until a new
>>>> element arrives which will reset the watermark. Thus, the system can never
>>>> know until this watermark is seen whether there will be an earlier event or
>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>
>>>> You're right that the negation operator won't solve the problem. It
>>>> will indeed suffer from the same problem.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>
>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>> "not" operator) does not address this because again, how would the "not
>>>>> match" be triggered if no event at all occurs?
>>>>>
>>>>> Good question.
>>>>>
>>>>> I'm not sure whether the following will work:
>>>>>
>>>>> This could be done by creating a CEP pattern that combines "notNext"
>>>>> (or "notFollowedBy") with the "within" construct. Something like
>>>>> this:
>>>>>
>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>     .notNext("second")
>>>>>     .within(Time.seconds(3));
>>>>>
>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>
>>>>> Note: I have requested these negation patterns to be implemented in
>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>
>>>>>
>>>>> - LF
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* David Koch <ogd...@googlemail.com>
>>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>
>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>
>>>>> Hello,
>>>>>
>>>>> Thank you for the explanation as well as the link to the other post.
>>>>> Interesting to learn about some of the open JIRAs.
>>>>>
>>>>> Indeed, I was not using event time, but processing time. However, even
>>>>> when using event time I only get notified of timeouts upon subsequent
>>>>> events.
>>>>>
>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>> instances no further than 3 seconds apart using CEP.
>>>>>
>>>>> Apart from the fact that results are only printed when I close the
>>>>> socket (normal?), I don't observe any change in behaviour.
>>>>>
>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>> timeout to be triggered.
>>>>>
>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>> "not" operator) does not address this because again, how would the "not
>>>>> match" be triggered if no event at all occurs?
>>>>>
>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>
>>>>> The following is a better link:
>>>>>
>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>>
>>>>>
>>>>> - LF
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>
>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>
>>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>>> this issue?
>>>>>
>>>>> See this discussion thread:
>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>>
>>>>>
>>>>>
>>>>> //  Atul
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>>>> *To:* user@flink.apache.org
>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>
>>>>> Hi David,
>>>>>
>>>>> in case of event time, the timeout will be detected when the first
>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>> event).
>>>>>
>>>>> In case of processing time, the time is only updated whenever a new
>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>> Event A, it should detect the timeout. If the next event arrives 20
>>>>> seconds later, then you won't see the timeout until then.
>>>>>
>>>>> In the case of processing time, we could think about registering
>>>>> timeout timers for processing time. However, I would highly recommend you
>>>>> to use event time, because with processing time, Flink cannot guarantee
>>>>> meaningful computations, because the events might arrive out of order.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>>> that time out? I am under the impression that this is not possible.
>>>>>
>>>>> In my case I partition a stream containing user web navigation by
>>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>>> for each user.
>>>>>
>>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>>>>> fires upon the first event after the specified timeout. For example, given
>>>>> user X: Event A, then 20 seconds later Event B (or any other type of
>>>>> event).
>>>>>
>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>> interval expiring, since passive invalidation is not really applicable
>>>>> in my case.
>>>>>
>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> David
