Review request

2023-11-06 Thread Etienne Chauchot

Hi everyone,

Is there anyone available to review this PR [1] that I opened 1.5 months
ago? The people I've pinged seem to be unavailable at the moment.


Thanks

[1] https://github.com/apache/flink/pull/23443

Best

Etienne



Code Review Request

2022-09-07 Thread ganlute
Could anyone help me review the changes? Thank you~



Here is the JIRA: https://issues.apache.org/jira/browse/FLINK-28910
Here is the PR: https://github.com/apache/flink/pull/20542

Re: PR Review Request

2022-01-25 Thread Jing Zhang
Please ignore this message.
I meant to send it to Calcite's dev mailing list, but sent it to the wrong
list.
I'm terribly sorry.

Jing Zhang wrote on Wed, 26 Jan 2022 at 14:55:

> Hi community,
> My apologies for interrupting.
> Could anyone help review the PR
> https://github.com/apache/calcite/pull/2606?
> Thanks a lot.
>
> CALCITE-4865 is the first sub-task of CALCITE-4864. This Jira aims to
> extend the existing table function support to cover the Polymorphic Table
> Function (PTF) introduced as part of ANSI SQL 2016.
>
> The brief change log of the PR:
>   - Update `Parser.jj` to support the PARTITION BY and ORDER BY clauses
> for an input table with set semantics of a PTF
>   - Introduce `TableCharacteristics`, which holds the three characteristics
> of an input table of a table function
>   - Update `SqlTableFunction` with a new method `tableCharacteristics`,
> which returns the table characteristics of the ordinal-th argument to the
> table function; the default return value is `Optional.empty()`, meaning the
> ordinal-th argument is not a table
>   - Introduce `SqlSetSemanticsTable`, which represents an input table with
> set semantics of a table function; its `SqlKind` is `SET_SEMANTICS_TABLE`
>   - Update `SqlValidatorImpl` to validate that only a set-semantics table
> of a table function may have PARTITION BY and ORDER BY clauses
>   - Update `SqlToRelConverter#substituteSubQuery` to parse a subquery that
> represents a set-semantics table.
>
> PR: https://github.com/apache/calcite/pull/2606
> JIRA: https://issues.apache.org/jira/browse/CALCITE-4865
> Parent JIRA: https://issues.apache.org/jira/browse/CALCITE-4864
>
> Best,
> Jing Zhang
>


PR Review Request

2022-01-25 Thread Jing Zhang
Hi community,
My apologies for interrupting.
Could anyone help review the PR
https://github.com/apache/calcite/pull/2606?
Thanks a lot.

CALCITE-4865 is the first sub-task of CALCITE-4864. This Jira aims to
extend the existing table function support to cover the Polymorphic Table
Function (PTF) introduced as part of ANSI SQL 2016.

The brief change log of the PR:
  - Update `Parser.jj` to support the PARTITION BY and ORDER BY clauses for
an input table with set semantics of a PTF
  - Introduce `TableCharacteristics`, which holds the three characteristics
of an input table of a table function
  - Update `SqlTableFunction` with a new method `tableCharacteristics`, which
returns the table characteristics of the ordinal-th argument to the table
function; the default return value is `Optional.empty()`, meaning the
ordinal-th argument is not a table
  - Introduce `SqlSetSemanticsTable`, which represents an input table with
set semantics of a table function; its `SqlKind` is `SET_SEMANTICS_TABLE`
  - Update `SqlValidatorImpl` to validate that only a set-semantics table of
a table function may have PARTITION BY and ORDER BY clauses
  - Update `SqlToRelConverter#substituteSubQuery` to parse a subquery that
represents a set-semantics table.
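
To make the `tableCharacteristics` bullet above concrete, here is a plain-Java
sketch of the idea. The types below (`TableCharacteristics`, `Semantics`, the
boolean flags) are stand-ins mirroring the description in this email, not
Calcite's actual classes; the merged API may differ.

```java
import java.util.Optional;

// Illustrative stand-in types, NOT Calcite's actual classes. A PTF reports,
// per argument ordinal, whether that argument is a table and, if so, its
// characteristics.
public class PtfSketch {

    public enum Semantics { ROW, SET }

    /** Characteristics a table argument of a table function carries. */
    public static final class TableCharacteristics {
        public final Semantics semantics;        // row vs. set semantics
        public final boolean pruneIfEmpty;       // hypothetical flag
        public final boolean passColumnsThrough; // hypothetical flag

        public TableCharacteristics(Semantics semantics, boolean pruneIfEmpty,
                                    boolean passColumnsThrough) {
            this.semantics = semantics;
            this.pruneIfEmpty = pruneIfEmpty;
            this.passColumnsThrough = passColumnsThrough;
        }
    }

    // Example: a function whose 0th argument is a set-semantics input table;
    // every other argument is a scalar, hence Optional.empty().
    public static Optional<TableCharacteristics> tableCharacteristics(int ordinal) {
        if (ordinal == 0) {
            return Optional.of(new TableCharacteristics(Semantics.SET, true, false));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(tableCharacteristics(0).get().semantics); // SET
        System.out.println(tableCharacteristics(1).isPresent());     // false
    }
}
```

Only a table argument with set semantics would then be allowed to carry
PARTITION BY and ORDER BY, which is what the validator change enforces.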

PR: https://github.com/apache/calcite/pull/2606
JIRA: https://issues.apache.org/jira/browse/CALCITE-4865
Parent JIRA: https://issues.apache.org/jira/browse/CALCITE-4864

Best,
Jing Zhang


Re: [Proposal] CEP library changes - review request

2018-04-03 Thread Shailesh Jain
Thank you, Kostas, for reviewing this.

Although points 1 and 3 are something which I was planning to address in
the actual implementation, #2 would still be a show stopper.

I'll spend some more time on this and maybe come up with a better way to
achieve the same use case without mixing the two notions of time.

Until then I hope it is OK if we use the modified library to unblock
ourselves.

Thanks,
Shailesh

On Tue, Apr 3, 2018 at 3:05 PM, Kostas Kloudas 
wrote:

> Hi Shailesh,
>
> Your solution may fit your use case, but as Dawid mentioned earlier, it
> makes a lot of
> assumptions about the input.
>
> From a look at your PoC:
> 1) You assume no late data (you do not drop anything) and no
> out-of-orderness.
> 2) You mix the two notions of time (event and processing).
> 3) You eagerly process each element, which can have performance
> implications, especially if you go for the RocksDB backend.
>
> Given the above, I do not think that this can go in Flink.
>
> Something that goes into Flink will have to be maintained by the community.
> So, although some use cases may have particular needs, we refrain from
> adding code to master that makes assumptions tailored to specific use
> cases.
>
> I understand that the one watermark per key could conceptually fit better
> in your use case,
> but there may be a better way to achieve your goal, one that aligns with
> Flink’s offered
> semantics.
>
> Thanks,
> Kostas
>
> > On Apr 3, 2018, at 11:01 AM, Shailesh Jain 
> wrote:
> >
> > Bump.
> >
> > On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain <
> shailesh.j...@stellapps.com>
> > wrote:
> >
> >> To trigger the computations for each batch, I'll have to use the
> >> processing time timer in the abstract keyed cep operator, right?
> >>
> >> The reason why I'm avoiding the watermarks is that it is not possible to
> >> generate watermarks per key.
> >>
> >> Thanks for the 'within' remark.
> >>
> >> A couple of questions:
> >>
> >> 1. Given our use case and the limitations of per key watermark, do you
> >> think that this approach is worth adding to the library?
> >>
> >> 2. What other aspects of the framework do I need to consider/test before
> >> we go about implementing this approach formally?
> >>
> >> Thanks,
> >> Shailesh
> >>
> >>
> >> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, <
> wysakowicz.da...@gmail.com>
> >> wrote:
> >>
> >>> If you do the buffering, you can emit a watermark for each such batch
> >>> (equal to the highest timestamp in that batch). This way you won’t need
> >>> to sort; the CEP library will do it for you.
> >>> The within clause will work in EventTime then.
> >>>
> >>> One more remark: the within clause always applies to the whole pattern,
> >>> not just to a part of it; it does not matter whether you apply it in the
> >>> middle (as you did) or at the very end.
> >>>
> >>> Best,
> >>> Dawid
> >>>
>  On 19 Mar 2018, at 11:31, Shailesh Jain 
> >>> wrote:
> 
>  Thanks for your reply, Dawid.
> 
>  I understand that the approach I've tried out is not generic enough,
> and
>  would need a lot more thought to be put into w.r.t parallelism
>  considerations, out of order events, effects on downstream operators
> >>> etc.
>  The intention was to do a quick implementation to check the
> feasibility
> >>> of
>  the approach.
> 
> >> It will also not sort the events etc.
> 
>  In the application code to test this approach, I had used a Global
> >>> window
>  to sort events based on their timestamp (similar to how out of order
> >>> events
>  are dropped based on a time-bound, I'm dropping them based on a count
> >>> based
>  bound).
> 
>  allEvents = allEvents
>    .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>    .window(GlobalWindows.create())
>    .trigger(new GlobalWindowCountTrigger(
> >>> propLoader.getSortWindowSize()))
>    .process(new SortWindowProcessFunction())
>    .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>    .assignTimestampsAndWatermarks(new TimestampsExtractor())
>    .uid(Constants.TS_EX_UID);
>  PatternLoader
>    .applyPatterns(allEvents, propLoader.getPatternClassNames())
>    .addSink(createKafkaSink(kafkaProps))
>    .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
> 
> 
> >> If in the getCurrentWatermark method of your
>  AssignerWithPeriodicWatermarks you will just return
> >> new Watermark(System.currentTimeMillis()), you will get the same
>  behaviour as with that change,
> >> am I right?
> 
>  If watermarks are generated based on the machine time, the major
> issue I
>  see is that we will not be able to leverage Event Time functionality.
>  Specifically, if I have patterns which look for the absence of an
> Event
> >>> for
>  a fixed period of time.

Re: [Proposal] CEP library changes - review request

2018-04-03 Thread Kostas Kloudas
Hi Shailesh,

Your solution may fit your use case, but as Dawid mentioned earlier, it makes a 
lot of 
assumptions about the input. 

From a look at your PoC:
1) You assume no late data (you do not drop anything) and no out-of-orderness.
2) You mix the two notions of time (event and processing).
3) You eagerly process each element, which can have performance implications,
especially if you go for the RocksDB backend.

Given the above, I do not think that this can go in Flink.
 
Something that goes into Flink will have to be maintained by the community.
So, although some use cases may have particular needs, we refrain from adding
code to master that makes assumptions tailored to specific use cases.

I understand that the one watermark per key could conceptually fit better in 
your use case, 
but there may be a better way to achieve your goal, one that aligns with 
Flink’s offered 
semantics.
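
One way to stay within Flink's offered semantics while still tolerating
delayed events is the bounded-out-of-orderness watermarking Flink ships
(`BoundedOutOfOrdernessTimestampExtractor` in Flink 1.x). The core rule is
reproduced below in plain Java purely for illustration; in a real job you
would extend the Flink class rather than reimplement it.

```java
// Plain-Java sketch of bounded-out-of-orderness watermarking: the watermark
// lags the highest event timestamp seen so far by a fixed allowance, so
// elements arriving up to that much out of order are still handled in event
// time, and anything older counts as late. Flink's real class also guards the
// "no elements seen yet" case, which this sketch glosses over.
public class BoundedOutOfOrderness {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrderness(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called per element: track the highest timestamp seen so far.
    public long extractTimestamp(long elementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // Watermark = highest seen timestamp minus the allowed out-of-orderness.
    // Only meaningful after the first element has been extracted.
    public long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness wm = new BoundedOutOfOrderness(10);
        wm.extractTimestamp(100);
        wm.extractTimestamp(95); // out of order, but not below the watermark
        System.out.println(wm.currentWatermark()); // 90
    }
}
```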

Thanks, 
Kostas

> On Apr 3, 2018, at 11:01 AM, Shailesh Jain  
> wrote:
> 
> Bump.
> 
> On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain 
> wrote:
> 
>> To trigger the computations for each batch, I'll have to use the
>> processing time timer in the abstract keyed cep operator, right?
>> 
>> The reason why I'm avoiding the watermarks is that it is not possible to
>> generate watermarks per key.
>> 
>> Thanks for the 'within' remark.
>> 
>> A couple of questions:
>> 
>> 1. Given our use case and the limitations of per key watermark, do you
>> think that this approach is worth adding to the library?
>> 
>> 2. What other aspects of the framework do I need to consider/test before
>> we go about implementing this approach formally?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, 
>> wrote:
>> 
>>> If you do the buffering, you can emit a watermark for each such batch
>>> (equal to the highest timestamp in that batch). This way you won’t need
>>> to sort; the CEP library will do it for you.
>>> The within clause will work in EventTime then.
>>> 
>>> One more remark: the within clause always applies to the whole pattern,
>>> not just to a part of it; it does not matter whether you apply it in the
>>> middle (as you did) or at the very end.
>>> 
>>> Best,
>>> Dawid
>>> 
 On 19 Mar 2018, at 11:31, Shailesh Jain 
>>> wrote:
 
 Thanks for your reply, Dawid.
 
 I understand that the approach I've tried out is not generic enough, and
 would need a lot more thought to be put into w.r.t parallelism
 considerations, out of order events, effects on downstream operators
>>> etc.
 The intention was to do a quick implementation to check the feasibility
>>> of
 the approach.
 
>> It will also not sort the events etc.
 
 In the application code to test this approach, I had used a Global
>>> window
 to sort events based on their timestamp (similar to how out of order
>>> events
 are dropped based on a time-bound, I'm dropping them based on a count
>>> based
 bound).
 
 allEvents = allEvents
   .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
   .window(GlobalWindows.create())
   .trigger(new GlobalWindowCountTrigger(
>>> propLoader.getSortWindowSize()))
   .process(new SortWindowProcessFunction())
   .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
   .assignTimestampsAndWatermarks(new TimestampsExtractor())
   .uid(Constants.TS_EX_UID);
 PatternLoader
   .applyPatterns(allEvents, propLoader.getPatternClassNames())
   .addSink(createKafkaSink(kafkaProps))
   .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
 
 
>> If in the getCurrentWatermark method of your
 AssignerWithPeriodicWatermarks you will just return
>> new Watermark(System.currentTimeMillis()), you will get the same
 behaviour as with that change,
>> am I right?
 
 If watermarks are generated based on the machine time, the major issue I
 see is that we will not be able to leverage Event Time functionality.
 Specifically, if I have patterns which look for the absence of an Event
>>> for
 a fixed period of time.
 
 For eg. We have many such patterns:
 
 Pattern pattern = Pattern.begin
   (UNDER_CHILLED_ALERT, AfterMatchSkipStrategy
   .skipPastLastEvent())
   .where(Conditions.getUnderchilledCondition())
   .notFollowedBy(COMPRESSOR_ON)
   .where(Conditions.getCompressorOnCondition())
   .within(Time.minutes(30))
   .followedBy(HIGH_TEMP)
   .where(Conditions.getHighTemperatureCondition());
 
 Now when there are network issues (which are very frequent), queued
>>> events
 are delivered together, and such patterns will not be matched correctly
>>> as
 pruning of events from NFA's buffer will not be done 

Re: [Proposal] CEP library changes - review request

2018-04-03 Thread Shailesh Jain
Bump.

On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain 
wrote:

> To trigger the computations for each batch, I'll have to use the
> processing time timer in the abstract keyed cep operator, right?
>
> The reason why I'm avoiding the watermarks is that it is not possible to
> generate watermarks per key.
>
> Thanks for the 'within' remark.
>
> A couple of questions:
>
> 1. Given our use case and the limitations of per key watermark, do you
> think that this approach is worth adding to the library?
>
> 2. What other aspects of the framework do I need to consider/test before
> we go about implementing this approach formally?
>
> Thanks,
> Shailesh
>
>
> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, 
> wrote:
>
>> If you do the buffering, you can emit a watermark for each such batch
>> (equal to the highest timestamp in that batch). This way you won’t need to
>> sort; the CEP library will do it for you.
>> The within clause will work in EventTime then.
>>
>> One more remark: the within clause always applies to the whole pattern,
>> not just to a part of it; it does not matter whether you apply it in the
>> middle (as you did) or at the very end.
>>
>> Best,
>> Dawid
>>
>> > On 19 Mar 2018, at 11:31, Shailesh Jain 
>> wrote:
>> >
>> > Thanks for your reply, Dawid.
>> >
>> > I understand that the approach I've tried out is not generic enough, and
>> > would need a lot more thought to be put into w.r.t parallelism
>> > considerations, out of order events, effects on downstream operators
>> etc.
>> > The intention was to do a quick implementation to check the feasibility
>> of
>> > the approach.
>> >
>> >>> It will also not sort the events etc.
>> >
>> > In the application code to test this approach, I had used a Global
>> window
>> > to sort events based on their timestamp (similar to how out of order
>> events
>> > are dropped based on a time-bound, I'm dropping them based on a count
>> based
>> > bound).
>> >
>> > allEvents = allEvents
>> >.keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>> >.window(GlobalWindows.create())
>> >.trigger(new GlobalWindowCountTrigger(
>> propLoader.getSortWindowSize()))
>> >.process(new SortWindowProcessFunction())
>> >.keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>> >.assignTimestampsAndWatermarks(new TimestampsExtractor())
>> >.uid(Constants.TS_EX_UID);
>> > PatternLoader
>> >.applyPatterns(allEvents, propLoader.getPatternClassNames())
>> >.addSink(createKafkaSink(kafkaProps))
>> >.uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
>> >
>> >
>> >>> If in the getCurrentWatermark method of your
>> > AssignerWithPeriodicWatermarks you will just return
>> >>> new Watermark(System.currentTimeMillis()), you will get the same
>> > behaviour as with that change,
>> >>> am I right?
>> >
>> > If watermarks are generated based on the machine time, the major issue I
>> > see is that we will not be able to leverage Event Time functionality.
>> > Specifically, if I have patterns which look for the absence of an Event
>> for
>> > a fixed period of time.
>> >
>> > For eg. We have many such patterns:
>> >
>> > Pattern pattern = Pattern.begin
>> >(UNDER_CHILLED_ALERT, AfterMatchSkipStrategy
>> >.skipPastLastEvent())
>> >.where(Conditions.getUnderchilledCondition())
>> >.notFollowedBy(COMPRESSOR_ON)
>> >.where(Conditions.getCompressorOnCondition())
>> >.within(Time.minutes(30))
>> >.followedBy(HIGH_TEMP)
>> >.where(Conditions.getHighTemperatureCondition());
>> >
>> > Now when there are network issues (which are very frequent), queued
>> events
>> > are delivered together, and such patterns will not be matched correctly
>> as
>> > pruning of events from NFA's buffer will not be done based on the
>> timestamp
>> > within the event, but on the watermark received by the operator.
>> >
>> > Is my understanding here correct?
>> >
>> > Thanks,
>> > Shailesh
>> >
>> > On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
>> > wysakowicz.da...@gmail.com> wrote:
>> >
>> >> Hi Shailesh,
>> >>
>> >> Thanks for your interest in the CEP library and sorry for late
>> response. I
>> >> must say I am not a fan of this approach.
>> >> After this change, the Processing time is no longer a processing time,
>> >> plus it will work differently in any other place of Flink. It will
>> also not
>> >> sort the events etc.
>> >> Moreover I think you could achieve pretty similar solution if you
>> generate
>> >> your watermark based on the machine time. If in the getCurrentWatermark
>> >> method
>> >> of your AssignerWithPeriodicWatermarks you will just return new
>> >> Watermark(System.currentTimeMillis()), you will get the same
>> behaviour as
>> >> with that change, am I right?
>> >>
>> >> Best,
>> >> Dawid
>> >>
>> >>> On 18 Mar 2018, at 09:00, Shailesh Jain 

Re: [Proposal] CEP library changes - review request

2018-03-22 Thread Shailesh Jain
To trigger the computations for each batch, I'll have to use the processing
time timer in the abstract keyed cep operator, right?
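
The "trigger each batch via a processing-time timer" idea can be sketched in
miniature. Flink's keyed CEP operator gets timers from its internal timer
service; the class below only simulates that contract in plain Java (all
names are stand-ins, not Flink's API), to show how due timers would flush
buffered batches into the NFA.

```java
import java.util.PriorityQueue;

// Minimal simulation of per-key processing-time timers: register a firing
// time, and when the (simulated) clock advances past it, the timer fires and
// the buffered batch would be processed. Stand-in for what the real operator
// does through Flink's InternalTimerService.
public class ProcessingTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private int batchesFlushed = 0;

    public void registerProcessingTimeTimer(long fireAt) {
        timers.add(fireAt);
    }

    // Fire all timers due at or before `now`; each firing stands in for
    // "flush the buffered events for this key into the NFA".
    public int advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek() <= now) {
            timers.poll();
            batchesFlushed++;
        }
        return batchesFlushed;
    }

    public static void main(String[] args) {
        ProcessingTimeTimerSketch op = new ProcessingTimeTimerSketch();
        op.registerProcessingTimeTimer(100);
        op.registerProcessingTimeTimer(200);
        System.out.println(op.advanceTo(150)); // 1: only the first timer is due
    }
}
```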

The reason why I'm avoiding the watermarks is that it is not possible to
generate watermarks per key.

Thanks for the 'within' remark.

A couple of questions:

1. Given our use case and the limitations of per-key watermarks, do you
think this approach is worth adding to the library?

2. What other aspects of the framework do I need to consider/test before we
go about implementing this approach formally?

Thanks,
Shailesh

On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, 
wrote:

> If you do the buffering, you can emit a watermark for each such batch
> (equal to the highest timestamp in that batch). This way you won’t need to
> sort; the CEP library will do it for you.
> The within clause will work in EventTime then.
>
> One more remark: the within clause always applies to the whole pattern,
> not just to a part of it; it does not matter whether you apply it in the
> middle (as you did) or at the very end.
>
> Best,
> Dawid
>
> > On 19 Mar 2018, at 11:31, Shailesh Jain 
> wrote:
> >
> > Thanks for your reply, Dawid.
> >
> > I understand that the approach I've tried out is not generic enough, and
> > would need a lot more thought to be put into w.r.t parallelism
> > considerations, out of order events, effects on downstream operators etc.
> > The intention was to do a quick implementation to check the feasibility
> of
> > the approach.
> >
> >>> It will also not sort the events etc.
> >
> > In the application code to test this approach, I had used a Global window
> > to sort events based on their timestamp (similar to how out of order
> events
> > are dropped based on a time-bound, I'm dropping them based on a count
> based
> > bound).
> >
> > allEvents = allEvents
> >.keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >.window(GlobalWindows.create())
> >.trigger(new
> GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
> >.process(new SortWindowProcessFunction())
> >.keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >.assignTimestampsAndWatermarks(new TimestampsExtractor())
> >.uid(Constants.TS_EX_UID);
> > PatternLoader
> >.applyPatterns(allEvents, propLoader.getPatternClassNames())
> >.addSink(createKafkaSink(kafkaProps))
> >.uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
> >
> >
> >>> If in the getCurrentWatermark method of your
> > AssignerWithPeriodicWatermarks you will just return
> >>> new Watermark(System.currentTimeMillis()), you will get the same
> > behaviour as with that change,
> >>> am I right?
> >
> > If watermarks are generated based on the machine time, the major issue I
> > see is that we will not be able to leverage Event Time functionality.
> > Specifically, if I have patterns which look for the absence of an Event
> for
> > a fixed period of time.
> >
> > For eg. We have many such patterns:
> >
> > Pattern pattern = Pattern.begin
> >(UNDER_CHILLED_ALERT, AfterMatchSkipStrategy
> >.skipPastLastEvent())
> >.where(Conditions.getUnderchilledCondition())
> >.notFollowedBy(COMPRESSOR_ON)
> >.where(Conditions.getCompressorOnCondition())
> >.within(Time.minutes(30))
> >.followedBy(HIGH_TEMP)
> >.where(Conditions.getHighTemperatureCondition());
> >
> > Now when there are network issues (which are very frequent), queued
> events
> > are delivered together, and such patterns will not be matched correctly
> as
> > pruning of events from NFA's buffer will not be done based on the
> timestamp
> > within the event, but on the watermark received by the operator.
> >
> > Is my understanding here correct?
> >
> > Thanks,
> > Shailesh
> >
> > On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
> > wysakowicz.da...@gmail.com> wrote:
> >
> >> Hi Shailesh,
> >>
> >> Thanks for your interest in the CEP library and sorry for late
> response. I
> >> must say I am not a fan of this approach.
> >> After this change, the Processing time is no longer a processing time,
> >> plus it will work differently in any other place of Flink. It will also
> not
> >> sort the events etc.
> >> Moreover I think you could achieve pretty similar solution if you
> generate
> >> your watermark based on the machine time. If in the getCurrentWatermark
> >> method
> >> of your AssignerWithPeriodicWatermarks you will just return new
> >> Watermark(System.currentTimeMillis()), you will get the same behaviour
> as
> >> with that change, am I right?
> >>
> >> Best,
> >> Dawid
> >>
> >>> On 18 Mar 2018, at 09:00, Shailesh Jain 
> >> wrote:
> >>>
> >>> Thanks Aljoscha.
> >>>
> >>> Bump.
> >>>
> >>> I understand everyone would be busy with 1.5.0, but would really
> >> appreciate
> >>> slight help in unblocking us here.
> >>>
> >>> Thanks,
> >>> Shailesh
> >>>
> 

Re: [Proposal] CEP library changes - review request

2018-03-22 Thread Dawid Wysakowicz
If you do the buffering, you can emit a watermark for each such batch (equal
to the highest timestamp in that batch). This way you won’t need to sort; the
CEP library will do it for you.
The within clause will work in EventTime then.

One more remark: the within clause always applies to the whole pattern, not
just to a part of it; it does not matter whether you apply it in the middle
(as you did) or at the very end.
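
The per-batch watermark rule Dawid describes is simple enough to state in
code. This is an illustrative plain-Java sketch only; in a real Flink 1.x job
the rule would live in an AssignerWithPunctuatedWatermarks rather than a
free-standing helper like this.

```java
import java.util.Collections;
import java.util.List;

// After buffering a batch of events, emit a watermark equal to the highest
// event timestamp in that batch. Everything up to that watermark is then
// complete from CEP's point of view, so the operator can sort and prune
// correctly in event time without any extra sorting step upstream.
public class PerBatchWatermark {

    public static long watermarkFor(List<Long> batchTimestamps) {
        return Collections.max(batchTimestamps);
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(5L, 9L, 3L); // arrival order, unsorted
        System.out.println(watermarkFor(batch)); // 9
    }
}
```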

Best,
Dawid

> On 19 Mar 2018, at 11:31, Shailesh Jain  wrote:
> 
> Thanks for your reply, Dawid.
> 
> I understand that the approach I've tried out is not generic enough, and
> would need a lot more thought w.r.t. parallelism considerations,
> out-of-order events, effects on downstream operators, etc.
> The intention was to do a quick implementation to check the feasibility of
> the approach.
> 
>>> It will also not sort the events etc.
> 
> In the application code to test this approach, I had used a Global window
> to sort events based on their timestamp (similar to how out of order events
> are dropped based on a time-bound, I'm dropping them based on a count based
> bound).
> 
> allEvents = allEvents
>     .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>     .window(GlobalWindows.create())
>     .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
>     .process(new SortWindowProcessFunction())
>     .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>     .assignTimestampsAndWatermarks(new TimestampsExtractor())
>     .uid(Constants.TS_EX_UID);
> PatternLoader
>     .applyPatterns(allEvents, propLoader.getPatternClassNames())
>     .addSink(createKafkaSink(kafkaProps))
>     .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
> 
> 
>>> If in the getCurrentWatermark method of your
> AssignerWithPeriodicWatermarks you will just return
>>> new Watermark(System.currentTimeMillis()), you will get the same
> behaviour as with that change,
>>> am I right?
> 
> If watermarks are generated based on the machine time, the major issue I
> see is that we will not be able to leverage Event Time functionality.
> Specifically, if I have patterns which look for the absence of an Event for
> a fixed period of time.
> 
> For eg. We have many such patterns:
> 
> Pattern pattern = Pattern
>     .begin(UNDER_CHILLED_ALERT, AfterMatchSkipStrategy.skipPastLastEvent())
>     .where(Conditions.getUnderchilledCondition())
>     .notFollowedBy(COMPRESSOR_ON)
>     .where(Conditions.getCompressorOnCondition())
>     .within(Time.minutes(30))
>     .followedBy(HIGH_TEMP)
>     .where(Conditions.getHighTemperatureCondition());
> 
> Now when there are network issues (which are very frequent), queued events
> are delivered together, and such patterns will not be matched correctly as
> pruning of events from NFA's buffer will not be done based on the timestamp
> within the event, but on the watermark received by the operator.
> 
> Is my understanding here correct?
> 
> Thanks,
> Shailesh
> 
> On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
> 
>> Hi Shailesh,
>> 
>> Thanks for your interest in the CEP library and sorry for late response. I
>> must say I am not a fan of this approach.
>> After this change, the Processing time is no longer a processing time,
>> plus it will work differently in any other place of Flink. It will also not
>> sort the events etc.
>> Moreover I think you could achieve pretty similar solution if you generate
>> your watermark based on the machine time. If in the getCurrentWatermark
>> method
>> of your AssignerWithPeriodicWatermarks you will just return new
>> Watermark(System.currentTimeMillis()), you will get the same behaviour as
>> with that change, am I right?
>> 
>> Best,
>> Dawid
>> 
>>> On 18 Mar 2018, at 09:00, Shailesh Jain 
>> wrote:
>>> 
>>> Thanks Aljoscha.
>>> 
>>> Bump.
>>> 
>>> I understand everyone would be busy with 1.5.0, but would really
>> appreciate
>>> slight help in unblocking us here.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek 
>>> wrote:
>>> 
 Hi,
 
 I think this should have been sent to the dev mailing list because in
>> the
 user mailing list it might disappear among a lot of other mail.
 
 Forwarding...
 
 Best,
 Aljoscha
 
> On 14. Mar 2018, at 06:20, Shailesh Jain 
 wrote:
> 
> Hi,
> 
> We've been facing issues* w.r.t watermarks not supported per key, which
 led us to:
> 
> Either (a) run the job in Processing time for a KeyedStream ->
 compromising on use cases which revolve around catching time-based
>> patterns
> or (b) run the job in Event time for multiple data streams (one data
 stream per key) -> this is not scalable as the number of operators grow
 linearly with the 

Re: [Proposal] CEP library changes - review request

2018-03-19 Thread Dawid Wysakowicz
Hi Shailesh,

Thanks for your interest in the CEP library, and sorry for the late response.
I must say I am not a fan of this approach.
After this change, processing time is no longer really processing time, and
it will behave differently from everywhere else in Flink. It will also not
sort the events, etc.
Moreover, I think you could achieve a pretty similar solution if you generate
your watermark based on the machine time. If, in the getCurrentWatermark
method of your AssignerWithPeriodicWatermarks, you just return new
Watermark(System.currentTimeMillis()), you will get the same behaviour as
with that change, am I right?
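
The assigner Dawid describes, event timestamps paired with a wall-clock
watermark, can be sketched as below. This is a plain-Java illustration that
only mimics the shape of Flink 1.x's AssignerWithPeriodicWatermarks; the
Event type and method signatures are stand-ins, and in a real job you would
implement the Flink interface itself.

```java
// Timestamps still come from the events, but the watermark simply tracks the
// machine clock, which is why this behaves like the processing-time-driven
// change in the proposed branch.
public class WallClockWatermarkAssigner {

    public static final class Event {
        public final long timestamp;
        public Event(long timestamp) { this.timestamp = timestamp; }
    }

    // Event time is taken from the element itself...
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        return element.timestamp;
    }

    // ...but the watermark advances with the machine clock, regardless of
    // what timestamps the elements carry.
    public long getCurrentWatermark() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        WallClockWatermarkAssigner a = new WallClockWatermarkAssigner();
        System.out.println(a.extractTimestamp(new Event(42L), -1L)); // 42
    }
}
```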

Best,
Dawid

> On 18 Mar 2018, at 09:00, Shailesh Jain  wrote:
> 
> Thanks Aljoscha.
> 
> Bump.
> 
> I understand everyone would be busy with 1.5.0, but would really appreciate
> slight help in unblocking us here.
> 
> Thanks,
> Shailesh
> 
> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek 
> wrote:
> 
>> Hi,
>> 
>> I think this should have been sent to the dev mailing list because in the
>> user mailing list it might disappear among a lot of other mail.
>> 
>> Forwarding...
>> 
>> Best,
>> Aljoscha
>> 
>>> On 14. Mar 2018, at 06:20, Shailesh Jain 
>> wrote:
>>> 
>>> Hi,
>>> 
>>> We've been facing issues* w.r.t watermarks not supported per key, which
>> led us to:
>>> 
>>> Either (a) run the job in Processing time for a KeyedStream ->
>> compromising on use cases which revolve around catching time-based patterns
>>> or (b) run the job in Event time for multiple data streams (one data
>> stream per key) -> this is not scalable as the number of operators grow
>> linearly with the number of keys
>>> 
>>> To address this, we've done a quick (poc) change in the
>> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based
>> on timestamps extracted from the events arriving into the operator (and not
>> from the watermarks). We've tested it against our usecase and are seeing a
>> significant improvement in memory usage without compromising on the
>> watermark functionality.
>>> 
>>> It'll be really helpful if someone from the cep dev group can take a
>> look at this branch - https://github.com/jainshailesh/flink/commits/cep_changes
>> and provide comments on the approach taken, and maybe guide us on the next
>> steps for taking it forward.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> * Links to previous email threads related to the same issue:
> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
>>> 
>> 
>> 





Re: [Proposal] CEP library changes - review request

2018-03-18 Thread Shailesh Jain
Thanks Aljoscha.

Bump.

I understand everyone would be busy with 1.5.0, but would really appreciate
slight help in unblocking us here.

Thanks,
Shailesh

On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I think this should have been sent to the dev mailing list because in the
> user mailing list it might disappear among a lot of other mail.
>
> Forwarding...
>
> Best,
> Aljoscha
>
> > On 14. Mar 2018, at 06:20, Shailesh Jain 
> wrote:
> >
> > Hi,
> >
> > We've been facing issues* w.r.t watermarks not supported per key, which
> led us to:
> >
> > Either (a) run the job in Processing time for a KeyedStream ->
> compromising on use cases which revolve around catching time-based patterns
> > or (b) run the job in Event time for multiple data streams (one data
> stream per key) -> this is not scalable as the number of operators grow
> linearly with the number of keys
> >
> > To address this, we've done a quick (poc) change in the
> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based
> on timestamps extracted from the events arriving into the operator (and not
> from the watermarks). We've tested it against our usecase and are seeing a
> significant improvement in memory usage without compromising on the
> watermark functionality.
> >
> > It'll be really helpful if someone from the cep dev group can take a
> look at this branch - https://github.com/jainshailesh/flink/commits/cep_changes
> and provide comments on the approach taken, and maybe guide us on the next
> steps for taking it forward.
> >
> > Thanks,
> > Shailesh
> >
> > * Links to previous email threads related to the same issue:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
> >
>
>


Re: [Proposal] CEP library changes - review request

2018-03-14 Thread Aljoscha Krettek
Hi,

I think this should have been sent to the dev mailing list because in the user 
mailing list it might disappear among a lot of other mail.

Forwarding...

Best,
Aljoscha

> On 14. Mar 2018, at 06:20, Shailesh Jain  wrote:
> 
> Hi,
> 
> We've been facing issues* w.r.t watermarks not supported per key, which led 
> us to:
> 
> Either (a) run the job in Processing time for a KeyedStream -> compromising 
> on use cases which revolve around catching time-based patterns
> or (b) run the job in Event time for multiple data streams (one data stream 
> per key) -> this is not scalable as the number of operators grow linearly 
> with the number of keys
> 
> To address this, we've done a quick (poc) change in the 
> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based on 
> timestamps extracted from the events arriving into the operator (and not from 
> the watermarks). We've tested it against our usecase and are seeing a 
> significant improvement in memory usage without compromising on the watermark 
> functionality.
> 
> It'll be really helpful if someone from the cep dev group can take a look at 
> this branch - https://github.com/jainshailesh/flink/commits/cep_changes 
>  and provide 
> comments on the approach taken, and maybe guide us on the next steps for 
> taking it forward. 
> 
> Thanks,
> Shailesh
> 
> * Links to previous email threads related to the same issue:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
>
>