Thank you, Kostas, for reviewing this. Although points 1 and 3 are something
I was planning to address in the actual implementation, #2 would still be a
show-stopper.
I'll spend some more time on this and maybe come up with a better way to
achieve the same use case without mixing the two notions of time. Until
then, I hope it is OK if we use the modified library to unblock ourselves.

Thanks,
Shailesh

On Tue, Apr 3, 2018 at 3:05 PM, Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Shailesh,
>
> Your solution may fit your use case, but as Dawid mentioned earlier, it
> makes a lot of assumptions about the input.
>
> From a look at your PoC:
> 1) You assume no late data (you do not drop anything) and no
>    out-of-orderness.
> 2) You mix the two notions of time (event and processing).
> 3) You eagerly process each element, which can have performance
>    implications, especially if you go for the RocksDB backend.
>
> Given the above, I do not think that this can go into Flink.
>
> Something that goes into Flink will have to be maintained by the
> community. So, although some use cases may have particular needs, we
> refrain from adding code to master that makes assumptions tailored to
> specific use cases.
>
> I understand that one watermark per key could conceptually fit your use
> case better, but there may be a better way to achieve your goal, one that
> aligns with Flink's offered semantics.
>
> Thanks,
> Kostas
>
>
> On Apr 3, 2018, at 11:01 AM, Shailesh Jain <shailesh.j...@stellapps.com>
> wrote:
>
> > Bump.
> >
> > On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain
> > <shailesh.j...@stellapps.com> wrote:
> >
> >> To trigger the computations for each batch, I'll have to use the
> >> processing-time timer in the AbstractKeyedCEPPatternOperator, right?
> >>
> >> The reason why I'm avoiding watermarks is that it is not possible to
> >> generate watermarks per key.
> >>
> >> Thanks for the 'within' remark.
> >>
> >> A couple of questions:
> >>
> >> 1. Given our use case and the limitations of per-key watermarks, do
> >> you think that this approach is worth adding to the library?
> >>
> >> 2. What other aspects of the framework do I need to consider/test
> >> before we go about implementing this approach formally?
> >>
> >> Thanks,
> >> Shailesh
> >>
> >>
> >> On Thu, 22 Mar 2018, 6:58 PM Dawid Wysakowicz
> >> <wysakowicz.da...@gmail.com> wrote:
> >>
> >>> If you do the buffering, you can emit a watermark for each such
> >>> batch (equal to the highest timestamp in that batch). This way you
> >>> won't need to sort; the CEP library will do it for you. The within
> >>> clause will then work in event time.
> >>>
> >>> One more remark: the within clause always applies to the whole
> >>> pattern, not just to a part of it; it does not matter whether you
> >>> apply it in the middle (as you did) or at the very end.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>>> On 19 Mar 2018, at 11:31, Shailesh Jain
> >>>> <shailesh.j...@stellapps.com> wrote:
> >>>>
> >>>> Thanks for your reply, Dawid.
> >>>>
> >>>> I understand that the approach I've tried out is not generic
> >>>> enough and would need a lot more thought to be put into it w.r.t.
> >>>> parallelism considerations, out-of-order events, effects on
> >>>> downstream operators, etc. The intention was to do a quick
> >>>> implementation to check the feasibility of the approach.
> >>>>
> >>>>>> It will also not sort the events etc.
> >>>>
> >>>> In the application code to test this approach, I had used a global
> >>>> window to sort events based on their timestamp (similar to how
> >>>> out-of-order events are dropped based on a time bound, I'm
> >>>> dropping them based on a count-based bound).
> >>>>
> >>>> allEvents = allEvents
> >>>>     .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >>>>     .window(GlobalWindows.create())
> >>>>     .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
> >>>>     .process(new SortWindowProcessFunction())
> >>>>     .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >>>>     .assignTimestampsAndWatermarks(new TimestampsExtractor())
> >>>>     .uid(Constants.TS_EX_UID);
> >>>>
> >>>> PatternLoader
> >>>>     .applyPatterns(allEvents, propLoader.getPatternClassNames())
> >>>>     .addSink(createKafkaSink(kafkaProps))
> >>>>     .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
> >>>>
> >>>>>> If in the getCurrentWatermark method of your
> >>>>>> AssignerWithPeriodicWatermarks you will just return
> >>>>>> new Watermark(System.currentTimeMillis()), you will get the same
> >>>>>> behaviour as with that change, am I right?
> >>>>
> >>>> If watermarks are generated based on the machine time, the major
> >>>> issue I see is that we will not be able to leverage event-time
> >>>> functionality, specifically in patterns which look for the absence
> >>>> of an event for a fixed period of time.
> >>>>
> >>>> For e.g., we have many such patterns:
> >>>>
> >>>> Pattern<Event, Event> pattern = Pattern.<Event>begin(
> >>>>         UNDER_CHILLED_ALERT, AfterMatchSkipStrategy.skipPastLastEvent())
> >>>>     .where(Conditions.getUnderchilledCondition())
> >>>>     .notFollowedBy(COMPRESSOR_ON)
> >>>>     .where(Conditions.getCompressorOnCondition())
> >>>>     .within(Time.minutes(30))
> >>>>     .followedBy(HIGH_TEMP)
> >>>>     .where(Conditions.getHighTemperatureCondition());
> >>>>
> >>>> Now, when there are network issues (which are very frequent),
> >>>> queued events are delivered together, and such patterns will not
> >>>> be matched correctly, as pruning of events from the NFA's buffer
> >>>> will not be done based on the timestamp within the event, but on
> >>>> the watermark received by the operator.
> >>>>
> >>>> Is my understanding here correct?
> >>>>
> >>>> Thanks,
> >>>> Shailesh
> >>>>
> >>>> On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz
> >>>> <wysakowicz.da...@gmail.com> wrote:
> >>>>
> >>>>> Hi Shailesh,
> >>>>>
> >>>>> Thanks for your interest in the CEP library, and sorry for the
> >>>>> late response. I must say I am not a fan of this approach. After
> >>>>> this change, processing time is no longer processing time, and it
> >>>>> will behave differently from every other place in Flink. It will
> >>>>> also not sort the events etc.
> >>>>>
> >>>>> Moreover, I think you could achieve a pretty similar solution if
> >>>>> you generate your watermarks based on the machine time. If in the
> >>>>> getCurrentWatermark method of your AssignerWithPeriodicWatermarks
> >>>>> you just return new Watermark(System.currentTimeMillis()), you
> >>>>> will get the same behaviour as with that change, am I right?
> >>>>>
> >>>>> Best,
> >>>>> Dawid
> >>>>>
> >>>>>> On 18 Mar 2018, at 09:00, Shailesh Jain
> >>>>>> <shailesh.j...@stellapps.com> wrote:
> >>>>>>
> >>>>>> Thanks Aljoscha.
> >>>>>>
> >>>>>> Bump.
> >>>>>>
> >>>>>> I understand everyone would be busy with 1.5.0, but we would
> >>>>>> really appreciate some help in unblocking us here.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Shailesh
> >>>>>>
> >>>>>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek
> >>>>>> <aljos...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I think this should have been sent to the dev mailing list,
> >>>>>>> because in the user mailing list it might disappear among a lot
> >>>>>>> of other mail.
> >>>>>>>
> >>>>>>> Forwarding...
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>>> On 14 Mar 2018, at 06:20, Shailesh Jain
> >>>>>>>> <shailesh.j...@stellapps.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> We've been facing issues* w.r.t. watermarks not being
> >>>>>>>> supported per key, which led us to either:
> >>>>>>>>
> >>>>>>>> (a) run the job in processing time for a KeyedStream ->
> >>>>>>>> compromising on use cases which revolve around catching
> >>>>>>>> time-based patterns, or
> >>>>>>>> (b) run the job in event time with multiple data streams (one
> >>>>>>>> data stream per key) -> this is not scalable, as the number
> >>>>>>>> of operators grows linearly with the number of keys.
> >>>>>>>>
> >>>>>>>> To address this, we've done a quick (PoC) change in the
> >>>>>>>> AbstractKeyedCEPPatternOperator to allow the NFAs to progress
> >>>>>>>> based on timestamps extracted from the events arriving at the
> >>>>>>>> operator (and not from the watermarks). We've tested it
> >>>>>>>> against our use case and are seeing a significant improvement
> >>>>>>>> in memory usage without compromising on the watermark
> >>>>>>>> functionality.
> >>>>>>>>
> >>>>>>>> It'll be really helpful if someone from the CEP dev group can
> >>>>>>>> take a look at this branch:
> >>>>>>>> https://github.com/jainshailesh/flink/commits/cep_changes
> >>>>>>>> and provide comments on the approach taken, and maybe guide
> >>>>>>>> us on the next steps for taking it forward.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Shailesh
> >>>>>>>>
> >>>>>>>> * Links to previous email threads related to the same issue:
> >>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
> >>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> >>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
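
[Editor's note: Dawid's suggestion in the thread — buffer a count-bounded batch
per key, sort it by event timestamp, and emit a watermark equal to the highest
timestamp in the batch — can be illustrated with a small plain-Java simulation.
The class `BatchWatermarkEmitter` below is hypothetical (not a Flink API); the
count-based flush mirrors the `GlobalWindowCountTrigger` idea from Shailesh's
snippet, and the late-event drop addresses Kostas' point 1. A minimal sketch
under those assumptions, not a definitive implementation.]

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simulates per-batch watermark emission: buffer event timestamps, and once the
// count-based bound is reached, release the batch in sorted order together with
// a watermark equal to the batch's highest timestamp. Events at or behind the
// last emitted watermark are treated as late and dropped.
class BatchWatermarkEmitter {
    private final int batchSize;                  // count-based bound, as in the thread
    private final List<Long> buffer = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;  // highest watermark emitted so far

    BatchWatermarkEmitter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the sorted batch when the buffer fills up, otherwise null.
    // A timestamp at or before the last watermark is late data and is dropped.
    List<Long> offer(long eventTimestamp) {
        if (eventTimestamp <= lastWatermark) {
            return null; // late: already behind the watermark we emitted
        }
        buffer.add(eventTimestamp);
        if (buffer.size() < batchSize) {
            return null; // still buffering
        }
        List<Long> batch = new ArrayList<>(buffer);
        batch.sort(Comparator.naturalOrder());
        buffer.clear();
        // Watermark = max timestamp in the batch, so downstream event-time
        // logic (e.g. CEP pruning via within()) can advance per batch.
        lastWatermark = batch.get(batch.size() - 1);
        return batch;
    }

    long lastWatermark() {
        return lastWatermark;
    }
}
```

In an actual Flink job this logic would live in the windowing/trigger stage
before `assignTimestampsAndWatermarks`, with the watermark emitted by the
timestamp assigner rather than returned from a method; the sketch only shows
the batching and ordering semantics being discussed.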