Hi,

We recently moved from Spark Streaming to Flink for stream processing in our organization, and we are trying to reduce the number of external calls as much as possible. Previously we stored the incoming data in HBase, but we now want to try stateful operations in Flink instead.
To that end, we have decided that we need a sliding window of 180 days with a slide interval of 1 day, so that we keep 180 days of state at any given time. This state would be at most around 40-50 GB for the 180 days, so we thought of using RocksDB for state storage.

The job flow we have in mind takes the incoming events plus some extra information:

    events.keyBy(eventTuple -> eventTuple.getEventUID()).flatMap(new UpdatedTxnState());

where UpdatedTxnState is an extension of the RichFlatMapFunction class and looks something like this:

    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class UpdatedTxnState
            extends RichFlatMapFunction<Tuple3<String, List<String>, EventType>, Tuple2<String, EventType>> {

        // Latest transaction tuple for the current key
        private ValueState<Tuple3<EventType, List<String>, String>> txnState;

        @Override
        public void open(Configuration config) throws Exception {
            // Value state that is also exposed as queryable state under the name "transaction"
            ValueStateDescriptor<Tuple3<EventType, List<String>, String>> stateDescriptor =
                    new ValueStateDescriptor<>(
                            "transaction",
                            TypeInformation.of(new TypeHint<Tuple3<EventType, List<String>, String>>() { }));
            stateDescriptor.setQueryable("transaction");
            this.txnState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void flatMap(Tuple3<String, List<String>, EventType> input,
                            Collector<Tuple2<String, EventType>> output) throws Exception {
            // Flink's Java tuples expose their fields as f0, f1, f2
            txnState.update(new Tuple3<>(input.f2, input.f1, input.f0));
            output.collect(new Tuple2<>(input.f0, input.f2));
        }
    }

Now, I have a couple of questions:

1. How can I create a sliding window on top of this state? I can think of doing a keyBy on the output of the flatMap, but that doesn't really make much sense to me, and I didn't find a way to build state after windowing.

2. Can I query the state by the name I defined here, "transaction", from anywhere in my job?
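To make the window requirement concrete, here is a minimal sketch in plain Java (no Flink dependency) of how I understand sliding-window assignment for a 180-day size and a 1-day slide. The class and method names (SlidingWindowSketch, windowStarts) are made up for illustration and are not Flink API; the arithmetic assumes windows aligned to epoch days with no offset.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Window parameters from the description above: 180-day size, 1-day slide.
    static final long SIZE_MS = Duration.ofDays(180).toMillis();
    static final long SLIDE_MS = Duration.ofDays(1).toMillis();

    /**
     * Returns the start timestamps (epoch millis) of every window
     * [start, start + SIZE_MS) that contains the given event time,
     * newest window first. Assumes windows are aligned to the slide
     * interval with no offset.
     */
    static List<Long> windowStarts(long eventTimeMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that contains the event.
        long lastStart = eventTimeMs - Math.floorMod(eventTimeMs, SLIDE_MS);
        // Walk backwards one slide at a time while the window still covers the event.
        for (long start = lastStart; start > eventTimeMs - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hypothetical event: 5 hours into day 200 since the epoch.
        long eventTimeMs = Duration.ofDays(200).toMillis() + Duration.ofHours(5).toMillis();
        List<Long> starts = windowStarts(eventTimeMs);
        System.out.println("windows containing the event: " + starts.size());
        System.out.println("newest window starts at day " + starts.get(0) / SLIDE_MS);
        System.out.println("oldest window starts at day " + starts.get(starts.size() - 1) / SLIDE_MS);
    }
}
```

Under these assumptions every event falls into 180 overlapping windows (days 21 through 200 for the sample event), which is part of why I am unsure whether a windowed approach or a single keyed state with my own expiry logic is the better fit here.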
Thanks,
Biplob