Hi Bhaskar, If you want different TTLs per key, then you should use timers with a process function as shown in [1]. This is though an old presentation, so now the RichProcessFunction is a KeyedProcessFunction. Also please have a look at the training material in [2] and the process function documentation in [3]
Cheers, Kostas [1] https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction <https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction> [2] https://training.data-artisans.com/ <https://training.data-artisans.com/> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html> > On Sep 17, 2018, at 8:50 AM, Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote: > > Thanks Hequn. But i want to give random TTL for each partitioned key. How can > i achieve it? > > Regards > Bhaskar > > On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <chenghe...@gmail.com > <mailto:chenghe...@gmail.com>> wrote: > Hi bhaskar, > > You need change nothing if you want to handle multi keys. Flink will do it > for you. The ValueState is a keyed state. You can think of Keyed State > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state> > as Operator State that has been partitioned, or sharded, with exactly one > state-partition per key. > TTL can be used in the same way. > > Best, Hequn > > > On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com > <mailto:bhaskar.eba...@gmail.com> <bhaskar.eba...@gmail.com > <mailto:bhaskar.eba...@gmail.com>> wrote: > Hi > In the following example given in flink: > object ExampleCountWindowAverage extends App { > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.fromCollection(List( > (1L, 3L), > (1L, 5L), > (1L, 7L), > (1L, 4L), > (1L, 2L) > )).keyBy(_._1) > .flatMap(new CountWindowAverage()) > .print() > // the printed output will be (1,4) and (1,5) > > env.execute("ExampleManagedState") > } > > There is only 1 state because there is one key. In the CountWindowAverage > method there is one state descriptor : new ValueStateDescriptor[(Long, > Long)]("average", createTypeInformation[(Long, Long)]) > Name given as "average". In order to implement this is generic way, shall i > modify the method: > > CountWindowAverage(keyName:String) so that new ValueStateDescriptor[(Long, > Long)](keyName, createTypeInformation[(Long, Long)]) is created. But how to > configure TTL for this? Inside this method? > In the eample: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl > > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl> > , you have given a stand alone ValueStateDescriptor. How can i use the > TTL inside CountWindowAverage() per Key? > > Regards > Bhaskar