I think this is not easily possible at the moment; however, it might be a valid 
suggestion to add an `OnTimerContext#getCurrentKey()` method. 

Besides using ValueState as you discussed before, as a kind of workaround you 
could copy and modify KeyedProcessOperator to suit your needs, but this would 
be more complicated.
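
To make the ValueState approach from the thread concrete, here is a minimal plain-Java model of it. This is only a sketch and not Flink code: `KeyScopedValue`, `deliverElement` and `fireTimer` are hypothetical stand-ins for Flink's keyed state and for the runtime that scopes state to the current key before each callback. It only illustrates why a key stored in processElement() can be read back in onTimer().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the workaround discussed above. Nothing here is Flink
 * API: KeyScopedValue is a hypothetical stand-in for a keyed ValueState, and
 * deliverElement/fireTimer imitate a runtime that sets the current key.
 */
public class StoredKeySketch {

    /** Hypothetical stand-in for keyed state: one value slot per key. */
    static final class KeyScopedValue<T> {
        private final Map<String, T> slots = new HashMap<>();
        private String currentKey; // set by the simulated runtime, never by user code

        void setCurrentKey(String key) { currentKey = key; }
        T value() { return slots.get(currentKey); }
        void update(T v) { slots.put(currentKey, v); }
    }

    private final KeyScopedValue<String> keyState = new KeyScopedValue<>();
    final List<String> output = new ArrayList<>();

    /** User code: the key is a field of the element, so remember it once per key. */
    void processElement(String keyField) {
        if (keyState.value() == null) {
            keyState.update(keyField);
        }
    }

    /** User code: no element and no key argument here, only the keyed state. */
    void onTimer(long timestamp) {
        output.add(keyState.value() + "@" + timestamp);
    }

    /** Simulated runtime: scopes state to the element's key, then calls processElement. */
    void deliverElement(String key) {
        keyState.setCurrentKey(key);
        processElement(key);
    }

    /** Simulated runtime: scopes state to the timer's key, then calls onTimer. */
    void fireTimer(String key, long timestamp) {
        keyState.setCurrentKey(key);
        onTimer(timestamp);
    }

    public static void main(String[] args) {
        StoredKeySketch fn = new StoredKeySketch();
        fn.deliverElement("one");
        fn.deliverElement("two");
        fn.deliverElement("one"); // key already stored, nothing to update
        fn.fireTimer("two", 10L);
        fn.fireTimer("one", 20L);
        System.out.println(fn.output); // [two@10, one@20]
    }
}
```

The null check in processElement() is the per-element cost Jürgen mentioned; a `getCurrentKey()` on the timer context would make both the check and the extra state unnecessary.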

Piotrek

> On 4 Feb 2018, at 20:36, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Jürgen,
> 
> That makes sense to me.
> 
> Anyone from the Flink team want to comment on (a) if there is a way to get 
> the current key in the timer callback without using an explicit ValueState 
> that’s maintained in the processElement() method, and (b) if not, whether 
> that could be added to the context?
> 
> Thanks,
> 
> — Ken
> 
> 
>> On Feb 4, 2018, at 6:14 AM, Jürgen Thomann <juergen.thom...@innogames.com> 
>> wrote:
>> 
>> Hi Ken,
>> 
>> thanks for your answer. You're right and I'm doing it already that way. I 
>> just hoped that I could avoid the ValueState (I'm using a MapState as well 
>> already, which does not store the key) and get the key from the provided 
>> Context of the ProcessFunction. This would avoid having the ValueState and 
>> setting it in the processElement just to know the key in the onTimer 
>> function. 
>> As it stands, I have to either check the ValueState for every element to see 
>> whether the key is already set, or just set it again every time the 
>> processElement method is invoked.
>> 
>> Best,
>> Jürgen
>> 
>> On 02.02.2018 18:37, Ken Krugler wrote:
>>> Hi Jürgen,
>>> 
>>>> On Feb 2, 2018, at 6:24 AM, Jürgen Thomann <juergen.thom...@innogames.com> 
>>>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I'm currently using a ProcessFunction after a keyBy() and can't find a way 
>>>> to get the key.
>>> 
>>> Doesn’t your keyBy() take a field (position, or name) to use as the key?
>>> 
>>> So then that same field contains the key in the 
>>> ProcessFunction.processElement(in, …) parameter, yes?
>>> 
>>>> I'm currently storing it in a ValueState<String> within processElement
>>> 
>>> If you’re using a ValueState, then there’s one of those for each unique 
>>> key, not one for the operation.
>>> 
>>> I.e. the ValueState for key = “one” is separate from the ValueState for key 
>>> = “two”.
>>> 
>>> You typically store the key in the state so it’s accessible in the onTimer 
>>> method.
>>> 
>>>> and set it all the time, so that I can access it in onTimer(). Is there a 
>>>> better way to get the key? We are using Flink 1.3 at the moment.
>>> 
>>> The ValueState (what you used in processElement) that you’re accessing in 
>>> the onTimer() method is also scoped by the current key.
>>> 
>>> So assuming you stored the key in the state inside of your processElement() 
>>> call, then you should have everything you need.
>>> 
>>> — Ken
>>> 
>>> PS - Check out 
>>> https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
