Hi, Dawid.
Thanks for replying; I'm glad to hear you are working on this.

On 2021/07/08 12:14:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote: 
> Hi,
> 
> Sorry for the late reply.
> 
> Indeed, I found a couple of problems with clearing the state for
> short-lived keys. I created a JIRA issue [1] to track it and opened a PR
> (which still needs test coverage before it can be merged) with fixes for them.
> 
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-23314
> 
> On 06/07/2021 09:11, Li Jim wrote:
> > Hi, Mohit,
> >
> > Have you found any solution to this problem?
> >
> > I am now facing exactly the same problem.
> >
> > I was using Flink 1.12.0 and also upgraded to 1.13.1, but
> > the checkpoint size is still growing.
> >
> > On 2021/06/02 15:45:59, "Singh, Mohit" <sngh...@amazon.com> wrote: 
> >> Hi,
> >>
> >> I am facing an issue with the CEP operator where the checkpoint size keeps
> >> increasing even though the pattern is fully matched. I have a stream with
> >> unique user IDs, and I want to detect a pattern of products purchased by each user.
> >>
> >> Here is some sample stream data:
> >>
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> >> …..
> >> …..
> >>
> >> StreamExecutionEnvironment env =
> >>         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> env.setParallelism(1);
> >>
> >> // Kafka consumer settings
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "localhost:9092");
> >> properties.setProperty("group.id", "cep");
> >>
> >> // Parse each JSON record and assign event-time timestamps (2 s out-of-orderness)
> >> DataStream<orders> stream = env.addSource(
> >>             new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
> >>         .map(json -> gson.fromJson(json, orders.class))
> >>         .assignTimestampsAndWatermarks(
> >>             WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
> >>                 .withTimestampAssigner((orders, timestamp) -> orders.ts));
> >>
> >> // product1, followed by one or more product2 until product3, all within 10 seconds
> >> Pattern<orders, ?> pattern = Pattern.<orders>begin(
> >>         "start",
> >>         AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
> >>     @Override
> >>     public boolean filter(orders value) throws Exception {
> >>         return value.product.equals("product1");
> >>     }
> >> }).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
> >>     @Override
> >>     public boolean filter(orders value) throws Exception {
> >>         return value.product.equals("product2");
> >>     }
> >> }).oneOrMore().until(new SimpleCondition<orders>() {
> >>     @Override
> >>     public boolean filter(orders value) throws Exception {
> >>         return value.product.equals("product3");
> >>     }
> >> }).within(Time.seconds(10));
> >>
> >> // Apply the pattern per user and emit "<user_id>-><ts of first middle event>"
> >> PatternStream<orders> patternStream =
> >>         CEP.pattern(stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
> >>
> >> DataStream<String> alerts = patternStream.select(
> >>         (PatternSelectFunction<orders, String>) matches ->
> >>                 matches.get("start").get(0).user_id + "->" +
> >>                 matches.get("middle").get(0).ts);
> >> alerts.print();
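For reference, here is a stdlib-only sketch of the event-time arithmetic behind the watermark strategy and the `within(Time.seconds(10))` bound above, using the first user's sample timestamps. It assumes the formula of Flink's bounded-out-of-orderness watermark generator (highest timestamp seen, minus the bound, minus 1 ms) and assumes a partial match reaches its window bound at its first event's timestamp plus 10 s. `WatermarkMath`, `watermark` and `expiryTime` are hypothetical names; this is not Flink code.

```java
import java.time.Duration;

// Hypothetical helper (stdlib only): mimics the arithmetic of a
// bounded-out-of-orderness watermark, i.e. maxTimestamp - bound - 1 ms.
public class WatermarkMath {

    // Watermark after having seen events up to maxTimestamp (assumed formula).
    static long watermark(long maxTimestamp, Duration outOfOrderness) {
        return maxTimestamp - outOfOrderness.toMillis() - 1;
    }

    // Assumed event time at which a partial match that started at startTs
    // reaches the `within` bound.
    static long expiryTime(long startTs, Duration within) {
        return startTs + within.toMillis();
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofSeconds(2);
        Duration within = Duration.ofSeconds(10);

        // Timestamps of the first user's six events from the sample data.
        long[] ts = {1622644781243L, 1622644781245L, 1622644781247L,
                     1622644781247L, 1622644781248L, 1622644781248L};
        long max = Long.MIN_VALUE;
        for (long t : ts) {
            max = Math.max(max, t);
        }

        long wm = watermark(max, bound);
        long expiry = expiryTime(ts[0], within);
        System.out.println("watermark after these events: " + wm);
        System.out.println("window bound for a match starting at the first event: " + expiry);
        // How much further event time must advance before that bound is reached.
        System.out.println("ms still needed: " + (expiry - wm));
    }
}
```

With only these six events the watermark stays about 12 s short of the window bound, so a time-based cleanup for this key cannot happen until later events (from any user) advance event time.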
> >>
> >>
> >>
> >> I have also attached the checkpoint file.
> >>
> >> It looks like the NFA state keeps track of every key seen, along with its
> >> start state, which makes the checkpoint size grow when keys are not reused
> >> across patterns. If I use a fixed number of keys, the size does not
> >> increase. Is this the expected behavior, and is my understanding correct?
> >> Is there a way to drop these keys once the pattern is matched, or am I
> >> missing something here?
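A toy model of the behavior described above, in plain Java: each key keeps an entry for its in-progress match, and completing the pattern empties that entry but leaves the key itself behind unless it is explicitly removed (the kind of per-key cleanup Dawid's reply refers to). This is only an illustration under that assumption, not Flink's NFA or shared-buffer implementation; `KeyedStateToy` and its methods are hypothetical, and the empty per-key list stands in for whatever per-key leftovers (such as the start state) the real operator keeps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Flink's NFA): each key maps to the buffered events of its
// in-progress match; a "product3" event completes the match.
public class KeyedStateToy {

    private final Map<String, List<String>> stateByKey = new HashMap<>();
    private final boolean clearKeyOnMatch;

    KeyedStateToy(boolean clearKeyOnMatch) {
        this.clearKeyOnMatch = clearKeyOnMatch;
    }

    void process(String userId, String product) {
        List<String> partial = stateByKey.computeIfAbsent(userId, k -> new ArrayList<>());
        partial.add(product);
        if (product.equals("product3")) {
            // Match complete: the buffered events are no longer needed.
            partial.clear();
            if (clearKeyOnMatch) {
                // Also drop the now-empty per-key entry.
                stateByKey.remove(userId);
            }
        }
    }

    int keysInState() {
        return stateByKey.size();
    }

    // Feeds `users` distinct user IDs, each with one complete product1..product3 sequence,
    // and returns how many keys are still held afterwards.
    static int run(int users, boolean clearKeyOnMatch) {
        KeyedStateToy toy = new KeyedStateToy(clearKeyOnMatch);
        String[] sequence = {"product1", "product2", "product2", "product3"};
        for (int u = 0; u < users; u++) {
            String userId = "user-" + u;
            for (String p : sequence) {
                toy.process(userId, p);
            }
        }
        return toy.keysInState();
    }

    public static void main(String[] args) {
        for (int users : new int[] {10, 100, 1000}) {
            System.out.println(users + " unique keys -> kept without cleanup: "
                    + run(users, false) + ", kept with cleanup: " + run(users, true));
        }
    }
}
```

In this model every unique key stays in state when entries are not removed, and none remain when they are; reusing a fixed set of user IDs keeps the count bounded either way, which is consistent with the observation that a fixed key set does not make the state grow.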
> >>
> >> Thanks,
> >> Mohit
> >>
> 
> 
