Hello,

I tried setting the watermark to System.currentTimeMillis() - 5000L, with event timestamps set to System.currentTimeMillis(). I do not observe the expected behaviour of the PatternTimeoutFunction firing once the watermark moves past the timeout "anchored" by a pattern match.
Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in case someone is interested. The timestamp/watermark assigner looks like this:

    DataStream<Event> withTimestampsAndWatermarks = tuples
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
            long waterMarkTmst;

            @Override
            public long extractTimestamp(Event element, long previousElementTimestamp) {
                return element.tmst;
            }

            @Override
            public Watermark getCurrentWatermark() {
                waterMarkTmst = System.currentTimeMillis() - 5000L;
                System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
                return new Watermark(waterMarkTmst);
            }
        }).keyBy("key");

    withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);

    // Apply pattern filtering on stream.
    PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);

Any idea what's wrong?

David

On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:

> Assuming an element with timestamp which is later than the last emitted
> watermark arrives, would it just be dropped because the PatternStream does
> not have a max allowed lateness method? In that case it appears that CEP
> cannot handle late events yet out of the box.
>
> If we do want to support late events, can we chain a
> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
> again before handing it to the CEP operator? This way we may have the
> patterns fired multiple times but it allows an event to be late and out of
> order. It looks like it will work, but is there a less convoluted way?
>
> Thanks,
> Sameer
>
> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
>> But then no element later than the last emitted watermark must be issued
>> by the sources. If that is the case, then this solution should work.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>
>>> Hi,
>>>
>>> If you know that the events are arriving in order and with a consistent
>>> lag, why not just increment the watermark time every time the
>>> getCurrentWatermark() method is invoked, based on the
>>> autoWatermarkInterval (or less, to be conservative)?
>>>
>>> You can check if the watermark has changed since the arrival of the last
>>> event and, if not, increment it in the getCurrentWatermark() method.
>>> Otherwise the watermark will never increase until an element arrives, and
>>> if the stream partition stalls for some reason the whole pipeline freezes.
>>>
>>> Sameer
>>>
>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> the problem is still that there is no corresponding watermark saying
>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>> periodically emitted but the same watermark will be emitted until a new
>>>> element arrives which will reset the watermark. Thus, the system can never
>>>> know until this watermark is seen whether there will be an earlier event or
>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>
>>>> You're right that the negation operator won't solve the problem. It
>>>> will indeed suffer from the same problem.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>
>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>> "not" operator) does not address this because again, how would the "not
>>>>> match" be triggered if no event at all occurs?
>>>>>
>>>>> Good question.
>>>>>
>>>>> I'm not sure whether the following will work:
>>>>>
>>>>> This could be done by creating a CEP matching pattern that uses both
>>>>> of "notNext" (or "notFollowedBy") and "within" constructs.
>>>>> Something like this:
>>>>>
>>>>>     Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>         .notNext("second")
>>>>>         .within(Time.seconds(3));
>>>>>
>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>
>>>>> Note: I have requested these negation patterns to be implemented in
>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>
>>>>> - LF
>>>>>
>>>>> ------------------------------
>>>>> *From:* David Koch <ogd...@googlemail.com>
>>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>
>>>>> Hello,
>>>>>
>>>>> Thank you for the explanation as well as the link to the other post.
>>>>> Interesting to learn about some of the open JIRAs.
>>>>>
>>>>> Indeed, I was not using event time, but processing time. However, even
>>>>> when using event time I only get notified of timeouts upon subsequent
>>>>> events.
>>>>>
>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>> instances no further than 3 seconds apart using CEP.
>>>>>
>>>>> Apart from the fact that results are only printed when I close the
>>>>> socket (normal?) I don't observe any change in behaviour.
>>>>>
>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>> timeout to be triggered.
>>>>>
>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>> "not" operator) does not address this because again, how would the "not
>>>>> match" be triggered if no event at all occurs?
>>>>>
>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>
>>>>> The following is a better link:
>>>>>
>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>>
>>>>> - LF
>>>>>
>>>>> ------------------------------
>>>>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>
>>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>>> this issue?
>>>>>
>>>>> See this discussion thread:
>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>>
>>>>> // Atul
>>>>>
>>>>> ------------------------------
>>>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>>>> *To:* user@flink.apache.org
>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>
>>>>> Hi David,
>>>>>
>>>>> in case of event time, the timeout will be detected when the first
>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>> little bit on how you generate watermarks (e.g. periodically, watermark
>>>>> per event).
>>>>>
>>>>> In case of processing time, the time is only updated whenever a new
>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>> Event A, it should detect the timeout. If the next event arrives 20
>>>>> seconds later, then you won't see the timeout until then.
>>>>>
>>>>> In the case of processing time, we could think about registering
>>>>> timeout timers for processing time. However, I would highly recommend
>>>>> that you use event time, because with processing time, Flink cannot
>>>>> guarantee meaningful computations, since the events might arrive out of
>>>>> order.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>>> that time out? I am under the impression that this is not possible.
>>>>>
>>>>> In my case I partition a stream containing user web navigation by
>>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>>> for each user.
>>>>>
>>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>>>>> fires upon the first event after the specified timeout. For example, given
>>>>> user X: Event A, 20 seconds later Event B (or any other type of event).
>>>>>
>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>> interval expiring since passive invalidation is not really applicable in
>>>>> my case.
>>>>>
>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> David
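
For reference, Sameer's suggestion in this thread (advance the watermark from getCurrentWatermark() when no element has arrived since the previous periodic call) can be sketched without any Flink dependency. This is only a rough sketch under assumptions: the class IdleAwareWatermark and its parameters maxLagMillis and idleStepMillis are hypothetical names, not part of Flink. As Till points out, it is only safe if the sources never issue an element older than a watermark that was already emitted.

```java
/**
 * Hypothetical helper, not a Flink class. It tracks the watermark derived
 * from event timestamps and, when a periodic call finds that no event has
 * arrived since the previous call, moves the watermark forward by a fixed
 * step so that pending timeouts can eventually be detected.
 */
final class IdleAwareWatermark {
    private final long maxLagMillis;   // assumed upper bound on event lateness
    private final long idleStepMillis; // advance per idle periodic call
    private long watermark = Long.MIN_VALUE;
    private boolean sawEventSinceLastCall = false;

    IdleAwareWatermark(long maxLagMillis, long idleStepMillis) {
        this.maxLagMillis = maxLagMillis;
        this.idleStepMillis = idleStepMillis;
    }

    /** Intended to be called once per element; returns the timestamp unchanged. */
    long onEvent(long eventTimestamp) {
        sawEventSinceLastCall = true;
        // Never move the watermark backwards.
        watermark = Math.max(watermark, eventTimestamp - maxLagMillis);
        return eventTimestamp;
    }

    /** Intended to be called once per auto-watermark interval. */
    long currentWatermark() {
        // Only advance artificially once at least one event has been seen
        // and the stream has been idle since the previous call.
        if (!sawEventSinceLastCall && watermark != Long.MIN_VALUE) {
            watermark += idleStepMillis;
        }
        sawEventSinceLastCall = false;
        return watermark;
    }
}
```

In the assigner at the top of this message, extractTimestamp(...) would return onEvent(element.tmst) and getCurrentWatermark() would wrap currentWatermark() in a Watermark, with idleStepMillis set to the auto-watermark interval (1000L above) or less to be conservative.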