Re: Flink cep checkpoint size

2021-07-08 Thread Li Jim
Hi Dawid,
Thanks for replying; happy to know you are working on this.

On 2021/07/08 12:14:21, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Sorry for the late reply.
> 
> Indeed, I found a couple of problems with clearing the state for
> short-lived keys. I created a JIRA issue[1] to track them and opened a PR
> (which needs test coverage before it can be merged) with fixes for those.
> 
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-23314
> 
> On 06/07/2021 09:11, Li Jim wrote:
> > Hi Mohit,
> >
> > Have you figured out any solutions to this problem?
> >
> > I am now facing exactly the same problem.
> >
> > I was using Flink 1.12.0, and I also upgraded to 1.13.1, but the
> > checkpoint size is still growing.
> >
> > On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
> >> Hi,
> >>
> >> I am facing an issue with the CEP operator where the checkpoint size keeps
> >> increasing even though the pattern is fully matched. I have a stream with
> >> unique user ids, and I want to detect a pattern of products purchased by
> >> each user.
> >>
> >> Here is the sample stream data:
> >>
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> >> …..
> >> …..
> >>
> >> StreamExecutionEnvironment env =
> >>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> env.setParallelism(1);
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "localhost:9092");
> >> properties.setProperty("group.id", "cep");
> >> DataStream<orders> stream = env.addSource(
> >>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
> >>     .map(json -> gson.fromJson(json, orders.class))
> >>     .assignTimestampsAndWatermarks(
> >>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
> >>             .withTimestampAssigner((order, timestamp) -> order.ts));
> >>
> >> Pattern<orders, ?> pattern = Pattern.<orders>begin(
> >>         "start",
> >>         AfterMatchSkipStrategy.skipPastLastEvent())
> >>     .where(new SimpleCondition<orders>() {
> >>         @Override
> >>         public boolean filter(orders value) throws Exception {
> >>             return value.product.equals("product1");
> >>         }
> >>     }).times(1)
> >>     .followedBy("middle").where(new SimpleCondition<orders>() {
> >>         @Override
> >>         public boolean filter(orders value) throws Exception {
> >>             return value.product.equals("product2");
> >>         }
> >>     }).oneOrMore()
> >>     .until(new SimpleCondition<orders>() {
> >>         @Override
> >>         public boolean filter(orders value) throws Exception {
> >>             return value.product.equals("product3");
> >>         }
> >>     })
> >>     .within(Time.seconds(10));
> >>
> >> PatternStream<orders> patternStream = CEP.pattern(
> >>     stream.keyBy((KeySelector<orders, String>) order -> order.user_id), pattern);
> >> DataStream<String> alerts = patternStream.select(
> >>     (PatternSelectFunction<orders, String>) matches ->
> >>         matches.get("start").get(0).user_id + "->" + matches.get("middle").get(0).ts);
> >> alerts.print();
> >>
> >>
> >> I have also attached the checkpoint file.
> >>
> >> It looks like the NFA state keeps track of all keys ever seen, along with
> >> the start state, and that leads to an increase in checkpoint size when keys
> >> are not reused across patterns. So if I have a fixed number of keys, the
> >> size does not increase. Is this the expected behavior, and is my
> >> understanding correct?
> >> Is there a way to drop these keys once the pattern is matched, or am I
> >> missing something here?

Re: Flink cep checkpoint size

2021-07-08 Thread Dawid Wysakowicz
Hi,

Sorry for the late reply.

Indeed, I found a couple of problems with clearing the state for
short-lived keys. I created a JIRA issue[1] to track them and opened a PR
(which needs test coverage before it can be merged) with fixes for those.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-23314

On 06/07/2021 09:11, Li Jim wrote:
> Hi Mohit,
>
> Have you figured out any solutions to this problem?
>
> I am now facing exactly the same problem.
>
> I was using Flink 1.12.0, and I also upgraded to 1.13.1, but the
> checkpoint size is still growing.
>
> On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
>> Hi,
>>
>> I am facing an issue with the CEP operator where the checkpoint size keeps
>> increasing even though the pattern is fully matched. I have a stream with
>> unique user ids, and I want to detect a pattern of products purchased by
>> each user.
>>
>> Here is the sample stream data:
>>
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
>> …..
>> …..
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "cep");
>> DataStream<orders> stream = env.addSource(
>>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>>     .map(json -> gson.fromJson(json, orders.class))
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>>             .withTimestampAssigner((order, timestamp) -> order.ts));
>>
>> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>>         "start",
>>         AfterMatchSkipStrategy.skipPastLastEvent())
>>     .where(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product1");
>>         }
>>     }).times(1)
>>     .followedBy("middle").where(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product2");
>>         }
>>     }).oneOrMore()
>>     .until(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product3");
>>         }
>>     })
>>     .within(Time.seconds(10));
>>
>> PatternStream<orders> patternStream = CEP.pattern(
>>     stream.keyBy((KeySelector<orders, String>) order -> order.user_id), pattern);
>> DataStream<String> alerts = patternStream.select(
>>     (PatternSelectFunction<orders, String>) matches ->
>>         matches.get("start").get(0).user_id + "->" + matches.get("middle").get(0).ts);
>> alerts.print();
>>
>>
>> I have also attached the checkpoint file.
>>
>> It looks like the NFA state keeps track of all keys ever seen, along with
>> the start state, and that leads to an increase in checkpoint size when keys
>> are not reused across patterns. So if I have a fixed number of keys, the
>> size does not increase. Is this the expected behavior, and is my
>> understanding correct?
>> Is there a way to drop these keys once the pattern is matched, or am I
>> missing something here?
>>
>> Thanks,
>> Mohit
>>





Re: Flink CEP checkpoint size

2021-07-07 Thread Aeden Jameson
We did look into fixing it ourselves, but decided that migrating to the
DataStream API, not using CEP, was more fruitful overall for us.
Unfortunately, I don't have a good answer for you. From a non-contributor's
standpoint the bug appears to sit pretty deep in the codebase, but the
authors are the best ones to speak to this.
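
To sketch what that migration can look like (a minimal, illustrative
version only, not our production code; it borrows the orders class, its
fields, and the 10-second window from Mohit's example elsewhere in this
thread): a KeyedProcessFunction tracks the match by hand and clears its own
state as soon as the pattern completes or the window expires, so
short-lived keys leave nothing behind in checkpoints.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PurchasePatternFn extends KeyedProcessFunction<String, orders, String> {

    private transient ValueState<Long> windowStart; // ts of the "product1" event
    private transient ListState<orders> middle;     // buffered "product2" events

    @Override
    public void open(Configuration parameters) {
        windowStart = getRuntimeContext().getState(
                new ValueStateDescriptor<>("windowStart", Long.class));
        middle = getRuntimeContext().getListState(
                new ListStateDescriptor<>("middle", orders.class));
    }

    @Override
    public void processElement(orders value, Context ctx, Collector<String> out)
            throws Exception {
        if (value.product.equals("product1")) {
            clear(ctx);                               // a new "start" restarts the match
            windowStart.update(value.ts);
            ctx.timerService().registerEventTimeTimer(value.ts + 10_000L);
        } else if (windowStart.value() != null && value.product.equals("product2")) {
            middle.add(value);
        } else if (windowStart.value() != null && value.product.equals("product3")) {
            // Emit like the PatternSelectFunction in that example:
            // the key's user_id plus the first "middle" event's ts.
            Iterable<orders> buffered = middle.get();
            if (buffered != null) {
                for (orders m : buffered) {
                    out.collect(value.user_id + "->" + m.ts);
                    break;
                }
            }
            clear(ctx);                               // pattern finished: drop all state for this key
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        clear(ctx);                                   // window expired: drop the partial match
    }

    private void clear(Context ctx) throws Exception {
        Long start = windowStart.value();
        if (start != null) {
            ctx.timerService().deleteEventTimeTimer(start + 10_000L);
        }
        windowStart.clear();
        middle.clear();
    }
}

It would replace the CEP.pattern(...) call, wired in roughly as
stream.keyBy(o -> o.user_id).process(new PurchasePatternFn()).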

On Tue, Jul 6, 2021 at 7:01 PM Li Jim  wrote:
>
> Hi Jameson,
> Thanks very much for replying; I am really struggling with this.
> I am using flowId as my key, which means each key is matched once and never
> used again.
> This seems like scenario 2. I didn't know it was not fixed yet.
> Thank you again, and do you have any solutions?
>
> On 2021/07/07 01:47:00, Aeden Jameson  wrote:
> > Hi Li,
> >
> > How big is your keyspace? I had a similar problem, which turned out to
> > be scenario 2 in this issue:
> > https://issues.apache.org/jira/browse/FLINK-19970. Looks like the bug
> > in scenario 1 got fixed but scenario 2 did not. There's more detail in
> > this thread:
> > http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
> > Hope that helps.
> >
> > On Tue, Jul 6, 2021 at 12:44 AM Li Jim  wrote:
> > >
> > > I am using Flink CEP to do some performance tests.
> > >
> > > Flink version 1.13.1.
> > >
> > > Below is the SQL:
> > >
> > > INSERT INTO to_kafka
> > > SELECT bizName, wdName, wdValue, zbValue, flowId FROM kafka_source
> > > MATCH_RECOGNIZE
> > > (
> > >     PARTITION BY flow_id
> > >     ORDER BY proctime
> > >     MEASURES
> > >         A.biz_name AS bizName,
> > >         A.wd_name AS wdName,
> > >         A.wd_value AS wdValue,
> > >         MAP[
> > >             A.zb_name, A.zb_value,
> > >             B.zb_name, B.zb_value
> > >         ] AS zbValue,
> > >         A.flow_id AS flowId
> > >     ONE ROW PER MATCH
> > >     AFTER MATCH SKIP PAST LAST ROW
> > >     PATTERN (A B) WITHIN INTERVAL '10' SECOND
> > >     DEFINE
> > >         B AS B.flow_id = A.flow_id
> > > );
> > >
> > > I added the WITHIN clause to keep the state from growing, but it does not
> > > work at all.
> > >
> > > The checkpoint size is growing fast.
> > >
> > > I am using RocksDB incremental checkpointing.
> > >
> > >
> >
> >
> > --
> > Cheers,
> > Aeden
> >


Re: Flink CEP checkpoint size

2021-07-06 Thread Li Jim
Hi Jameson,
Thanks very much for replying; I am really struggling with this.
I am using flowId as my key, which means each key is matched once and never
used again.
This seems like scenario 2. I didn't know it was not fixed yet.
Thank you again, and do you have any solutions?

On 2021/07/07 01:47:00, Aeden Jameson  wrote: 
> Hi Li,
> 
> How big is your keyspace? I had a similar problem, which turned out to
> be scenario 2 in this issue:
> https://issues.apache.org/jira/browse/FLINK-19970. Looks like the bug
> in scenario 1 got fixed but scenario 2 did not. There's more detail in
> this thread:
> http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
> Hope that helps.
> 
> On Tue, Jul 6, 2021 at 12:44 AM Li Jim  wrote:
> >
> > I am using Flink CEP to do some performance tests.
> >
> > Flink version 1.13.1.
> >
> > Below is the SQL:
> >
> > INSERT INTO to_kafka
> > SELECT bizName, wdName, wdValue, zbValue, flowId FROM kafka_source
> > MATCH_RECOGNIZE
> > (
> >     PARTITION BY flow_id
> >     ORDER BY proctime
> >     MEASURES
> >         A.biz_name AS bizName,
> >         A.wd_name AS wdName,
> >         A.wd_value AS wdValue,
> >         MAP[
> >             A.zb_name, A.zb_value,
> >             B.zb_name, B.zb_value
> >         ] AS zbValue,
> >         A.flow_id AS flowId
> >     ONE ROW PER MATCH
> >     AFTER MATCH SKIP PAST LAST ROW
> >     PATTERN (A B) WITHIN INTERVAL '10' SECOND
> >     DEFINE
> >         B AS B.flow_id = A.flow_id
> > );
> >
> > I added the WITHIN clause to keep the state from growing, but it does not
> > work at all.
> >
> > The checkpoint size is growing fast.
> >
> > I am using RocksDB incremental checkpointing.
> >
> >
> 
> 
> -- 
> Cheers,
> Aeden
> 


Re: Flink CEP checkpoint size

2021-07-06 Thread Aeden Jameson
Hi Li,

   How big is your keyspace? I had a similar problem, which turned out to
be scenario 2 in this issue:
https://issues.apache.org/jira/browse/FLINK-19970. Looks like the bug
in scenario 1 got fixed but scenario 2 did not. There's more detail in
this thread:
http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
Hope that helps.
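
One mitigation that may be worth testing against your SQL job is the Table
API's idle state retention, which asks the runtime to drop state for keys
(your flow_id values) after they have been inactive for a given duration.
A minimal sketch under assumptions, not a confirmed fix:

import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical wiring: tableEnv stands for whatever TableEnvironment runs
// the MATCH_RECOGNIZE query, and 30 minutes is an arbitrary example value.
// Idle state retention is documented for relational operators (joins,
// aggregates); whether it also reaches the CEP/NFA state behind
// MATCH_RECOGNIZE is exactly what FLINK-19970 puts in question, so verify
// against checkpoint sizes before relying on it.
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.getConfig().setIdleStateRetention(Duration.ofMinutes(30));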

On Tue, Jul 6, 2021 at 12:44 AM Li Jim  wrote:
>
> I am using Flink CEP to do some performance tests.
>
> Flink version 1.13.1.
>
> Below is the SQL:
>
> INSERT INTO to_kafka
> SELECT bizName, wdName, wdValue, zbValue, flowId FROM kafka_source
> MATCH_RECOGNIZE
> (
>     PARTITION BY flow_id
>     ORDER BY proctime
>     MEASURES
>         A.biz_name AS bizName,
>         A.wd_name AS wdName,
>         A.wd_value AS wdValue,
>         MAP[
>             A.zb_name, A.zb_value,
>             B.zb_name, B.zb_value
>         ] AS zbValue,
>         A.flow_id AS flowId
>     ONE ROW PER MATCH
>     AFTER MATCH SKIP PAST LAST ROW
>     PATTERN (A B) WITHIN INTERVAL '10' SECOND
>     DEFINE
>         B AS B.flow_id = A.flow_id
> );
>
> I added the WITHIN clause to keep the state from growing, but it does not
> work at all.
>
> The checkpoint size is growing fast.
>
> I am using RocksDB incremental checkpointing.
>
>


-- 
Cheers,
Aeden


Flink CEP checkpoint size

2021-07-06 Thread Li Jim
I am using Flink CEP to do some performance tests.

Flink version 1.13.1.

Below is the SQL:

INSERT INTO to_kafka
SELECT bizName, wdName, wdValue, zbValue, flowId FROM kafka_source
MATCH_RECOGNIZE
(
    PARTITION BY flow_id
    ORDER BY proctime
    MEASURES
        A.biz_name AS bizName,
        A.wd_name AS wdName,
        A.wd_value AS wdValue,
        MAP[
            A.zb_name, A.zb_value,
            B.zb_name, B.zb_value
        ] AS zbValue,
        A.flow_id AS flowId
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B) WITHIN INTERVAL '10' SECOND
    DEFINE
        B AS B.flow_id = A.flow_id
);

I added the WITHIN clause to keep the state from growing, but it does not
work at all.

The checkpoint size is growing fast.

I am using RocksDB incremental checkpointing.
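
For context, MATCH_RECOGNIZE in Flink SQL is executed on the same flink-cep
NFA machinery as the Java Pattern API, so both surfaces share its state
cleanup behavior. A rough Java sketch of the pattern above (FlowEvent is a
hypothetical class introduced only for illustration):

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// PARTITION BY flow_id corresponds to keyBy(e -> e.flow_id); under that
// partitioning the DEFINE condition B.flow_id = A.flow_id is always true,
// so B needs no extra condition in this sketch.
Pattern<FlowEvent, ?> pattern = Pattern
    .<FlowEvent>begin("A", AfterMatchSkipStrategy.skipPastLastEvent()) // AFTER MATCH SKIP PAST LAST ROW
    .next("B")                                                         // PATTERN (A B): strictly contiguous
    .within(Time.seconds(10));                                         // WITHIN INTERVAL '10' SECOND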




Re: Flink cep checkpoint size

2021-07-06 Thread Li Jim
Hi Mohit,

Have you figured out any solutions to this problem?

I am now facing exactly the same problem.

I was using Flink 1.12.0, and I also upgraded to 1.13.1, but the
checkpoint size is still growing.

On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
> Hi,
> 
> I am facing an issue with the CEP operator where the checkpoint size keeps
> increasing even though the pattern is fully matched. I have a stream with
> unique user ids, and I want to detect a pattern of products purchased by
> each user.
>
> Here is the sample stream data:
> 
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> …..
> …..
> 
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "cep");
> DataStream<orders> stream = env.addSource(
>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>     .map(json -> gson.fromJson(json, orders.class))
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>             .withTimestampAssigner((order, timestamp) -> order.ts));
>
> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>         "start",
>         AfterMatchSkipStrategy.skipPastLastEvent())
>     .where(new SimpleCondition<orders>() {
>         @Override
>         public boolean filter(orders value) throws Exception {
>             return value.product.equals("product1");
>         }
>     }).times(1)
>     .followedBy("middle").where(new SimpleCondition<orders>() {
>         @Override
>         public boolean filter(orders value) throws Exception {
>             return value.product.equals("product2");
>         }
>     }).oneOrMore()
>     .until(new SimpleCondition<orders>() {
>         @Override
>         public boolean filter(orders value) throws Exception {
>             return value.product.equals("product3");
>         }
>     })
>     .within(Time.seconds(10));
>
> PatternStream<orders> patternStream = CEP.pattern(
>     stream.keyBy((KeySelector<orders, String>) order -> order.user_id), pattern);
> DataStream<String> alerts = patternStream.select(
>     (PatternSelectFunction<orders, String>) matches ->
>         matches.get("start").get(0).user_id + "->" + matches.get("middle").get(0).ts);
> alerts.print();
> 
>
> I have also attached the checkpoint file.
> 
> It looks like the NFA state keeps track of all keys ever seen, along with
> the start state, and that leads to an increase in checkpoint size when keys
> are not reused across patterns. So if I have a fixed number of keys, the
> size does not increase. Is this the expected behavior, and is my
> understanding correct?
> Is there a way to drop these keys once the pattern is matched, or am I
> missing something here?
> 
> Thanks,
> Mohit
>