Re: Flink CEP checkpoint size
Hi, Dawid. Thanks for replying; happy to know you are working on this.

On 2021/07/08 12:14:21, Dawid Wysakowicz wrote:
> Hi,
>
> Sorry for the late reply.
>
> Indeed I found a couple of problems with clearing the state for
> short-lived keys. I created a JIRA issue [1] to track it and opened a PR
> (which needs test coverage before it can be merged) with fixes for those.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-23314
>
> On 06/07/2021 09:11, Li Jim wrote:
> > Hi, Mohit,
> >
> > Have you figured out any solutions to this problem?
> >
> > I am now facing exactly the same problem.
> >
> > I was using Flink 1.12.0 and also upgraded to 1.13.1, but the
> > checkpoint size is still growing.
> >
> > On 2021/06/02 15:45:59, "Singh, Mohit" wrote:
> >> Hi,
> >>
> >> I am facing an issue with the CEP operator where the checkpoint size
> >> keeps increasing even though the pattern is fully matched. I have a
> >> stream with unique user ids, and I want to detect a pattern of
> >> products purchased by a user.
> >>
> >> Here is the sample stream data:
> >>
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> >> ...
> >>
> >> StreamExecutionEnvironment env =
> >>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> env.setParallelism(1);
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "localhost:9092");
> >> properties.setProperty("group.id", "cep");
> >>
> >> DataStream<orders> stream = env.addSource(
> >>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
> >>     .map(json -> gson.fromJson(json, orders.class))
> >>     .assignTimestampsAndWatermarks(
> >>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
> >>             .withTimestampAssigner((order, timestamp) -> order.ts));
> >>
> >> Pattern<orders, ?> pattern = Pattern.<orders>begin(
> >>         "start", AfterMatchSkipStrategy.skipPastLastEvent())
> >>     .where(new SimpleCondition<orders>() {
> >>         @Override
> >>         public boolean filter(orders value) throws Exception {
> >>             return value.product.equals("product1");
> >>         }
> >>     }).times(1)
> >>     .followedBy("middle").where(new SimpleCondition<orders>() {
> >>         @Override
> >>         public boolean filter(orders value) throws Exception {
> >>             return value.product.equals("product2");
> >>         }
> >>     }).oneOrMore()
> >>     .until(new SimpleCondition<orders>() {
> >>         @Override
> >>         public boolean filter(orders value) throws Exception {
> >>             return value.product.equals("product3");
> >>         }
> >>     }).within(Time.seconds(10));
> >>
> >> PatternStream<orders> patternStream = CEP.pattern(
> >>     stream.keyBy((KeySelector<orders, String>) order -> order.user_id), pattern);
> >> DataStream<String> alerts = patternStream.select(
> >>     (PatternSelectFunction<orders, String>) matches ->
> >>         matches.get("start").get(0).user_id + "->" +
> >>         matches.get("middle").get(0).ts);
> >> alerts.print();
> >>
> >> [cid:image001.png@01D7579C.775FCA00]
> >>
> >> I have also attached the checkpoint file.
> >>
> >> It looks like the NFA state keeps track of all keys seen and their
> >> start state, and that leads to an increase in checkpoint size if keys
> >> are not reused in patterns. So if I have a fixed number of keys, the
> >> size does not increase. Is this the expected behavior and a correct
> >> understanding? Is there a way to drop these keys once the pattern is
> >> matched? Or am I missing something here?
> >>
> >> Thanks,
> >> Mohit
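A side note for readers: the snippet above deserializes into an orders class that never appears in the thread. A minimal POJO consistent with the JSON records and with the field accesses in the pattern code might look roughly like this; the field types are assumptions (e.g. ts as a long so the timestamp assigner compiles):

// Hypothetical reconstruction of the POJO used above. Field names mirror
// the JSON keys; Flink's POJO serializer needs public fields (or
// getters/setters) and a public no-arg constructor.
public class orders {
    public String user_id;  // unique per user; used as the CEP key
    public String product;  // "product1" / "product2" / "product3"
    public int bids;
    public long ts;         // event timestamp in epoch milliseconds

    public orders() {}      // required by Flink's POJO serializer
}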
Re: Flink CEP checkpoint size
Hi,

Sorry for the late reply.

Indeed I found a couple of problems with clearing the state for short-lived
keys. I created a JIRA issue [1] to track it and opened a PR (which needs
test coverage before it can be merged) with fixes for those.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-23314

On 06/07/2021 09:11, Li Jim wrote:
> Hi, Mohit,
>
> Have you figured out any solutions to this problem?
>
> I am now facing exactly the same problem.
>
> I was using Flink 1.12.0 and also upgraded to 1.13.1, but the checkpoint
> size is still growing.
>
> On 2021/06/02 15:45:59, "Singh, Mohit" wrote:
>> Hi,
>>
>> I am facing an issue with the CEP operator where the checkpoint size
>> keeps increasing even though the pattern is fully matched. I have a
>> stream with unique user ids, and I want to detect a pattern of products
>> purchased by a user.
>>
>> Here is the sample stream data:
>>
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
>> ...
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "cep");
>>
>> DataStream<orders> stream = env.addSource(
>>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>>     .map(json -> gson.fromJson(json, orders.class))
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>>             .withTimestampAssigner((order, timestamp) -> order.ts));
>>
>> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>>         "start", AfterMatchSkipStrategy.skipPastLastEvent())
>>     .where(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product1");
>>         }
>>     }).times(1)
>>     .followedBy("middle").where(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product2");
>>         }
>>     }).oneOrMore()
>>     .until(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product3");
>>         }
>>     }).within(Time.seconds(10));
>>
>> PatternStream<orders> patternStream = CEP.pattern(
>>     stream.keyBy((KeySelector<orders, String>) order -> order.user_id), pattern);
>> DataStream<String> alerts = patternStream.select(
>>     (PatternSelectFunction<orders, String>) matches ->
>>         matches.get("start").get(0).user_id + "->" +
>>         matches.get("middle").get(0).ts);
>> alerts.print();
>>
>> [cid:image001.png@01D7579C.775FCA00]
>>
>> I have also attached the checkpoint file.
>>
>> It looks like the NFA state keeps track of all keys seen and their start
>> state, and that leads to an increase in checkpoint size if keys are not
>> reused in patterns. So if I have a fixed number of keys, the size does
>> not increase. Is this the expected behavior and a correct understanding?
>> Is there a way to drop these keys once the pattern is matched? Or am I
>> missing something here?
>>
>> Thanks,
>> Mohit
Re: Flink CEP checkpoint size
We did look into fixing it ourselves, but decided that migrating to the
DataStream API, not using CEP, was more fruitful for us overall.
Unfortunately, I don't have a good answer for you. From a non-contributor's
standpoint the bug appears to sit pretty deep in the codebase, but the
authors are the best ones to speak to this.

On Tue, Jul 6, 2021 at 7:01 PM Li Jim wrote:
>
> Hi, Jameson,
>
> Thanks very much for replying; I am really struggling with this.
>
> I am using flowId as my key, which means each key is matched once and
> never used again. This seems like scenario 2. I didn't know it was not
> fixed yet.
>
> Thank you again. Do you have any solutions?
>
> On 2021/07/07 01:47:00, Aeden Jameson wrote:
> > Hi Li,
> >
> > How big is your keyspace? I had a similar problem which turned out to
> > be scenario 2 in this issue:
> > https://issues.apache.org/jira/browse/FLINK-19970. It looks like the
> > bug in scenario 1 got fixed but scenario 2 did not. There's more detail
> > in this thread:
> > http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
> > Hope that helps.
> >
> > On Tue, Jul 6, 2021 at 12:44 AM Li Jim wrote:
> > >
> > > I am using Flink CEP to do some performance tests.
> > >
> > > Flink version 1.13.1.
> > >
> > > Below is the SQL:
> > >
> > > INSERT INTO to_kafka
> > > SELECT bizName, wdName, wdValue, zbValue, flowId
> > > FROM kafka_source
> > > MATCH_RECOGNIZE (
> > >     PARTITION BY flow_id
> > >     ORDER BY proctime
> > >     MEASURES
> > >         A.biz_name AS bizName,
> > >         A.wd_name AS wdName,
> > >         A.wd_value AS wdValue,
> > >         MAP[
> > >             A.zb_name, A.zb_value,
> > >             B.zb_name, B.zb_value
> > >         ] AS zbValue,
> > >         A.flow_id AS flowId
> > >     ONE ROW PER MATCH
> > >     AFTER MATCH SKIP PAST LAST ROW
> > >     PATTERN (A B) WITHIN INTERVAL '10' SECOND
> > >     DEFINE
> > >         B AS B.flow_id = A.flow_id
> > > );
> > >
> > > I added the WITHIN clause to keep the state from growing, but it does
> > > not work at all. The checkpoint size is growing fast.
> > >
> > > I am using RocksDB incremental checkpoints.
> >
> > --
> > Cheers,
> > Aeden
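For readers wondering what such a migration might look like: the pattern from the original Java post (a product1 event, then one or more product2 events, terminated by product3, within 10 seconds) can be tracked by hand in a KeyedProcessFunction that clears its own state on a match or on a timer, so short-lived keys leave nothing behind in checkpoints. A rough sketch under those assumptions, not the poster's actual code (PurchasePatternFn is an invented name, and it reuses the orders POJO sketched earlier):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// CEP-free detector: product1, then product2(s), ended by product3,
// within 10 seconds of event time. State is cleared on match or timeout,
// so a key that has finished costs nothing in later checkpoints.
public class PurchasePatternFn extends KeyedProcessFunction<String, orders, String> {

    private transient ValueState<Long> startTs;     // ts of the product1 event
    private transient ValueState<Long> firstMiddle; // ts of the first product2 event

    @Override
    public void open(Configuration parameters) {
        startTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("startTs", Long.class));
        firstMiddle = getRuntimeContext().getState(
                new ValueStateDescriptor<>("firstMiddle", Long.class));
    }

    @Override
    public void processElement(orders value, Context ctx, Collector<String> out) throws Exception {
        if (value.product.equals("product1")) {
            startTs.update(value.ts);
            // Abandon this key if the sequence does not complete within 10s.
            ctx.timerService().registerEventTimeTimer(value.ts + 10_000L);
        } else if (value.product.equals("product2") && startTs.value() != null) {
            if (firstMiddle.value() == null) {
                firstMiddle.update(value.ts);
            }
        } else if (value.product.equals("product3") && firstMiddle.value() != null) {
            // Same output shape as the PatternSelectFunction in the thread.
            out.collect(ctx.getCurrentKey() + "->" + firstMiddle.value());
            clearAll();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        clearAll(); // timed out: drop everything held for this key
    }

    private void clearAll() {
        startTs.clear();
        firstMiddle.clear();
    }
}

Usage would be stream.keyBy(o -> o.user_id).process(new PurchasePatternFn()) in place of the CEP.pattern(...) call. It is less expressive than CEP, but the state lifecycle is entirely explicit.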
Re: Flink CEP checkpoint size
Hi, Jameson,

Thanks very much for replying; I am really struggling with this.

I am using flowId as my key, which means each key is matched once and never
used again. This seems like scenario 2. I didn't know it was not fixed yet.

Thank you again. Do you have any solutions?

On 2021/07/07 01:47:00, Aeden Jameson wrote:
> Hi Li,
>
> How big is your keyspace? I had a similar problem which turned out to be
> scenario 2 in this issue:
> https://issues.apache.org/jira/browse/FLINK-19970. It looks like the bug
> in scenario 1 got fixed but scenario 2 did not. There's more detail in
> this thread:
> http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
> Hope that helps.
>
> On Tue, Jul 6, 2021 at 12:44 AM Li Jim wrote:
> >
> > I am using Flink CEP to do some performance tests.
> >
> > Flink version 1.13.1.
> >
> > Below is the SQL:
> >
> > INSERT INTO to_kafka
> > SELECT bizName, wdName, wdValue, zbValue, flowId
> > FROM kafka_source
> > MATCH_RECOGNIZE (
> >     PARTITION BY flow_id
> >     ORDER BY proctime
> >     MEASURES
> >         A.biz_name AS bizName,
> >         A.wd_name AS wdName,
> >         A.wd_value AS wdValue,
> >         MAP[
> >             A.zb_name, A.zb_value,
> >             B.zb_name, B.zb_value
> >         ] AS zbValue,
> >         A.flow_id AS flowId
> >     ONE ROW PER MATCH
> >     AFTER MATCH SKIP PAST LAST ROW
> >     PATTERN (A B) WITHIN INTERVAL '10' SECOND
> >     DEFINE
> >         B AS B.flow_id = A.flow_id
> > );
> >
> > I added the WITHIN clause to keep the state from growing, but it does
> > not work at all. The checkpoint size is growing fast.
> >
> > I am using RocksDB incremental checkpoints.
>
> --
> Cheers,
> Aeden
Re: Flink CEP checkpoint size
Hi Li,

How big is your keyspace? I had a similar problem which turned out to be
scenario 2 in this issue:
https://issues.apache.org/jira/browse/FLINK-19970. It looks like the bug in
scenario 1 got fixed but scenario 2 did not. There's more detail in this
thread:
http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
Hope that helps.

On Tue, Jul 6, 2021 at 12:44 AM Li Jim wrote:
>
> I am using Flink CEP to do some performance tests.
>
> Flink version 1.13.1.
>
> Below is the SQL:
>
> INSERT INTO to_kafka
> SELECT bizName, wdName, wdValue, zbValue, flowId
> FROM kafka_source
> MATCH_RECOGNIZE (
>     PARTITION BY flow_id
>     ORDER BY proctime
>     MEASURES
>         A.biz_name AS bizName,
>         A.wd_name AS wdName,
>         A.wd_value AS wdValue,
>         MAP[
>             A.zb_name, A.zb_value,
>             B.zb_name, B.zb_value
>         ] AS zbValue,
>         A.flow_id AS flowId
>     ONE ROW PER MATCH
>     AFTER MATCH SKIP PAST LAST ROW
>     PATTERN (A B) WITHIN INTERVAL '10' SECOND
>     DEFINE
>         B AS B.flow_id = A.flow_id
> );
>
> I added the WITHIN clause to keep the state from growing, but it does not
> work at all. The checkpoint size is growing fast.
>
> I am using RocksDB incremental checkpoints.

--
Cheers,
Aeden
Flink CEP checkpoint size
I am using Flink CEP to do some performance tests.

Flink version 1.13.1.

Below is the SQL:

INSERT INTO to_kafka
SELECT bizName, wdName, wdValue, zbValue, flowId
FROM kafka_source
MATCH_RECOGNIZE (
    PARTITION BY flow_id
    ORDER BY proctime
    MEASURES
        A.biz_name AS bizName,
        A.wd_name AS wdName,
        A.wd_value AS wdValue,
        MAP[
            A.zb_name, A.zb_value,
            B.zb_name, B.zb_value
        ] AS zbValue,
        A.flow_id AS flowId
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B) WITHIN INTERVAL '10' SECOND
    DEFINE
        B AS B.flow_id = A.flow_id
);

I added the WITHIN clause to keep the state from growing, but it does not
work at all. The checkpoint size is growing fast.

I am using RocksDB incremental checkpoints.
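For context, "RocksDB incremental mode" in Flink 1.13 usually means either state.backend: rocksdb plus state.backend.incremental: true in flink-conf.yaml, or the programmatic equivalent sketched below (the path and interval are placeholders, not from the post). One caveat when reading the numbers: with incremental checkpoints the reported checkpoint data size is the delta against previous checkpoints, so steadily growing sizes still point at state that is never cleaned up rather than a reporting artifact.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 'true' enables incremental checkpoints: only new/changed RocksDB
        // SST files are uploaded on each checkpoint.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints"); // placeholder path
        env.enableCheckpointing(60_000L); // checkpoint every 60s
        // ... define and execute the job here ...
    }
}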
Re: Flink CEP checkpoint size
Hi, Mohit,

Have you figured out any solutions to this problem?

I am now facing exactly the same problem.

I was using Flink 1.12.0 and also upgraded to 1.13.1, but the checkpoint
size is still growing.

On 2021/06/02 15:45:59, "Singh, Mohit" wrote:
> Hi,
>
> I am facing an issue with the CEP operator where the checkpoint size
> keeps increasing even though the pattern is fully matched. I have a
> stream with unique user ids, and I want to detect a pattern of products
> purchased by a user.
>
> Here is the sample stream data:
>
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> ...
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "cep");
>
> DataStream<orders> stream = env.addSource(
>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>     .map(json -> gson.fromJson(json, orders.class))
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>             .withTimestampAssigner((order, timestamp) -> order.ts));
>
> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>         "start", AfterMatchSkipStrategy.skipPastLastEvent())
>     .where(new SimpleCondition<orders>() {
>         @Override
>         public boolean filter(orders value) throws Exception {
>             return value.product.equals("product1");
>         }
>     }).times(1)
>     .followedBy("middle").where(new SimpleCondition<orders>() {
>         @Override
>         public boolean filter(orders value) throws Exception {
>             return value.product.equals("product2");
>         }
>     }).oneOrMore()
>     .until(new SimpleCondition<orders>() {
>         @Override
>         public boolean filter(orders value) throws Exception {
>             return value.product.equals("product3");
>         }
>     }).within(Time.seconds(10));
>
> PatternStream<orders> patternStream = CEP.pattern(
>     stream.keyBy((KeySelector<orders, String>) order -> order.user_id), pattern);
> DataStream<String> alerts = patternStream.select(
>     (PatternSelectFunction<orders, String>) matches ->
>         matches.get("start").get(0).user_id + "->" +
>         matches.get("middle").get(0).ts);
> alerts.print();
>
> [cid:image001.png@01D7579C.775FCA00]
>
> I have also attached the checkpoint file.
>
> It looks like the NFA state keeps track of all keys seen and their start
> state, and that leads to an increase in checkpoint size if keys are not
> reused in patterns. So if I have a fixed number of keys, the size does
> not increase. Is this the expected behavior and a correct understanding?
> Is there a way to drop these keys once the pattern is matched? Or am I
> missing something here?
>
> Thanks,
> Mohit
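The behavior described above is straightforward to reproduce with short-lived keys: a source that emits one complete match per fresh UUID key and then never reuses the key. A minimal sketch of such a driver, reusing the orders POJO outlined earlier in the thread (UniqueKeySource is an invented name):

import java.util.UUID;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits the full match sequence (product1, product2, product3) for an
// endless series of fresh UUID keys. Each key matches exactly once and is
// never seen again; if per-key start states are not cleaned up (the
// short-lived-keys problem tracked in FLINK-23314), checkpoint size grows
// with the number of keys emitted.
public class UniqueKeySource implements SourceFunction<orders> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<orders> ctx) throws Exception {
        long ts = System.currentTimeMillis();
        while (running) {
            String key = UUID.randomUUID().toString();
            ctx.collect(make(key, "product1", ts));
            ctx.collect(make(key, "product2", ts + 1));
            ctx.collect(make(key, "product3", ts + 2));
            ts += 3;
            Thread.sleep(1); // throttle so the job stays easy to inspect
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private static orders make(String userId, String product, long ts) {
        orders o = new orders();
        o.user_id = userId;
        o.product = product;
        o.bids = 1;
        o.ts = ts;
        return o;
    }
}

With this source plugged into the pattern from the thread, a CEP operator that cleans up finished keys should see its checkpoint size plateau once matches complete; the steady growth the posters observed is the signature of state that is retained per key instead.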