Hi Mohit,

Have you found a solution to this problem?

I am now facing exactly the same problem.

I was using Flink 1.12.0 and upgraded to 1.13.1, but the checkpoint size is
still growing.
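To illustrate what I believe is happening, here is a small plain-Java simulation. This is only my assumption about the behavior (one piece of NFA state created per key ever seen, never evicted after a match), not actual Flink internals; the class and field names are made up for the example:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class StateGrowthDemo {
    // Simulated keyed operator state: one entry per user_id ever seen.
    static final Map<String, String> nfaStatePerKey = new HashMap<>();

    static void processEvent(String userId) {
        // An entry is created the first time a key is seen and, in this
        // simulation, it is never removed after the pattern matches.
        nfaStatePerKey.putIfAbsent(userId, "START");
    }

    public static void main(String[] args) {
        // With unbounded key cardinality (a fresh UUID per user),
        // the state map grows linearly with the number of distinct keys.
        for (int i = 0; i < 10_000; i++) {
            processEvent(UUID.randomUUID().toString());
        }
        System.out.println(nfaStatePerKey.size());
    }
}
```

If the keys are a fixed set, `putIfAbsent` keeps the map bounded, which would match the observation that checkpoint size stops growing when keys are reused.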

On 2021/06/02 15:45:59, "Singh, Mohit" <sngh...@amazon.com> wrote: 
> Hi,
> 
> I am facing an issue with the CEP operator where the checkpoint size keeps
> increasing even though the pattern is fully matched. I have a stream keyed by a
> unique user id, and I want to detect a pattern of products purchased by each user.
> 
> Here is the sample stream data:
> 
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> …..
> …..
> 
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
> 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "cep");
> 
> DataStream<orders> stream = env.addSource(
>             new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>         .map(json -> gson.fromJson(json, orders.class))
>         .assignTimestampsAndWatermarks(
>             WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>                 .withTimestampAssigner((orders, timestamp) -> orders.ts));
> 
> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>             "start", AfterMatchSkipStrategy.skipPastLastEvent())
>         .where(new SimpleCondition<orders>() {
>             @Override
>             public boolean filter(orders value) throws Exception {
>                 return value.product.equals("product1");
>             }
>         }).times(1)
>         .followedBy("middle").where(new SimpleCondition<orders>() {
>             @Override
>             public boolean filter(orders value) throws Exception {
>                 return value.product.equals("product2");
>             }
>         }).oneOrMore()
>         .until(new SimpleCondition<orders>() {
>             @Override
>             public boolean filter(orders value) throws Exception {
>                 return value.product.equals("product3");
>             }
>         })
>         .within(Time.seconds(10));
> 
> PatternStream<orders> patternStream = CEP.pattern(
>         stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
> 
> DataStream<String> alerts = patternStream.select(
>         (PatternSelectFunction<orders, String>) matches ->
>             matches.get("start").get(0).user_id + "->" +
>             matches.get("middle").get(0).ts);
> alerts.print();
> 
> 
> 
> I have also attached the checkpoint file.
> 
> It looks like the NFA state keeps track of every key it has seen, along with
> the start state, and that leads to checkpoint growth when keys are not reused
> across patterns. With a fixed set of keys, the size does not increase. Is this
> the expected behavior, and is my understanding correct?
> Is there a way to drop these keys once the pattern is matched, or am I missing
> something here?
> 
> Thanks,
> Mohit
> 

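In case it helps for comparison, here is a simplified plain-Java sketch of the workaround I am considering: a manual per-key state machine approximating the product1 -> product2 (one or more) -> product3 pattern, which explicitly removes the key's state once the pattern is resolved. The class and method names are my own invention and this is not the CEP operator's API, just an illustration of the cleanup I would like CEP to do:

```java
import java.util.HashMap;
import java.util.Map;

public class ManualMatcher {
    // Per-key state machine, approximating the CEP pattern.
    static final Map<String, String> stateByKey = new HashMap<>();

    /** Returns true when the full pattern completes for this key. */
    static boolean onEvent(String userId, String product) {
        String state = stateByKey.getOrDefault(userId, "NONE");
        switch (product) {
            case "product1":
                stateByKey.put(userId, "STARTED");
                return false;
            case "product2":
                if (state.equals("STARTED") || state.equals("MIDDLE")) {
                    stateByKey.put(userId, "MIDDLE");
                }
                return false;
            case "product3":
                boolean matched = state.equals("MIDDLE");
                // The key difference from what I observe with CEP:
                // drop the key's state as soon as the pattern is resolved,
                // so state stays bounded by the number of in-flight keys.
                stateByKey.remove(userId);
                return matched;
            default:
                return false;
        }
    }
}
```

In a real job this would presumably become a KeyedProcessFunction with ValueState, calling state.clear() on a match and registering a timer to mirror the within(10s) timeout, but I have not verified that this matches CEP semantics exactly.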