Hello,
Thank you for your answer and apologies for the late response.
For timers we are using:
state.backend.rocksdb.timer-service.factory: rocksdb
Are we still affected by [1]?
Regarding interruptibility, we have coalesced our timers, and the application
has become more responsive to stop signals.
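For readers unfamiliar with timer coalescing, here is a minimal sketch of one common approach: rounding each timer's target time down to a fixed granularity so that nearby timers collapse into one. This is an illustration under assumptions, not necessarily the exact scheme we use; the class name, the 1000 ms granularity and the sample timestamps are hypothetical, and a TreeSet stands in for the timer service's deduplication of timers with the same key and timestamp.

```java
import java.util.TreeSet;

class TimerCoalescingSketch {
    // Round a (non-negative) target time down to a multiple of the granularity.
    static long coalesce(long targetMillis, long granularityMillis) {
        return (targetMillis / granularityMillis) * granularityMillis;
    }

    public static void main(String[] args) {
        long[] targets = {10_120L, 10_450L, 10_999L, 11_003L};

        // Set semantics model deduplication of timers with identical timestamps.
        TreeSet<Long> timers = new TreeSet<>();
        for (long target : targets) {
            timers.add(coalesce(target, 1000L));
        }

        // Four requested timers collapse into two distinct ones.
        System.out.println(timers); // prints [10000, 11000]
    }
}
```

In a real job the rounded value would be passed to something like ctx.timerService().registerProcessingTimeTimer(...) instead of being added to a set.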
Also, after a long investigation, we found that we were misusing
AggregatingState and RocksDB :-/
The pseudocode of our window function looked like this:
........
private AggregatingState<IN, OUT> state;
.......
@Override
public void apply(Tuple4<String, String, String, String> key, TimeWindow window,
                  Iterable<IN> input, Collector<OUT> out) throws Exception {
    // One state.add per element: each call reads and writes the accumulator in RocksDB.
    for (IN item : input) {
        state.add(item);
    }
    out.collect(state.get());
}
........
Doing so calls getInternal/updateInternal inside state.add on RocksDB once
for each input item, which puts huge pressure on RocksDB.
We changed the code to iterate over the items inside the AggregateFunction
instead, and to call state.add only once:
........
private AggregatingState<Iterable<IN>, OUT> state;
.......
@Override
public void apply(Tuple4<String, String, String, String> key, TimeWindow window,
                  Iterable<IN> input, Collector<OUT> out) throws Exception {
    // Single state.add for the whole window; the aggregate function iterates over the items.
    state.add(input);
    out.collect(state.get());
}
........
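To make the difference concrete, here is a minimal self-contained sketch. It does not use Flink itself: AggregateFunctionSketch is a hypothetical local stand-in that mirrors the four methods of Flink's AggregateFunction, and CountingState is a toy replacement for the RocksDB-backed AggregatingState that only counts the one read and one write performed by each add call. ElementSum, BatchSum and the Long sum are illustrative names and types, not our real aggregate.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same four methods as Flink's AggregateFunction,
// so the sketch compiles without Flink on the classpath.
interface AggregateFunctionSketch<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Per-element variant: add() receives one item at a time.
class ElementSum implements AggregateFunctionSketch<Long, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(Long value, Long acc) { return acc + value; }
    public Long getResult(Long acc) { return acc; }
    public Long merge(Long a, Long b) { return a + b; }
}

// Batched variant: add() receives the whole window and iterates in memory.
class BatchSum implements AggregateFunctionSketch<Iterable<Long>, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(Iterable<Long> batch, Long acc) {
        long sum = acc;
        for (long v : batch) {
            sum += v;
        }
        return sum;
    }
    public Long getResult(Long acc) { return acc; }
    public Long merge(Long a, Long b) { return a + b; }
}

// Toy state: every add() does one read and one write of the accumulator,
// modelling the getInternal/updateInternal pair inside state.add.
class CountingState<IN, ACC, OUT> {
    private final AggregateFunctionSketch<IN, ACC, OUT> fn;
    private ACC stored;   // null until the first add()
    int reads = 0;
    int writes = 0;

    CountingState(AggregateFunctionSketch<IN, ACC, OUT> fn) { this.fn = fn; }

    void add(IN value) {
        reads++;                                            // read current accumulator
        ACC acc = (stored == null) ? fn.createAccumulator() : stored;
        stored = fn.add(value, acc);
        writes++;                                           // write updated accumulator
    }

    OUT get() {
        return (stored == null) ? null : fn.getResult(stored);
    }
}

class StateAddDemo {
    public static void main(String[] args) {
        List<Long> window = Arrays.asList(1L, 2L, 3L, 4L);

        CountingState<Long, Long, Long> perElement = new CountingState<>(new ElementSum());
        for (Long item : window) {
            perElement.add(item);       // one read and one write per item
        }

        CountingState<Iterable<Long>, Long, Long> batched = new CountingState<>(new BatchSum());
        batched.add(window);            // one read and one write for the whole window

        System.out.println("per-element: sum=" + perElement.get()
                + " reads=" + perElement.reads + " writes=" + perElement.writes);
        System.out.println("batched:     sum=" + batched.get()
                + " reads=" + batched.reads + " writes=" + batched.writes);
    }
}
```

With four items in the window, the per-element variant performs four reads and four writes, while the batched variant performs one of each and yields the same sum. In this toy model the number of state accesses grows with the window size only in the per-element case.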
Is this the right way to do it?
Since this modification, the application has been more stable and the recovery
time has dropped to a few minutes.
Thank you for your help.
Best regards,
Amine