Re: How to check rocksdb log

2021-08-11 Thread Li Jim
Hi, Fabian
Thanks for your reply, it helps.
In 1.13, [state.backend.rocksdb.log.dir] has been removed, so I use
[state.backend.rocksdb.localdir] instead.
It works fine.
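
For reference, a minimal flink-conf.yaml sketch of the options discussed in this
thread (the directory path below is only a placeholder, not from the original
mails). With logging re-enabled, the RocksDB LOG file should show up under the
RocksDB working directory, which is what the localdir option controls:

# re-enable RocksDB's native LOG output (a RocksDB InfoLogLevel value)
state.backend.rocksdb.log.level: INFO_LEVEL
# local working directory for the RocksDB instances; placeholder path
state.backend.rocksdb.localdir: /data/flink/rocksdb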

On 2021/08/11 19:07:28, Fabian Paul  wrote: 
> Hi Li,
> 
> Flink has disabled the RocksDB logs because of sizing problems, but you can have 
> a look at this link [1] on how to enable them and set the log 
> directory.
> Let me know if that answers your question.
> 
> Best,
> Fabian
> 
> 
> [1] 
> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting


How to check rocksdb log

2021-08-11 Thread Li Jim
Hi, everyone,
I have a problem with checking RocksDB's log.
I set "state.backend.rocksdb.log.level" to INFO_LEVEL,
but I can't find RocksDB's log anywhere.
Where can I set the log dir, or where should I check by default?
Thanks for any replies.


Flink RocksDB Performance

2021-07-16 Thread Li Jim
Hello everyone,
I am using the Flink 1.13.1 CEP library and doing some pressure tests.
My message rate is about 16,000 records per second.
I find that it can't process more than 16,000 records per second because CPU 
usage reaches 100% (or rather 800%, since I allocated 8 vcores to a TaskManager).
I tried switching to the filesystem backend; it got faster and CPU usage dropped.
I understand this may be because of serialization/deserialization costs in RocksDB, 
but for certain reasons we must use RocksDB as the state backend.
Any suggestions for optimizing this?
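
For reference, a minimal flink-conf.yaml sketch of the two setups compared above
(Flink 1.13 option names; the checkpoint path is only a placeholder). The
heap-based backend keeps state as objects on the JVM heap, while RocksDB keeps
state serialized on local disk, so every state access pays a
serialization/deserialization cost:

# RocksDB backend: state kept serialized on local disk
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-checkpoints   # placeholder path

# heap-based backend ("filesystem" is the legacy name, "hashmap" since 1.13):
# state.backend: hashmap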






Re: Flink cep checkpoint size

2021-07-08 Thread Li Jim
Hi, Dawid.
Thanks for replying, happy to know you are working on this.

On 2021/07/08 12:14:21, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Sorry for the late reply.
> 
> Indeed, I found a couple of problems with clearing the state for short-lived
> keys. I created a JIRA issue [1] to track it and opened a PR (which
> needs test coverage before it can be merged) with fixes for those.
> 
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-23314
> 
> On 06/07/2021 09:11, Li Jim wrote:
> > Hi, Mohit, 
> >
> > Have you figured out any solutions to this problem?
> >
> > I am now facing exactly the same problem.
> >
> > I was using Flink 1.12.0 and have also upgraded to 1.13.1, but 
> > the checkpoint size is still growing.
> >
> > On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
> >> Hi,
> >>
> >> I am facing an issue with the CEP operator where the checkpoint size keeps 
> >> increasing even though the pattern is fully matched. I have a stream with 
> >> unique user ids and I want to detect a pattern of products purchased by a user.
> >>
> >> Here is the sample stream data:
> >>
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product1","bids":3,"ts":"1622644781243"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":6,"ts":"1622644781245"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":4,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":2,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":1,"ts":"1622644781248"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product3","bids":1,"ts":"1622644781248"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product1","bids":3,"ts":"1622644782235"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":6,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":4,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":2,"ts":"1622644782237"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":1,"ts":"1622644782238"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product3","bids":1,"ts":"1622644782239"}
> >> …..
> >> …..
> >>
> >> StreamExecutionEnvironment env =
> >>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> env.setParallelism(1);
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "localhost:9092");
> >> properties.setProperty("group.id", "cep");
> >> DataStream<orders> stream = env.addSource(
> >>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
> >>     .map(json -> gson.fromJson(json, orders.class))
> >>     .assignTimestampsAndWatermarks(
> >>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
> >>             .withTimestampAssigner((orders, timestamp) -> orders.ts)
> >>     );
> >> Pattern<orders, ?> pattern = Pattern.<orders>begin(
> >>         "start",
> >>         AfterMatchSkipStrategy.skipPastLastEvent()).where(new 
> >> SimpleCond

Re: Flink CEP checkpoint size

2021-07-06 Thread Li Jim
Hi, Jameson,
Thanks very much for replying; I am really struggling with this.
I am using flowId as my keys, which means they will be matched and never used 
again.
This seems like scenario 2. I didn't know it hasn't been fixed yet.
Thank you again, and do you have any solutions?

On 2021/07/07 01:47:00, Aeden Jameson  wrote: 
> Hi Li,
> 
> How big is your keyspace? I had a similar problem which turned out to
> be scenario 2 in this issue:
> https://issues.apache.org/jira/browse/FLINK-19970. Looks like the bug
> in scenario 1 got fixed but scenario 2 did not. There's more detail in
> this thread:
> http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-cep-checkpoint-size-td44141.html#a44168
> Hope that helps.
> 
> On Tue, Jul 6, 2021 at 12:44 AM Li Jim  wrote:
> >
> > I am using Flink CEP to do some performance tests.
> >
> > Flink version 1.13.1.
> >
> > below is the sql:
> >
> > INSERT INTO to_kafka
> > SELECT bizName, wdName, wdValue , zbValue , flowId FROM kafka_source
> > MATCH_RECOGNIZE
> > (
> > PARTITION BY flow_id
> > ORDER BY proctime
> > MEASURES A.biz_name as bizName, A.wd_name as wdName, A.wd_value as 
> > wdValue, MAP[
> > A.zb_name, A.zb_value,
> > B.zb_name, B.zb_value
> > ] as zbValue, A.flow_id as flowId
> > ONE ROW PER MATCH
> > AFTER MATCH SKIP PAST LAST ROW
> > PATTERN ( A B ) WITHIN INTERVAL '10' SECOND
> > DEFINE
> > B AS B.flow_id = A.flow_id
> > );
> >
> > I add the 'within clause' to avoid state growing, but it does not work at 
> > all.
> >
> > the checkpoint size is growing fast.
> >
> > I am using rocksdb incremental mode.
> >
> >
> 
> 
> -- 
> Cheers,
> Aeden
> 


Flink CEP checkpoint size

2021-07-06 Thread Li Jim
I am using Flink CEP to do some performance tests.

Flink version 1.13.1.

Below is the SQL:

INSERT INTO to_kafka
SELECT bizName, wdName, wdValue , zbValue , flowId FROM kafka_source
MATCH_RECOGNIZE
(
PARTITION BY flow_id
ORDER BY proctime
MEASURES A.biz_name as bizName, A.wd_name as wdName, A.wd_value as wdValue, 
MAP[
A.zb_name, A.zb_value, 
B.zb_name, B.zb_value
] as zbValue, A.flow_id as flowId
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN ( A B ) WITHIN INTERVAL '10' SECOND
DEFINE
B AS B.flow_id = A.flow_id
);

I added the WITHIN clause to avoid the state growing, but it does not work at all.

The checkpoint size is growing fast.

I am using RocksDB incremental mode.
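
For reference, a minimal flink-conf.yaml sketch of the setup described above
(Flink 1.13 option names; the checkpoint directory is only a placeholder).
Incremental mode uploads only the newly created SST files at each checkpoint, so
it reduces upload volume but does not by itself stop the operator state from
growing:

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path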




Re: Flink cep checkpoint size

2021-07-06 Thread Li Jim
Hi, Mohit, 

Have you figured out any solutions to this problem?

I am now facing exactly the same problem.

I was using Flink 1.12.0 and have also upgraded to 1.13.1, but the 
checkpoint size is still growing.

On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
> Hi,
> 
> I am facing an issue with the CEP operator where the checkpoint size keeps 
> increasing even though the pattern is fully matched. I have a stream with unique 
> user ids and I want to detect a pattern of products purchased by a user.
> 
> Here is the sample stream data:
> 
> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> ","product":"product1","bids":3,"ts":"1622644781243"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> ","product":"product2","bids":6,"ts":"1622644781245"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> ","product":"product2","bids":4,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> ","product":"product2","bids":2,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> ","product":"product2","bids":1,"ts":"1622644781248"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> ","product":"product3","bids":1,"ts":"1622644781248"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> ","product":"product1","bids":3,"ts":"1622644782235"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> ","product":"product2","bids":6,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> ","product":"product2","bids":4,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> ","product":"product2","bids":2,"ts":"1622644782237"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> ","product":"product2","bids":1,"ts":"1622644782238"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> ","product":"product3","bids":1,"ts":"1622644782239"}
> …..
> …..
> 
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "cep");
> DataStream<orders> stream = env.addSource(
>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>     .map(json -> gson.fromJson(json, orders.class))
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>             .withTimestampAssigner((orders, timestamp) -> orders.ts)
>     );
> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>         "start",
>         AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
>     @Override
>     public boolean filter(orders value) throws Exception {
>         return value.product.equals("product1");
>     }
> }).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
>     @Override
>     public boolean filter(orders value) throws Exception {
>         return value.product.equals("product2");
>     }
> }).oneOrMore().until(new SimpleCondition<orders>() {
>     @Override
>     public boolean filter(orders value) throws Exception {
>         return value.product.equals("product3");
>     }
> }).within(Time.seconds(10));
> PatternStream<orders> patternStream =
>     CEP.pattern(stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
> DataStream<String> alerts = patternStream.select(
>     (PatternSelectFunction<orders, String>) matches ->
>         matches.get("start").get(0).user_id + "->" +
>         matches.get("middle").get(0).ts);
> alerts.print();
> 
> 
> [inline image attachment: image001.png]
> 
> I have also attached the checkpoint file.
> 
> It looks like the NFA state keeps track of all keys seen and the start state, 
> and that leads to an increase in checkpoint size if the keys are not reused in 
> patterns. So, if I have a fixed number of keys, the size does not increase. Is 
> this the expected behavior and a correct understanding?
> Is there a way to drop these keys once the pattern is matched, or am I 
> missing something here?
> 
> Thanks,
> Mohit
>