Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-13 Thread Ori Popowski
Hi,

Eventually flatMapWithState solved the problem. I started by looking into KeyedProcessFunction, which led me to flatMapWithState. It's working very well.

    .keyBy(…)
    .flatMapWithState[Event, Int] { (event, countOpt) =>
      val count = countOpt.getOrElse(0)
      if (count < config.limit) (List(event)
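The snippet in the message above is cut off, so the following is not a reconstruction of it. It is only a minimal sketch of the general idea it points at: keep one small counter per key and stop emitting events once that key reaches a limit, instead of buffering every event. It is plain Java with no Flink dependency; the class `PerKeyLimiter`, its `flatMap` method, and the in-memory map standing in for keyed state are all hypothetical names chosen for illustration. Incrementing the counter on each emitted event is an assumption, since that part of the original code is not visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a per-key event cap; not Flink code. */
public class PerKeyLimiter {
    // Hypothetical stand-in for keyed state: one small counter per key.
    private final Map<String, Integer> countByKey = new HashMap<>();
    private final int limit;

    public PerKeyLimiter(int limit) {
        this.limit = limit;
    }

    /** Returns the event in a one-element list while the key is under its limit, else an empty list. */
    public List<String> flatMap(String key, String event) {
        int count = countByKey.getOrDefault(key, 0);
        if (count < limit) {
            // Assumption: the counter advances by one for every emitted event.
            countByKey.put(key, count + 1);
            return List.of(event);
        }
        return List.of();
    }

    public static void main(String[] args) {
        PerKeyLimiter limiter = new PerKeyLimiter(2);
        List<String> out = new ArrayList<>();
        out.addAll(limiter.flatMap("a", "a1"));
        out.addAll(limiter.flatMap("a", "a2"));
        out.addAll(limiter.flatMap("a", "a3")); // dropped: key "a" already reached the limit
        out.addAll(limiter.flatMap("b", "b1")); // a different key has its own counter
        System.out.println(out);
    }
}
```

The point of the design is that the state held per key is a single integer, regardless of how many events arrive for that key.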

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-12 Thread Ori Popowski
> AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can reduce the state size. If this cannot be done using the window operator, would the KeyedProcessFunction[1] work for you?

I'll see if I can introduce it to the code.

> if you do, the ProcessWindowFunction is getting as a

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-11 Thread Rafi Aroch
Hi Ori,

In your code, are you using the process() API?

    .process(new MyProcessWindowFunction());

If you are, the ProcessWindowFunction receives as its argument an Iterable with ALL elements collected during the session. This can make the per-key state huge (as you're experiencing). As
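The message above is cut off, so what follows is not necessarily the suggestion it goes on to make. It is a small plain-Java sketch of why handing a function every element of a session makes per-key state grow with the session, while folding each element into an accumulator on arrival keeps it constant. Flink's window API does offer incremental aggregation (reduce/aggregate) alongside process(), but nothing below uses Flink; `WindowStateSketch`, `BufferingWindow`, and `IncrementalWindow` are hypothetical names, and the sum is an arbitrary stand-in for whatever the real job computes.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java comparison of buffered vs. incremental window state; not Flink code. */
public class WindowStateSketch {

    /** Buffering style: every element is kept until the window result is computed. */
    static final class BufferingWindow {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        int storedValues() {
            return elements.size();
        }

        long sum() {
            long total = 0;
            for (long v : elements) {
                total += v;
            }
            return total;
        }
    }

    /** Incremental style: each element is folded into a single accumulator on arrival. */
    static final class IncrementalWindow {
        private long sum = 0;

        void add(long value) {
            sum += value;
        }

        int storedValues() {
            return 1; // only the accumulator is retained
        }

        long sum() {
            return sum;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        IncrementalWindow incremental = new IncrementalWindow();
        for (long i = 1; i <= 1000; i++) {
            buffering.add(i);
            incremental.add(i);
        }
        System.out.println("buffering stores " + buffering.storedValues() + " values, sum=" + buffering.sum());
        System.out.println("incremental stores " + incremental.storedValues() + " value, sum=" + incremental.sum());
    }
}
```

Both variants produce the same result; only the amount of data retained per key differs, which is what matters when a single key's state approaches a size limit.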

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-11 Thread Congxian Qiu
Hi Ori,

AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can reduce the state size. If this cannot be done using the window operator, would the KeyedProcessFunction[1] work for you?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process