Bump.

On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain <shailesh.j...@stellapps.com>
wrote:

> To trigger the computations for each batch, I'll have to use the
> processing-time timer in the AbstractKeyedCEPPatternOperator, right?
>
> The reason why I'm avoiding the watermarks is that it is not possible to
> generate watermarks per key.
>
> Thanks for the 'within' remark.
>
> A couple of questions:
>
> 1. Given our use case and the limitation around per-key watermarks, do you
> think this approach is worth adding to the library?
>
> 2. What other aspects of the framework do I need to consider/test before
> we go about implementing this approach formally?
>
> Thanks,
> Shailesh
>
>
> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, <wysakowicz.da...@gmail.com>
> wrote:
>
>> If you do the buffering, you can emit a watermark for each such batch (equal
>> to the highest timestamp in that batch). This way you won't need to sort; the
>> CEP library will do it for you. The within clause will then work in event time.
>>
>> One more remark: the within clause always applies to the whole pattern, not
>> just a part of it, so it does not matter whether you apply it in the middle
>> (as you did) or at the very end.
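>>
>> A minimal sketch of how such a per-batch watermark could be emitted with a
>> punctuated assigner (not from the original job; it assumes the Event type
>> exposes getTimestamp() and an isEndOfBatch() marker, both hypothetical here):
>>
>> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>>
>> // Sketch only: Event.getTimestamp() and Event.isEndOfBatch() are assumed helpers.
>> public class BatchBoundaryWatermarkAssigner
>>         implements AssignerWithPunctuatedWatermarks<Event> {
>>
>>     private long maxTimestampInBatch = Long.MIN_VALUE;
>>
>>     @Override
>>     public long extractTimestamp(Event element, long previousElementTimestamp) {
>>         long ts = element.getTimestamp();
>>         maxTimestampInBatch = Math.max(maxTimestampInBatch, ts);
>>         return ts;
>>     }
>>
>>     @Override
>>     public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
>>         // Emit a watermark only at a batch boundary, equal to the highest
>>         // timestamp observed in that batch; otherwise emit nothing.
>>         if (lastElement.isEndOfBatch()) {
>>             Watermark wm = new Watermark(maxTimestampInBatch);
>>             maxTimestampInBatch = Long.MIN_VALUE;
>>             return wm;
>>         }
>>         return null;
>>     }
>> }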
>>
>> Best,
>> Dawid
>>
>> > On 19 Mar 2018, at 11:31, Shailesh Jain <shailesh.j...@stellapps.com>
>> wrote:
>> >
>> > Thanks for your reply, Dawid.
>> >
>> > I understand that the approach I've tried out is not generic enough, and
>> > would need a lot more thought w.r.t. parallelism considerations, out-of-order
>> > events, effects on downstream operators, etc. The intention was to do a quick
>> > implementation to check the feasibility of the approach.
>> >
>> >>> It will also not sort the events etc.
>> >
>> > In the application code to test this approach, I had used a Global window
>> > to sort events based on their timestamp (similar to how out-of-order events
>> > are dropped based on a time bound, I'm dropping them based on a count-based
>> > bound).
>> >
>> > allEvents = allEvents
>> >        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>> >        .window(GlobalWindows.create())
>> >        .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
>> >        .process(new SortWindowProcessFunction())
>> >        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>> >        .assignTimestampsAndWatermarks(new TimestampsExtractor())
>> >        .uid(Constants.TS_EX_UID);
>> >
>> > PatternLoader
>> >        .applyPatterns(allEvents, propLoader.getPatternClassNames())
>> >        .addSink(createKafkaSink(kafkaProps))
>> >        .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
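>> >
>> > For reference, a minimal sketch of what a count-based trigger over a
>> > GlobalWindow could look like (the actual GlobalWindowCountTrigger in our job
>> > may differ); it mirrors Flink's built-in CountTrigger but fires and purges so
>> > each sorted batch is emitted independently:
>> >
>> > import org.apache.flink.api.common.state.ReducingState;
>> > import org.apache.flink.api.common.state.ReducingStateDescriptor;
>> > import org.apache.flink.api.common.typeutils.base.LongSerializer;
>> > import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>> > import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>> > import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>> >
>> > // Sketch only: fires (and purges) once maxCount elements have been buffered.
>> > public class GlobalWindowCountTrigger extends Trigger<Object, GlobalWindow> {
>> >
>> >     private final long maxCount;
>> >
>> >     private final ReducingStateDescriptor<Long> countDesc =
>> >             new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);
>> >
>> >     public GlobalWindowCountTrigger(long maxCount) {
>> >         this.maxCount = maxCount;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
>> >             TriggerContext ctx) throws Exception {
>> >         ReducingState<Long> count = ctx.getPartitionedState(countDesc);
>> >         count.add(1L);
>> >         if (count.get() >= maxCount) {
>> >             count.clear();
>> >             return TriggerResult.FIRE_AND_PURGE;
>> >         }
>> >         return TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
>> >         return TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
>> >         return TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
>> >         ctx.getPartitionedState(countDesc).clear();
>> >     }
>> > }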
>> >
>> >
>> >>> If in the getCurrentWatermark method of your AssignerWithPeriodicWatermarks
>> >>> you just return new Watermark(System.currentTimeMillis()), you will get the
>> >>> same behaviour as with that change, am I right?
>> >
>> > If watermarks are generated based on machine time, the major issue I see is
>> > that we will not be able to leverage event-time functionality, specifically
>> > patterns which look for the absence of an event for a fixed period of time.
>> >
>> > For example, we have many such patterns:
>> >
>> > Pattern<Event, Event> pattern = Pattern.<Event>begin(UNDER_CHILLED_ALERT,
>> >                AfterMatchSkipStrategy.skipPastLastEvent())
>> >        .where(Conditions.getUnderchilledCondition())
>> >        .notFollowedBy(COMPRESSOR_ON)
>> >        .where(Conditions.getCompressorOnCondition())
>> >        .within(Time.minutes(30))
>> >        .followedBy(HIGH_TEMP)
>> >        .where(Conditions.getHighTemperatureCondition());
>> >
>> > Now when there are network issues (which are very frequent), queued events
>> > are delivered together, and such patterns will not be matched correctly,
>> > because pruning of events from the NFA's buffer is done based not on the
>> > timestamp within the event, but on the watermark received by the operator.
>> >
>> > Is my understanding here correct?
>> >
>> > Thanks,
>> > Shailesh
>> >
>> > On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
>> > wysakowicz.da...@gmail.com> wrote:
>> >
>> >> Hi Shailesh,
>> >>
>> >> Thanks for your interest in the CEP library and sorry for the late response.
>> >> I must say I am not a fan of this approach.
>> >> After this change, processing time is no longer processing time, and it will
>> >> work differently from any other place in Flink. It will also not sort the
>> >> events etc.
>> >> Moreover, I think you could achieve a pretty similar solution if you generate
>> >> your watermark based on the machine time. If in the getCurrentWatermark
>> >> method of your AssignerWithPeriodicWatermarks you just return
>> >> new Watermark(System.currentTimeMillis()), you will get the same behaviour
>> >> as with that change, am I right?
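>> >>
>> >> A minimal sketch of what such an assigner could look like (Event.getTimestamp()
>> >> is an assumed accessor, not part of the code shared in this thread):
>> >>
>> >> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> >> import org.apache.flink.streaming.api.watermark.Watermark;
>> >>
>> >> // Sketch only: timestamps come from the event itself, but the watermark just
>> >> // tracks the machine clock, so event time advances with processing time.
>> >> public class WallClockWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {
>> >>
>> >>     @Override
>> >>     public long extractTimestamp(Event element, long previousElementTimestamp) {
>> >>         return element.getTimestamp(); // assumed accessor
>> >>     }
>> >>
>> >>     @Override
>> >>     public Watermark getCurrentWatermark() {
>> >>         return new Watermark(System.currentTimeMillis());
>> >>     }
>> >> }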
>> >>
>> >> Best,
>> >> Dawid
>> >>
>> >>> On 18 Mar 2018, at 09:00, Shailesh Jain <shailesh.j...@stellapps.com>
>> >> wrote:
>> >>>
>> >>> Thanks Aljoscha.
>> >>>
>> >>> Bump.
>> >>>
>> >>> I understand everyone would be busy with 1.5.0, but would really appreciate
>> >>> a little help in unblocking us here.
>> >>>
>> >>> Thanks,
>> >>> Shailesh
>> >>>
>> >>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <
>> aljos...@apache.org>
>> >>> wrote:
>> >>>
>> >>>> Hi,
>> >>>>
>> >>>> I think this should have been sent to the dev mailing list, because on the
>> >>>> user mailing list it might disappear among a lot of other mail.
>> >>>>
>> >>>> Forwarding...
>> >>>>
>> >>>> Best,
>> >>>> Aljoscha
>> >>>>
>> >>>>> On 14. Mar 2018, at 06:20, Shailesh Jain <
>> shailesh.j...@stellapps.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> We've been facing issues* w.r.t. watermarks not being supported per key,
>> >>>>> which led us to:
>> >>>>>
>> >>>>> Either (a) run the job in processing time for a KeyedStream ->
>> >>>>> compromising on use cases which revolve around catching time-based
>> >>>>> patterns,
>> >>>>> or (b) run the job in event time for multiple data streams (one data
>> >>>>> stream per key) -> this is not scalable, as the number of operators
>> >>>>> grows linearly with the number of keys.
>> >>>>>
>> >>>>> To address this, we've done a quick (PoC) change in the
>> >>>>> AbstractKeyedCEPPatternOperator to allow the NFAs to progress based on
>> >>>>> timestamps extracted from the events arriving at the operator (and not
>> >>>>> from the watermarks). We've tested it against our use case and are seeing
>> >>>>> a significant improvement in memory usage without compromising on the
>> >>>>> watermark functionality.
>> >>>>>
>> >>>>> It'll be really helpful if someone from the CEP dev group can take a look
>> >>>>> at this branch - https://github.com/jainshailesh/flink/commits/cep_changes -
>> >>>>> and provide comments on the approach taken, and maybe guide us on the next
>> >>>>> steps for taking it forward.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Shailesh
>> >>>>>
>> >>>>> * Links to previous email threads related to the same issue:
>> >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
>> >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
>> >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
>> >>>>>
>> >>>>
>> >>>>
>> >>
>> >>
>>
>>
