Hi Bhaskar,

If you want different TTLs per key, then you should use timers with a process 
function 
as shown in [1]. This is though an old presentation, so now the 
RichProcessFunction is a KeyedProcessFunction.
Also please have a look at the training material in [2] and the process 
function documentation in [3]

Cheers,
Kostas

[1] 
https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
 
<https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction>
[2] https://training.data-artisans.com/ <https://training.data-artisans.com/>
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html>

> On Sep 17, 2018, at 8:50 AM, Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
> 
> Thanks Hequn. But i want to give random TTL for each partitioned key. How can 
> i achieve it? 
> 
> Regards
> Bhaskar
> 
> On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <chenghe...@gmail.com 
> <mailto:chenghe...@gmail.com>> wrote:
> Hi bhaskar,
> 
> You need change nothing if you want to handle multi keys. Flink will do it 
> for you. The ValueState is a keyed state. You can think of Keyed State 
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>
>  as Operator State that has been partitioned, or sharded, with exactly one 
> state-partition per key. 
> TTL can be used in the same way.
> 
> Best, Hequn
> 
> 
> On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com 
> <mailto:bhaskar.eba...@gmail.com> <bhaskar.eba...@gmail.com 
> <mailto:bhaskar.eba...@gmail.com>> wrote:
> Hi
> In the following example given in flink:
> object ExampleCountWindowAverage extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>   env.fromCollection(List(
>     (1L, 3L),
>     (1L, 5L),
>     (1L, 7L),
>     (1L, 4L),
>     (1L, 2L)
>   )).keyBy(_._1)
>     .flatMap(new CountWindowAverage())
>     .print()
>   // the printed output will be (1,4) and (1,5)
> 
>   env.execute("ExampleManagedState")
> }
> 
> There is only 1 state because there is one key. In the CountWindowAverage 
> method there is one state descriptor :  new ValueStateDescriptor[(Long, 
> Long)]("average", createTypeInformation[(Long, Long)])
> Name given as "average". In order to implement this is generic way, shall i 
> modify the  method:
> 
> CountWindowAverage(keyName:String)  so that  new ValueStateDescriptor[(Long, 
> Long)](keyName, createTypeInformation[(Long, Long)]) is created. But how to 
> configure TTL for this? Inside this method?
> In the eample: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>  
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl>
>  ,   you have given a stand alone ValueStateDescriptor.  How can i use the 
> TTL inside CountWindowAverage() per Key?
> 
> Regards
> Bhaskar

Reply via email to