Bump.

On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain <[email protected]> wrote:
> To trigger the computations for each batch, I'll have to use the
> processing time timer in the abstract keyed CEP operator, right?
>
> The reason why I'm avoiding the watermarks is that it is not possible to
> generate watermarks per key.
>
> Thanks for the 'within' remark.
>
> A couple of questions:
>
> 1. Given our use case and the limitation that watermarks cannot be
> generated per key, do you think that this approach is worth adding to
> the library?
>
> 2. What other aspects of the framework do I need to consider/test before
> we go about implementing this approach formally?
>
> Thanks,
> Shailesh
>
>
> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, <[email protected]>
> wrote:
>
>> If you do the buffering, you can emit a watermark for each such batch
>> (equal to the highest timestamp in that batch). This way you won't need
>> to sort; the CEP library will do it for you. The within clause will then
>> work in event time.
>>
>> One more remark: the within clause always applies to the whole pattern,
>> not just to a part of it. It does not matter whether you apply it in the
>> middle (as you did) or at the very end.
>>
>> Best,
>> Dawid
>>
>> > On 19 Mar 2018, at 11:31, Shailesh Jain <[email protected]> wrote:
>> >
>> > Thanks for your reply, Dawid.
>> >
>> > I understand that the approach I've tried out is not generic enough, and
>> > would need a lot more thought to be put into it w.r.t. parallelism
>> > considerations, out-of-order events, effects on downstream operators,
>> > etc. The intention was to do a quick implementation to check the
>> > feasibility of the approach.
>> >
>> >>> It will also not sort the events etc.
>> >
>> > In the application code to test this approach, I had used a global
>> > window to sort events based on their timestamp (similar to how
>> > out-of-order events are dropped based on a time bound, I'm dropping
>> > them based on a count-based bound).
>> >
>> > allEvents = allEvents
>> >     .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>> >     .window(GlobalWindows.create())
>> >     .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
>> >     .process(new SortWindowProcessFunction())
>> >     .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>> >     .assignTimestampsAndWatermarks(new TimestampsExtractor())
>> >     .uid(Constants.TS_EX_UID);
>> >
>> > PatternLoader
>> >     .applyPatterns(allEvents, propLoader.getPatternClassNames())
>> >     .addSink(createKafkaSink(kafkaProps))
>> >     .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
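(The TimestampsExtractor used in the last step above is not shown in this
thread. One way it could realise Dawid's suggestion of emitting a per-batch
watermark equal to the batch's highest timestamp is a punctuated assigner
along the following lines. This is only a sketch; the Event#getTimestamp()
accessor is an assumption about the Event class. Because each batch leaves
the sort window ordered by timestamp, the watermark emitted after the last
element of a batch equals that batch's highest timestamp.)

    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class TimestampsExtractor implements AssignerWithPunctuatedWatermarks<Event> {

        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            // Use the timestamp carried inside the event, not the arrival time.
            return element.getTimestamp();
        }

        @Override
        public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
            // Emit a watermark after every element; since batches arrive sorted,
            // the last element of a batch advances the watermark to the batch's
            // highest timestamp, as suggested above.
            return new Watermark(extractedTimestamp);
        }
    }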
>> >>> If in the getCurrentWatermark method of your
>> >>> AssignerWithPeriodicWatermarks you will just return
>> >>> new Watermark(System.currentTimeMillis()), you will get the same
>> >>> behaviour as with that change, am I right?
>> >
>> > If watermarks are generated based on the machine time, the major issue
>> > I see is that we will not be able to leverage event time functionality,
>> > specifically for patterns which look for the absence of an event for a
>> > fixed period of time. For example, we have many such patterns:
>> >
>> > Pattern<Event, Event> pattern = Pattern.<Event>begin(UNDER_CHILLED_ALERT,
>> >         AfterMatchSkipStrategy.skipPastLastEvent())
>> >     .where(Conditions.getUnderchilledCondition())
>> >     .notFollowedBy(COMPRESSOR_ON)
>> >     .where(Conditions.getCompressorOnCondition())
>> >     .within(Time.minutes(30))
>> >     .followedBy(HIGH_TEMP)
>> >     .where(Conditions.getHighTemperatureCondition());
>> >
>> > Now when there are network issues (which are very frequent), queued
>> > events are delivered together, and such patterns will not be matched
>> > correctly, because pruning of events from the NFA's buffer will not be
>> > done based on the timestamp within the event, but on the watermark
>> > received by the operator.
>> >
>> > Is my understanding here correct?
>> >
>> > Thanks,
>> > Shailesh
>> >
>> > On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz
>> > <[email protected]> wrote:
>> >
>> >> Hi Shailesh,
>> >>
>> >> Thanks for your interest in the CEP library and sorry for the late
>> >> response. I must say I am not a fan of this approach.
>> >> After this change, processing time is no longer processing time, and
>> >> it will behave differently from every other place in Flink. It will
>> >> also not sort the events etc.
>> >> Moreover, I think you could achieve a pretty similar solution if you
>> >> generate your watermark based on the machine time. If in the
>> >> getCurrentWatermark method of your AssignerWithPeriodicWatermarks you
>> >> just return new Watermark(System.currentTimeMillis()), you will get
>> >> the same behaviour as with that change, am I right?
>> >>
>> >> Best,
>> >> Dawid
>> >>
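(A minimal sketch of what Dawid describes here: timestamps are still
extracted from the events themselves, but the watermark follows the machine
clock. The class name MachineTimeWatermarkAssigner and the
Event#getTimestamp() accessor are assumptions for illustration.)

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class MachineTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {

        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            // The event's own timestamp is still used for pattern matching.
            return element.getTimestamp();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Event time advances with the machine clock, regardless of the
            // timestamps carried by the events.
            return new Watermark(System.currentTimeMillis());
        }
    }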
>> >>> On 18 Mar 2018, at 09:00, Shailesh Jain <[email protected]> wrote:
>> >>>
>> >>> Thanks Aljoscha.
>> >>>
>> >>> Bump.
>> >>>
>> >>> I understand everyone would be busy with 1.5.0, but would really
>> >>> appreciate slight help in unblocking us here.
>> >>>
>> >>> Thanks,
>> >>> Shailesh
>> >>>
>> >>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <[email protected]>
>> >>> wrote:
>> >>>
>> >>>> Hi,
>> >>>>
>> >>>> I think this should have been sent to the dev mailing list, because
>> >>>> in the user mailing list it might disappear among a lot of other mail.
>> >>>>
>> >>>> Forwarding...
>> >>>>
>> >>>> Best,
>> >>>> Aljoscha
>> >>>>
>> >>>>> On 14. Mar 2018, at 06:20, Shailesh Jain <[email protected]> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> We've been facing issues* w.r.t. watermarks not being supported per
>> >>>>> key, which led us to either:
>> >>>>>
>> >>>>> (a) run the job in processing time for a KeyedStream -> compromising
>> >>>>> on use cases which revolve around catching time-based patterns, or
>> >>>>> (b) run the job in event time with multiple data streams (one data
>> >>>>> stream per key) -> this is not scalable, as the number of operators
>> >>>>> grows linearly with the number of keys.
>> >>>>>
>> >>>>> To address this, we've done a quick (PoC) change in the
>> >>>>> AbstractKeyedCEPPatternOperator to allow the NFAs to progress based
>> >>>>> on timestamps extracted from the events arriving at the operator
>> >>>>> (and not from the watermarks). We've tested it against our use case
>> >>>>> and are seeing a significant improvement in memory usage without
>> >>>>> compromising on the watermark functionality.
>> >>>>>
>> >>>>> It'll be really helpful if someone from the CEP dev group can take a
>> >>>>> look at this branch -
>> >>>>> https://github.com/jainshailesh/flink/commits/cep_changes -
>> >>>>> and provide comments on the approach taken, and maybe guide us on
>> >>>>> the next steps for taking it forward.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Shailesh
>> >>>>>
>> >>>>> * Links to previous email threads related to the same issue:
>> >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
>> >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
>> >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
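(For reference: the GlobalWindowCountTrigger used in the sorting pipeline of
the 19 Mar message above is not shown in the thread. A count-based trigger
over a global window, in the spirit of Flink's built-in CountTrigger, might
look like the sketch below; its name and exact semantics are assumptions.
The same fire-and-purge-on-count behaviour can also be obtained with
PurgingTrigger.of(CountTrigger.of(n)).)

    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    public class GlobalWindowCountTrigger extends Trigger<Object, GlobalWindow> {

        private final long maxCount;

        // Per-key count of elements buffered in the current global window.
        private final ReducingStateDescriptor<Long> countDesc =
                new ReducingStateDescriptor<>("count", (a, b) -> a + b, Long.class);

        public GlobalWindowCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                       TriggerContext ctx) throws Exception {
            ReducingState<Long> count = ctx.getPartitionedState(countDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                count.clear();
                // Evaluate the window function (the sort) and drop the buffered elements.
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countDesc).clear();
        }
    }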
