Tony,

You are right; I hadn't thought this through. The KeyedStateFunction only
has access to one piece of state at a time, which is the state for some
specific key associated with one specific state descriptor. You can fetch,
update, or clear that piece of state, but as far as I know, you can't
operate with two state descriptors at the same time, so there doesn't
appear to be any way to effect a migration.
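
That leaves the lazy conversion Piotrek mentioned. As a rough sketch of the
idea only: the plain Java below uses two HashMaps as stand-ins for the keyed
state behind the old and the new state descriptor. None of it is Flink API;
the class, field, and method names are made up for illustration, and I'm
assuming the string-to-int change from your example.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in (hypothetical names, no Flink classes involved):
// each map plays the role of the keyed state behind one state descriptor.
class LazyMigrationSketch {
    final Map<String, String> stateV1 = new HashMap<>();  // old format: count kept as a string
    final Map<String, Integer> stateV2 = new HashMap<>(); // new format: count kept as an int

    // Plays the role of processElement(): called once per element of a key.
    int processElement(String key) {
        Integer count = stateV2.get(key);
        if (count == null) {
            // First access to this key since the upgrade: convert the old
            // value if there is one, and drop it from the old state.
            String old = stateV1.remove(key);
            count = (old == null) ? 0 : Integer.parseInt(old);
        }
        count = count + 1;
        stateV2.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        LazyMigrationSketch sketch = new LazyMigrationSketch();
        sketch.stateV1.put("a", "41");                  // state restored in the old format
        System.out.println(sketch.processElement("a")); // converts "41", then counts to 42
        System.out.println(sketch.processElement("b")); // key with no old state starts at 1
    }
}
```

The catch is the one already discussed in this thread: a key is converted only
when an element for it arrives, so old-format state for idle keys stays around
and the old descriptor can't be dropped until every key has been seen.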

Sorry for that,
David

On Tue, Jun 12, 2018 at 9:56 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi David,
>
> I have read the documentation for `Context.applyToKeyedState()`, but I still
> have some questions about using it to implement keyed state migration.
> `Context.applyToKeyedState()` can only be called in
> `processBroadcastElement()`, so it won't have any key information.
> It looks like I can use `KeyedStateFunction` to get, update or clear my
> keyed states. Am I right?
> If I want to migrate to a different type, e.g. change a `string` type to an
> `int` type, how do I achieve that using this functionality?
> It seems that I can't use `key` parameter in `KeyedStateFunction` to
> access the other state, generated by another state descriptor.
> Please correct me if I misunderstood. Thank you.
>
> Best Regards,
> Tony Wei
>
>
>
> 2018-06-09 9:45 GMT+08:00 TechnoMage <mla...@technomage.com>:
>
>> Thank you all.  This discussion is very helpful.  It sounds like I can
>> wait for 1.6 though given our development status.
>>
>> Michael
>>
>>
>> On Jun 8, 2018, at 1:08 PM, David Anderson <da...@data-artisans.com>
>> wrote:
>>
>> Hi all,
>>
>> I think I see a way to eagerly do full state migration without writing
>> your own Operator, but it's kind of hacky and may have flaws I'm not aware
>> of.
>>
>> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
>> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
>> because in the processBroadcastElement() method you can supply a
>> KeyedStateFunction to the Context.applyToKeyedState() method, and this
>> KeyedStateFunction will be applied to every item of keyed state associated
>> with the state
>> descriptor you specify. I've been doing some experiments with this, and
>> it's quite powerful in cases where it's useful to operate on all of your
>> application's state.
>>
>> I believe this was intended for cases where an update to an item of
>> broadcast state has implications for associated keyed state, but I see
>> nothing that prevents you from essentially ignoring the broadcast stream
>> and using this mechanism to implement keyed state migration.
>>
>> David
>>
>>
>>
>> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, it should be feasible. As I said before, with Flink 1.6 there will
>>> be a better way to migrate state, but for now you either need to lazily
>>> convert the state, or iterate over the keys and do the job manually.
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> So my question is: is it feasible to migrate from `ProcessFunction` to
>>> my own operator and then use `getKeyedStateBackend()` to migrate the
>>> states?
>>> If yes, is there anything I need to be careful with? If not, why not, and
>>> could it become available in the future? Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> Oh, I see now. Yes, indeed, getKeyedStateBackend() is not exposed to the
>>>> function and you cannot migrate your state that way.
>>>>
>>>> As far as I know, yes: at the moment, in order to convert everything at
>>>> once you would have to write your own operator (without getKeys you can
>>>> still implement lazy conversion).
>>>>
>>>> Piotrek
>>>>
>>>>
>>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Piotrek,
>>>>
>>>> I used `ProcessFunction` to implement it, but it seems that I can't
>>>> call `getKeyedStateBackend()` like `WindowOperator` does.
>>>> I found that `getKeyedStateBackend()` is a method of
>>>> `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it.
>>>> Does that mean I can't look up all keys and migrate the entire previous
>>>> states to the new states in `ProcessFunction#open()`?
>>>> As I said, do I need to port `ProcessFunction` to
>>>> `KeyedProcessOperator` to migrate state in the manner shown in
>>>> `WindowOperator`?
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>>
>>>>> What function are you implementing and how are you using it?
>>>>>
>>>>> Usually it’s enough if your function implements RichFunction (or
>>>>> rather extends AbstractRichFunction), and then you can use
>>>>> RichFunction#open in a similar manner to the code that I posted in my
>>>>> previous message. Flink performs instanceof checks in many places, like:
>>>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>>>>
>>>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>>>    if (function instanceof RichFunction) {
>>>>>       RichFunction richFunction = (RichFunction) function;
>>>>>       richFunction.open(parameters);
>>>>>    }
>>>>> }
>>>>>
>>>>> Piotrek
>>>>>
>>>>>
>>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>> Hi Piotrek,
>>>>>
>>>>> It seems that this was implemented with the `Operator` API, which is a
>>>>> lower-level API than the `Function` API.
>>>>> Since at the `Function` API level we can only migrate state when
>>>>> triggered by an event, it is more convenient to migrate state this way,
>>>>> by iterating over all keys in the `open()` method.
>>>>> If I implemented my stateful operator with the `ProcessFunction` API, is
>>>>> it possible to port it to `KeyedProcessOperator` and do the state
>>>>> migration that you mentioned?
>>>>> And are there any concerns or difficulties that could lead to a failed
>>>>> state restore or other problems? Thank you!
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A general solution for state/schema migration is under development and
>>>>>> it might be released with Flink 1.6.0.
>>>>>>
>>>>>> Before that, you need to manually handle the state migration in your
>>>>>> operator’s open method. Let’s assume that your OperatorV1 has a state
>>>>>> field “stateV1”. Your OperatorV2 defines a field “stateV2”, which is
>>>>>> incompatible with the previous version. What you can do is add logic in
>>>>>> the open method to check:
>>>>>> 1. If “stateV2” is non-empty, do nothing
>>>>>> 2. If there is no “stateV2”, iterate over all of the keys and
>>>>>> manually migrate “stateV1” to “stateV2”
>>>>>>
>>>>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>>>>
>>>>>> I have once implemented something like that here:
>>>>>>
>>>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>>>>
>>>>>> Hope that helps!
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>
>>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>>>
>>>>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>>>>> question.
>>>>>>
>>>>>> When a job is modified and we want to deploy the new version, what is
>>>>>> the preferred method?  Our jobs have a lot of keyed state.
>>>>>>
>>>>>> If we use snapshots we have old state that may no longer apply to the
>>>>>> new pipeline.
>>>>>> If we start a new job we can reprocess historical data from Kafka,
>>>>>> but that can be very resource heavy for a while.
>>>>>>
>>>>>> Is there an option I am missing?  Are there facilities to selectively
>>>>>> “patch” or “purge” the keyed state?
>>>>>>
>>>>>> Michael
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>> --
>> David Anderson | Training Coordinator
>>
>> <https://data-artisans.com/>
>>
>> Follow us @dataArtisans <https://twitter.com/dataArtisans>
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>>
>>
>>
>


-- 
*David Anderson* | Training Coordinator | data Artisans