Hi guys,

I'll try to come up with an example illustrating the behaviour over the
weekend.

Cheers,
Till
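
For reference, here is a minimal plain-Java sketch of the event-time behaviour discussed in the quoted thread below. It is not Flink code: the interface and class names (PeriodicWatermarks, EventDriven, IdleAdvancing, firstTimeoutTick) are hypothetical stand-ins, and it assumes a timeout is noticed as soon as a watermark strictly exceeds the pattern start plus the timeout. It only illustrates why a watermark that stalls between events never triggers the timeout, while one that keeps advancing when idle does.

```java
class WatermarkTimeoutSketch {

    /** Hypothetical stand-in for a periodic watermark generator; not a Flink interface. */
    interface PeriodicWatermarks {
        void onEvent(long eventTimestamp);
        long currentWatermark();
    }

    /** Watermark trails the newest event by 1 ms and never moves without new events. */
    static final class EventDriven implements PeriodicWatermarks {
        private long watermark = Long.MIN_VALUE;

        public void onEvent(long eventTimestamp) {
            watermark = Math.max(watermark, eventTimestamp - 1);
        }

        public long currentWatermark() {
            return watermark;
        }
    }

    /** Same, but advances by the emission interval if no event arrived since the last call. */
    static final class IdleAdvancing implements PeriodicWatermarks {
        private final long intervalMs;
        private long watermark = Long.MIN_VALUE;
        private long lastEmitted = Long.MIN_VALUE;

        IdleAdvancing(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        public void onEvent(long eventTimestamp) {
            watermark = Math.max(watermark, eventTimestamp - 1);
        }

        public long currentWatermark() {
            // Only advance once at least one event has been seen.
            if (watermark == lastEmitted && watermark != Long.MIN_VALUE) {
                watermark += intervalMs;
            }
            lastEmitted = watermark;
            return watermark;
        }
    }

    /**
     * Feeds one pattern-start event, then polls the generator 'ticks' times with no
     * further events. Returns the 1-based tick at which the watermark first exceeds
     * startTs + timeoutMs, or -1 if that never happens.
     */
    static int firstTimeoutTick(PeriodicWatermarks wm, long startTs, long timeoutMs, int ticks) {
        wm.onEvent(startTs);
        for (int tick = 1; tick <= ticks; tick++) {
            if (wm.currentWatermark() > startTs + timeoutMs) {
                return tick;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Event A at t = 10 s, 4 s timeout, watermark polled 50 times with no new events.
        System.out.println(firstTimeoutTick(new EventDriven(), 10_000L, 4_000L, 50));
        System.out.println(firstTimeoutTick(new IdleAdvancing(1_000L), 10_000L, 4_000L, 50));
    }
}
```

The idle-advancing variant is only safe if no element older than the already emitted watermark shows up afterwards, which is the caveat raised further down in the thread.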

On Fri, Oct 14, 2016 at 11:16 AM, David Koch <ogd...@googlemail.com> wrote:

> Hello,
>
> Thanks for the code Sameer. Unfortunately, it didn't solve the issue.
> Compared to what I did the principle is the same - make sure that the
> watermark advances even without events present to trigger timeouts in CEP
> patterns.
>
> If Till or anyone else could provide a minimal example illustrating the
> supposed behaviour of:
>
>> [CEP] timeout will be detected when the first watermark exceeding the
>> timeout value is received
>
>
> I'd very much appreciate it.
>
> Regards,
>
> David
>
>
> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sam...@axiomine.com> wrote:
>
>> Try this. Your watermarks need to keep moving forward. Also, don't use the
>> system timestamp. Use the timestamp of the last element seen as the
>> reference, since the elements most likely lag behind the system timestamp.
>>
>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>
>>             long waterMarkTmst;
>>             long lastEmittedWM = 0;
>>
>>             @Override
>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>                 if (element.tmst > lastEmittedWM) {
>>                     // Assumes increasing timestamps. Subtract 1 because more
>>                     // elements with the same timestamp might still arrive.
>>                     waterMarkTmst = element.tmst - 1;
>>                 }
>>                 return element.tmst;
>>             }
>>
>>             @Override
>>             public Watermark getCurrentWatermark() {
>>                 if (lastEmittedWM == waterMarkTmst) {
>>                     // No new event seen: move the watermark forward by the auto
>>                     // watermark interval (watermarks only move forward in time).
>>                     waterMarkTmst = waterMarkTmst + 1000L;
>>                 }
>>                 lastEmittedWM = waterMarkTmst;
>>
>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>                 // Until an event is seen, the watermark starts at 0 and advances
>>                 // by 1000 ms per call.
>>                 return new Watermark(waterMarkTmst);
>>             }
>>         }).keyBy("key");
>>
>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogd...@googlemail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I tried setting the watermark to System.currentTimeMillis() - 5000L, with
>>> event timestamps set to System.currentTimeMillis(). I do not observe the
>>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>>> moves past the timeout "anchored" by a pattern match.
>>>
>>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>,
>>> in case someone is interested. The timestamp/watermark assigner looks like
>>> this:
>>>
>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>
>>>             long waterMarkTmst;
>>>
>>>             @Override
>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>                 return element.tmst;
>>>             }
>>>
>>>             @Override
>>>             public Watermark getCurrentWatermark() {
>>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>                 return new Watermark(waterMarkTmst);
>>>             }
>>>         }).keyBy("key");
>>>
>>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>>
>>> // Apply pattern filtering on stream.
>>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>>
>>> Any idea what's wrong?
>>>
>>> David
>>>
>>>
>>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:
>>>
>>>> Assuming an element arrives whose timestamp is older than the last emitted
>>>> watermark (a late element), would it just be dropped because the
>>>> PatternStream does not have a max allowed lateness method? In that case it
>>>> appears that CEP cannot handle late events out of the box yet.
>>>>
>>>> If we do want to support late events, can we chain
>>>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>>>> before handing the stream to the CEP operator? This way the patterns may
>>>> fire multiple times, but an event is allowed to be late and out of order.
>>>> It looks like it will work, but is there a less convoluted way?
>>>>
>>>> Thanks,
>>>> Sameer
>>>>
>>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <
>>>> till.rohrm...@gmail.com> wrote:
>>>>
>>>>> But then the sources must not emit any element that is late with respect
>>>>> to the last emitted watermark. If that is the case, then this solution
>>>>> should work.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> If you know that the events arrive in order and with a consistent lag,
>>>>>> why not just increment the watermark time by the autoWatermarkInterval
>>>>>> (or less, to be conservative) every time the getCurrentWatermark() method
>>>>>> is invoked?
>>>>>>
>>>>>> You can check whether the watermark has changed since the arrival of the
>>>>>> last event and, if not, increment it in the getCurrentWatermark() method.
>>>>>> Otherwise the watermark will never increase until an element arrives, and
>>>>>> if the stream partition stalls for some reason the whole pipeline freezes.
>>>>>>
>>>>>> Sameer
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <
>>>>>> till.rohrm...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> the problem is still that there is no corresponding watermark saying
>>>>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>>>>> emitted periodically, but the same watermark will be emitted until a new
>>>>>>> element arrives, which will reset the watermark. Thus, the system can
>>>>>>> never know until this watermark is seen whether there will be an earlier
>>>>>>> event or not. I fear that this is a fundamental problem with stream
>>>>>>> processing.
>>>>>>>
>>>>>>> You're right that the negation operator won't solve the problem. It
>>>>>>> will indeed suffer from the same problem.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>>
>>>>>>>> > FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>>> > "not" operator) does not address this because again, how would the
>>>>>>>> > "not match" be triggered if no event at all occurs?
>>>>>>>>
>>>>>>>> Good question.
>>>>>>>>
>>>>>>>> I'm not sure whether the following will work:
>>>>>>>>
>>>>>>>> This could be done by creating a CEP matching pattern that uses
>>>>>>>> both the "notNext" (or "notFollowedBy") and "within" constructs.
>>>>>>>> Something like this:
>>>>>>>>
>>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>>     .notNext("second")
>>>>>>>>     .within(Time.seconds(3));
>>>>>>>>
>>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>>
>>>>>>>> Note: I have requested that these negation patterns be implemented in
>>>>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>>>>
>>>>>>>>
>>>>>>>> - LF
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* David Koch <ogd...@googlemail.com>
>>>>>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>>
>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> Thank you for the explanation as well as the link to the other
>>>>>>>> post. Interesting to learn about some of the open JIRAs.
>>>>>>>>
>>>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>>>> even when using event time I only get notified of timeouts upon
>>>>>>>> subsequent events.
>>>>>>>>
>>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where
>>>>>>>> I read <key> <value> from a socket, wrap this in a custom "event" with
>>>>>>>> timestamp, key the resultant stream by <key> and attempt to detect
>>>>>>>> <key> instances no further than 3 seconds apart using CEP.
>>>>>>>>
>>>>>>>> Apart from the fact that results are only printed when I close the
>>>>>>>> socket (normal?), I don't observe any change in behaviour.
>>>>>>>>
>>>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>>>> timeout to be triggered.
>>>>>>>>
>>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>>
>>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>>
>>>>>>>> The following is a better link:
>>>>>>>>
>>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>>>>>>>
>>>>>>>>
>>>>>>>> - LF
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>>>>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>>
>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>
>>>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature
>>>>>>>> solve this issue?
>>>>>>>>
>>>>>>>> See this discussion thread:
>>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> //  Atul
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>>>>>>> *To:* user@flink.apache.org
>>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> in case of event time, the timeout will be detected when the first
>>>>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>>>>> little on how you generate watermarks (e.g. periodically, or one
>>>>>>>> watermark per event).
>>>>>>>>
>>>>>>>> In case of processing time, the time is only updated whenever a new
>>>>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>>>> Event A, it should detect the timeout. If the next event arrives 20
>>>>>>>> seconds later, then you won't see the timeout until then.
>>>>>>>>
>>>>>>>> In the case of processing time, we could think about registering
>>>>>>>> timeout timers for processing time. However, I would highly recommend
>>>>>>>> that you use event time, because with processing time Flink cannot
>>>>>>>> guarantee meaningful computations, since the events might arrive out of
>>>>>>>> order.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> With Flink CEP, is there a way to actively listen to pattern
>>>>>>>> matches that time out? I am under the impression that this is not
>>>>>>>> possible.
>>>>>>>>
>>>>>>>> In my case I partition a stream containing user web navigation by
>>>>>>>> "userId" to look for sequences of Event A, followed by B within 4
>>>>>>>> seconds for each user.
>>>>>>>>
>>>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match,
>>>>>>>> only fires upon the first event after the specified timeout. For
>>>>>>>> example, given user X: Event A, 20 seconds later Event B (or any other
>>>>>>>> type of event).
>>>>>>>>
>>>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>>>> interval expiring since passive invalidation is not really applicable
>>>>>>>> in my case.
>>>>>>>>
>>>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> David
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
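
The processing-time behaviour described in the oldest messages of this thread (time only advances when an element arrives, so a timeout is noticed on the next arrival) can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink code: the class and method names are hypothetical, the arrivals are assumed to be sorted and not to complete the pattern, and the timer variant models the "register timeout timers" idea mentioned in the thread rather than an existing feature.

```java
class ProcessingTimeTimeoutSketch {

    /**
     * Timeout is only checked when an element arrives. Returns the arrival time
     * (ms) at which the timeout of a pattern started at startMs is first noticed,
     * or -1 if no later element ever arrives at or after the deadline.
     * Assumes laterArrivalsMs is sorted ascending.
     */
    static long detectionTimeOnArrival(long startMs, long timeoutMs, long[] laterArrivalsMs) {
        for (long arrival : laterArrivalsMs) {
            if (arrival - startMs >= timeoutMs) {
                return arrival;
            }
        }
        return -1L;
    }

    /** With a (hypothetical) timer registered at pattern start, detection happens at the deadline. */
    static long detectionTimeWithTimer(long startMs, long timeoutMs) {
        return startMs + timeoutMs;
    }

    public static void main(String[] args) {
        // Event A at t = 0, 4 s timeout, next element 20 s later.
        System.out.println(detectionTimeOnArrival(0L, 4_000L, new long[] {20_000L}));
        System.out.println(detectionTimeWithTimer(0L, 4_000L));
    }
}
```

With the next element arriving 20 seconds after Event A, the arrival-driven check only notices the 4 second timeout at the 20 second mark, which matches the behaviour reported in the original question.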
