Hi Biplob,

What do you mean by "creating a sliding window on top of a state"?
Sliding windows are typically defined on streams (data in motion) and not
on state (data at rest).
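
To make the "windows on streams" point concrete, here is a minimal plain-Java sketch (not Flink API) of how a sliding window could assign a single record to windows based only on its timestamp. The class and method names are hypothetical, and it assumes non-negative timestamps and window starts aligned to multiples of the slide. With a size of 180 days and a slide of 1 day, every record belongs to 180 overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; names are hypothetical and this is not Flink API.
class SlidingWindowSketch {

    /**
     * Returns the start timestamps of all windows [start, start + size)
     * that contain the given timestamp. Assumes timestamp >= 0 and that
     * window starts are aligned to multiples of slide (no offset).
     */
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Most recent window start at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long ts = 200 * day + 12345; // some moment during day 200
        List<Long> starts = windowStartsFor(ts, 180 * day, day);
        System.out.println(starts.size());        // prints 180
        System.out.println(starts.get(0) / day);  // prints 200
    }
}
```

The assignment depends only on each record's timestamp as it arrives, which is why it applies naturally to a stream and not to a value that is overwritten in place.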

It seems that UpdatedTxnState always holds the last record that was
received per key. Do you want to compute the windows always on the last
records you received per key?
This would mean that you need to retract the overwritten values, i.e.,
remove them from the result, and add the new values to the result.
This can be implemented, but it requires a careful design. You cannot use the
built-in sliding windows of the DataStream API because they do not support
retraction, so I think you have to implement this functionality yourself.
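
As a rough illustration of what retraction means here, the following plain-Java sketch keeps the last value per key and a running sum over those values. When a key is overwritten, the old value is first removed from the sum and the new one is added. The class name, the numeric values, and the expire method are all assumptions for illustration; in a Flink job the map would presumably live in keyed state and expiry would be triggered by timers.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of retraction; names are hypothetical, not Flink API.
class RetractingSum {
    private final Map<String, Long> lastValuePerKey = new HashMap<>();
    private long sum = 0;

    /** Stores the newest value for a key and returns the updated sum. */
    long update(String key, long newValue) {
        Long old = lastValuePerKey.put(key, newValue);
        if (old != null) {
            sum -= old;      // retract the overwritten value
        }
        sum += newValue;     // add the new value
        return sum;
    }

    /** Drops a key entirely, e.g. once its record is older than 180 days. */
    long expire(String key) {
        Long old = lastValuePerKey.remove(key);
        if (old != null) {
            sum -= old;      // retract the expired value
        }
        return sum;
    }

    public static void main(String[] args) {
        RetractingSum agg = new RetractingSum();
        agg.update("txn-1", 10);
        agg.update("txn-2", 5);
        System.out.println(agg.update("txn-1", 7)); // prints 12
        System.out.println(agg.expire("txn-2"));    // prints 7
    }
}
```

A sum is easy to retract from; aggregates such as min or max need more bookkeeping, which is part of why the design needs care.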

Best, Fabian

2017-07-28 15:34 GMT+02:00 Biplob Biswas <revolutioni...@gmail.com>:

> Hi,
>
> We recently moved from Spark Streaming to Flink for our stream processing
> requirements in our organization, and we are in the process of reducing the
> number of external calls as much as possible. Earlier we were using HBase to
> store the incoming data, but we now want to try out stateful operations on
> top of Flink.
>
> In that regard, we have decided that we need a sliding window of size
> 180 days with a slide interval of 1 day, such that we keep a state of
> 180 days at any given time. This state would be at most around 40-50 GB for
> the 180 days, so we thought of using RocksDB for state storage.
>
> The job flow we have in mind takes incoming events plus some extra
> information:
>
> events.keyBy(eventTuple -> eventTuple.getEventUID())
>       .flatMap(new UpdatedTxnState());
>
> where UpdatedTxnState is an extension of the RichFlatMapFunction class and
> looks something like this:
>
>
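> import java.util.List;
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
>
> public class UpdatedTxnState extends
>     RichFlatMapFunction<Tuple3<String, List<String>, EventType>, Tuple2<String, EventType>> {
>
>   // Keyed state: the latest (type, list, id) record seen for the current key.
>   private ValueState<Tuple3<EventType, List<String>, String>> txnState;
>
>   @Override
>   public void open(Configuration config) throws Exception {
>     ValueStateDescriptor<Tuple3<EventType, List<String>, String>> stateDescriptor =
>         new ValueStateDescriptor<>(
>             "transaction",
>             TypeInformation.of(new TypeHint<Tuple3<EventType, List<String>, String>>() {}));
>
>     // Make the state queryable from outside the job under this name.
>     stateDescriptor.setQueryable("transaction");
>
>     this.txnState = getRuntimeContext().getState(stateDescriptor);
>   }
>
>   @Override
>   public void flatMap(Tuple3<String, List<String>, EventType> input,
>                       Collector<Tuple2<String, EventType>> output) throws Exception {
>     // Overwrite the stored record for this key with the newest event.
>     txnState.update(new Tuple3<>(input.f2, input.f1, input.f0));
>
>     output.collect(new Tuple2<>(input.f0, input.f2));
>   }
> }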
>
>
> Now, I have a couple of questions:
> 1. How can I create a sliding window on top of this state? I can think of
> doing a keyBy on the output of the flatMap, but that doesn't really make much
> sense to me, and I didn't find a way to build state after windowing.
> 2. Can I query the state with the state name I defined here ("transaction")
> from anywhere in my job?
>
> Thanks,
> Biplob
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
