Thank you, Kostas, for reviewing this.

Although points 1 and 3 are things I was planning to address in the actual
implementation, point 2 would still be a showstopper.

I'll spend some more time on this and maybe come up with a better way to
achieve the same use case without mixing the two notions of time.

Until then, I hope it is OK if we use the modified library to unblock
ourselves.

Thanks,
Shailesh

On Tue, Apr 3, 2018 at 3:05 PM, Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Shailesh,
>
> Your solution may fit your use case, but as Dawid mentioned earlier, it
> makes a lot of
> assumptions about the input.
>
> From a look at your PoC:
> 1) You assume no late data (you do not drop anything) and no
> out-of-orderness.
> 2) You mix the two notions of time (event and processing).
> 3) You eagerly process each element, which can have performance
>     implications, especially if you go for the RocksDB backend.
>
> Given the above, I do not think that this can go in Flink.
>
> Something that goes into Flink will have to be maintained by the community.
> So, although some use cases may have particular needs, we refrain from
> adding code to master that makes assumptions tailored to specific use
> cases.
>
> I understand that one watermark per key could conceptually fit your use
> case better, but there may be a better way to achieve your goal, one that
> aligns with Flink’s offered semantics.
>
> Thanks,
> Kostas
>
> > On Apr 3, 2018, at 11:01 AM, Shailesh Jain <shailesh.j...@stellapps.com>
> wrote:
> >
> > Bump.
> >
> > On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain <
> shailesh.j...@stellapps.com>
> > wrote:
> >
> >> To trigger the computations for each batch, I'll have to use the
> >> processing time timer in the abstract keyed cep operator, right?
> >>
> >> The reason why I'm avoiding the watermarks is that it is not possible to
> >> generate watermarks per key.
> >>
> >> Thanks for the 'within' remark.
> >>
> >> A couple of questions:
> >>
> >> 1. Given our use case and the limitations of per key watermark, do you
> >> think that this approach is worth adding to the library?
> >>
> >> 2. What other aspects of the framework do I need to consider/test before
> >> we go about implementing this approach formally?
> >>
> >> Thanks,
> >> Shailesh
> >>
> >>
> >> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, <
> wysakowicz.da...@gmail.com>
> >> wrote:
> >>
> >>> If you do the buffering, you can emit a watermark for each such batch
> >>> (equal to the highest timestamp in that batch). This way you won’t need
> >>> to sort; the CEP library will do it for you. The within clause will
> >>> then work in event time.
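The batching idea above can be sketched in plain Java (illustrative only, not Flink API — the class and method names are made up for this example):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch: given a buffered batch of event timestamps, the
// watermark to emit is the maximum timestamp seen in that batch. Emitting
// it tells the downstream event-time operator (here, CEP) that it may
// safely sort and process everything up to that point in event time.
public class BatchWatermark {
    public static long watermarkFor(List<Long> batchTimestamps) {
        return batchTimestamps.stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(Long.MIN_VALUE); // no events: nothing to advance
    }

    public static void main(String[] args) {
        // An out-of-order batch delivered together after a network outage.
        List<Long> batch = Arrays.asList(1000L, 4000L, 2000L, 3000L);
        System.out.println(watermarkFor(batch)); // prints 4000
    }
}
```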
> >>>
> >>> One more remark: the within clause always applies to the whole
> >>> pattern, not just to a part of it; it does not matter whether you apply
> >>> it in the middle (as you did) or at the very end.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>>> On 19 Mar 2018, at 11:31, Shailesh Jain <shailesh.j...@stellapps.com>
> >>> wrote:
> >>>>
> >>>> Thanks for your reply, Dawid.
> >>>>
> >>>> I understand that the approach I've tried out is not generic enough,
> >>>> and would need a lot more thought w.r.t. parallelism considerations,
> >>>> out-of-order events, effects on downstream operators, etc. The
> >>>> intention was to do a quick implementation to check the feasibility of
> >>>> the approach.
> >>>>
> >>>>>> It will also not sort the events etc.
> >>>>
> >>>> In the application code to test this approach, I had used a global
> >>>> window to sort events based on their timestamp (similar to how
> >>>> out-of-order events are dropped based on a time bound, I'm dropping
> >>>> them based on a count-based bound).
> >>>>
> >>>> allEvents = allEvents
> >>>>       .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >>>>       .window(GlobalWindows.create())
> >>>>       .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
> >>>>       .process(new SortWindowProcessFunction())
> >>>>       .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >>>>       .assignTimestampsAndWatermarks(new TimestampsExtractor())
> >>>>       .uid(Constants.TS_EX_UID);
> >>>>
> >>>> PatternLoader
> >>>>       .applyPatterns(allEvents, propLoader.getPatternClassNames())
> >>>>       .addSink(createKafkaSink(kafkaProps))
> >>>>       .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
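GlobalWindowCountTrigger and SortWindowProcessFunction are application classes not shown in the thread; the core buffer-and-sort logic they stand for can be sketched standalone (the class name, the event representation, and the count bound are all illustrative assumptions, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch of a count-based sort buffer: collect events until the
// configured count is reached (the "trigger"), then emit them sorted by
// event timestamp (the "process function").
public class CountSortBuffer {
    private final int sortWindowSize;                    // e.g. from propLoader.getSortWindowSize()
    private final List<long[]> buffer = new ArrayList<>(); // each entry: {timestamp, payloadId}

    public CountSortBuffer(int sortWindowSize) {
        this.sortWindowSize = sortWindowSize;
    }

    /** Returns the sorted batch once the buffer is full, otherwise null. */
    public List<long[]> add(long timestamp, long payloadId) {
        buffer.add(new long[]{timestamp, payloadId});
        if (buffer.size() < sortWindowSize) {
            return null; // keep buffering; the trigger has not fired yet
        }
        List<long[]> batch = new ArrayList<>(buffer);
        batch.sort(Comparator.comparingLong(e -> e[0])); // sort by event time
        buffer.clear();
        return batch;
    }
}
```

Note the trade-off this encodes: an event arriving more than `sortWindowSize` positions out of order would still land in the wrong batch, which is the count-based analogue of the time-bound used for dropping late data.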
> >>>>
> >>>>
> >>>>>> If in the getCurrentWatermark method of your
> >>>>>> AssignerWithPeriodicWatermarks you will just return new
> >>>>>> Watermark(System.currentTimeMillis()), you will get the same
> >>>>>> behaviour as with that change, am I right?
> >>>>
> >>>> If watermarks are generated based on the machine time, the major
> >>>> issue I see is that we will not be able to leverage event-time
> >>>> functionality, specifically for patterns which look for the absence of
> >>>> an event for a fixed period of time.
> >>>>
> >>>> For example, we have many such patterns:
> >>>>
> >>>> Pattern<Event, Event> pattern = Pattern
> >>>>       .<Event>begin(UNDER_CHILLED_ALERT, AfterMatchSkipStrategy.skipPastLastEvent())
> >>>>       .where(Conditions.getUnderchilledCondition())
> >>>>       .notFollowedBy(COMPRESSOR_ON)
> >>>>       .where(Conditions.getCompressorOnCondition())
> >>>>       .within(Time.minutes(30))
> >>>>       .followedBy(HIGH_TEMP)
> >>>>       .where(Conditions.getHighTemperatureCondition());
> >>>>
> >>>> Now when there are network issues (which are very frequent), queued
> >>>> events are delivered together, and such patterns will not be matched
> >>>> correctly, as pruning of events from the NFA's buffer will not be done
> >>>> based on the timestamp within the event, but on the watermark received
> >>>> by the operator.
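The concern can be made concrete with a small worked example (all numbers and helper names are illustrative, not Flink code): two events generated 10 minutes apart in event time are delivered together two hours later, after a network outage.

```java
// Illustrative sketch of why a wall-clock watermark breaks event-time
// pruning: the within(30 minutes) bound should be judged on event
// timestamps, but a watermark driven by System.currentTimeMillis() has
// already advanced past both events by the time they arrive.
public class OutageExample {
    static final long MINUTE = 60_000L;

    // Would within(Time.minutes(30)) hold, judged purely on event timestamps?
    public static boolean matchesInEventTime(long firstTs, long secondTs) {
        return secondTs - firstTs <= 30 * MINUTE;
    }

    // Would a wall-clock watermark at delivery time have already pruned the
    // partial match that started at firstTs?
    public static boolean prunedByWallClockWatermark(long firstTs, long wallClockNow) {
        return wallClockNow - firstTs > 30 * MINUTE;
    }

    public static void main(String[] args) {
        long underchilledTs = 0L;              // generated on the device
        long compressorOnTs = 10 * MINUTE;     // 10 minutes later in event time
        long deliveryWallClock = 120 * MINUTE; // both arrive 2 hours later

        // The pattern should match (10 min < 30 min in event time)...
        System.out.println(matchesInEventTime(underchilledTs, compressorOnTs));
        // ...but the wall-clock watermark is 2 hours ahead, so the NFA would
        // prune the partial match before the second event is seen.
        System.out.println(prunedByWallClockWatermark(underchilledTs, deliveryWallClock));
    }
}
```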
> >>>>
> >>>> Is my understanding here correct?
> >>>>
> >>>> Thanks,
> >>>> Shailesh
> >>>>
> >>>> On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
> >>>> wysakowicz.da...@gmail.com> wrote:
> >>>>
> >>>>> Hi Shailesh,
> >>>>>
> >>>>> Thanks for your interest in the CEP library, and sorry for the late
> >>>>> response. I must say I am not a fan of this approach.
> >>>>> After this change, processing time is no longer processing time, and
> >>>>> it will work differently from everywhere else in Flink. It will also
> >>>>> not sort the events, etc.
> >>>>> Moreover, I think you could achieve a pretty similar solution if you
> >>>>> generate your watermarks based on the machine time. If in the
> >>>>> getCurrentWatermark method of your AssignerWithPeriodicWatermarks you
> >>>>> just return new Watermark(System.currentTimeMillis()), you will get
> >>>>> the same behaviour as with that change, am I right?
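The assigner Dawid describes would look roughly like this. It is sketched here without extending Flink's actual AssignerWithPeriodicWatermarks interface (so it stays self-contained), with the clock injected to make the behaviour visible; the class name and structure are assumptions for illustration:

```java
import java.util.function.LongSupplier;

// Sketch of a periodic watermark generator whose watermark tracks the
// machine clock rather than event timestamps, mirroring
// "return new Watermark(System.currentTimeMillis())".
public class MachineTimeWatermarks {
    private final LongSupplier clock; // e.g. System::currentTimeMillis in production

    public MachineTimeWatermarks(LongSupplier clock) {
        this.clock = clock;
    }

    // Each event keeps its own embedded timestamp...
    public long extractTimestamp(long eventTimestamp) {
        return eventTimestamp;
    }

    // ...but the watermark ignores event time entirely and follows the wall
    // clock, which is exactly why downstream event-time pruning no longer
    // lines up with the timestamps inside the events.
    public long getCurrentWatermark() {
        return clock.getAsLong();
    }
}
```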
> >>>>>
> >>>>> Best,
> >>>>> Dawid
> >>>>>
> >>>>>> On 18 Mar 2018, at 09:00, Shailesh Jain <
> shailesh.j...@stellapps.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> Thanks Aljoscha.
> >>>>>>
> >>>>>> Bump.
> >>>>>>
> >>>>>> I understand everyone would be busy with 1.5.0, but I would really
> >>>>>> appreciate some help in unblocking us here.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Shailesh
> >>>>>>
> >>>>>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <
> >>> aljos...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I think this should have been sent to the dev mailing list because
> in
> >>>>> the
> >>>>>>> user mailing list it might disappear among a lot of other mail.
> >>>>>>>
> >>>>>>> Forwarding...
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>>> On 14. Mar 2018, at 06:20, Shailesh Jain <
> >>> shailesh.j...@stellapps.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> We've been facing issues* w.r.t. watermarks not being supported
> >>>>>>>> per key, which led us to:
> >>>>>>>>
> >>>>>>>> Either (a) run the job in processing time for a KeyedStream ->
> >>>>>>>> compromising on use cases which revolve around catching time-based
> >>>>>>>> patterns, or (b) run the job in event time with multiple data
> >>>>>>>> streams (one data stream per key) -> this is not scalable, as the
> >>>>>>>> number of operators grows linearly with the number of keys.
> >>>>>>>>
> >>>>>>>> To address this, we've done a quick (PoC) change in the
> >>>>>>>> AbstractKeyedCEPPatternOperator to allow the NFAs to progress
> >>>>>>>> based on timestamps extracted from the events arriving at the
> >>>>>>>> operator (and not from the watermarks). We've tested it against
> >>>>>>>> our use case and are seeing a significant improvement in memory
> >>>>>>>> usage without compromising on the watermark functionality.
> >>>>>>>>
> >>>>>>>> It'll be really helpful if someone from the CEP dev group can take
> >>>>>>>> a look at this branch -
> >>>>>>>> https://github.com/jainshailesh/flink/commits/cep_changes - and
> >>>>>>>> provide comments on the approach taken, and maybe guide us on the
> >>>>>>>> next steps for taking it forward.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Shailesh
> >>>>>>>>
> >>>>>>>> * Links to previous email threads related to the same issue:
> >>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
> >>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> >>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
>
>
