Hi Piotrek,

So my question is: is it feasible to port my `ProcessFunction` to my own
operator and then use `getKeyedStateBackend()` to migrate the state?
If yes, is there anything I need to be careful about? If not, why not, and
could it be made available in the future? Thank you.

Best Regards,
Tony Wei

2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> Oh, I see now. Yes, indeed, getKeyedStateBackend() is not exposed to the
> function, so you cannot migrate your state that way.
>
> As far as I know, yes: at the moment, in order to convert everything at once
> you would have to write your own operator. (Without getKeys you can still
> implement lazy conversion.)
>
> Piotrek
>
>
> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> I used `ProcessFunction` to implement it, but it seems that I can't call
> `getKeyedStateBackend()` the way `WindowOperator` does.
> I found that `getKeyedStateBackend()` is a method of
> `AbstractStreamOperator`, which the `ProcessFunction` API doesn't extend.
> Does that mean I can't look up all keys and migrate the entire previous
> state to the new state in `ProcessFunction#open()`?
> As I said, do I need to port my `ProcessFunction` to `KeyedProcessOperator`
> to migrate state in the manner shown in `WindowOperator`?
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> What function are you implementing and how are you using it?
>>
>> Usually it’s enough if your function implements RichFunction (or rather
>> extends AbstractRichFunction); then you can use RichFunction#open
>> in a similar manner to the code that I posted in my previous message.
>> Flink performs instanceof checks in many places, for example in
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>
>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>    if (function instanceof RichFunction) {
>>       RichFunction richFunction = (RichFunction) function;
>>       richFunction.open(parameters);
>>    }
>> }
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotrek,
>>
>> It seems that this was implemented with the `Operator` API, which is a
>> lower-level API than the `Function` API.
>> Since at the `Function` API level we can only migrate state when an event
>> triggers it, it is more convenient to migrate state this way, by iterating
>> over all keys in the `open()` method.
>> If I implemented a stateful operator with the `ProcessFunction` API, is it
>> possible to port it to `KeyedProcessOperator` and do the state migration
>> that you mentioned?
>> And are there any concerns or difficulties that could cause restoring the
>> state to fail, or lead to other problems? Thank you!
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> A general solution for state/schema migration is under development and it
>>> might be released with Flink 1.6.0.
>>>
>>> Before that, you need to handle the state migration manually in your
>>> operator’s open method. Let’s assume that your OperatorV1 has a state field
>>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
>>> with the previous version. What you can do is add logic to the open method
>>> to check:
>>> 1. If “stateV2” is non-empty, do nothing.
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>> migrate “stateV1” to “stateV2”.
>>>
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>
>>> I have once implemented something like that here:
>>>
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>
>>> Hope that helps!
>>>
>>> Piotrek
>>>
>>>
>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>
>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>> question.
>>>
>>> When a job is modified and we want to deploy the new version, what is
>>> the preferred method?  Our jobs have a lot of keyed state.
>>>
>>> If we use snapshots we have old state that may no longer apply to the
>>> new pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but
>>> that can be very resource heavy for a while.
>>>
>>> Is there an option I am missing?  Are there facilities to “patch” or
>>> “purge” selectively the keyed state?
>>>
>>> Michael
>>>
>>>
>>>
>>
>>
>
>
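
For readers of this thread, the two approaches discussed above (converting everything at once in `open()`, and lazy conversion when the keys cannot be enumerated) could look roughly like the sketch below. It is plain Java with two in-memory maps standing in for keyed state. It does not use any Flink API, and every name in it (`StateMigrationSketch`, `migrateAllOnOpen`, `getLazily`, `convert`) is hypothetical. The old format is assumed to be a String and the new one a Long purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java stand-in for keyed state migration. NOT Flink API:
 * the maps below only imitate per-key state, and all names are hypothetical.
 */
public class StateMigrationSketch {

    /** Old layout ("stateV1"): a count stored as a String per key. */
    final Map<String, String> stateV1 = new HashMap<>();
    /** New layout ("stateV2"): the same count stored as a Long per key. */
    final Map<String, Long> stateV2 = new HashMap<>();

    /** Converts one old-format value into the new format. */
    static Long convert(String oldValue) {
        return Long.parseLong(oldValue);
    }

    /**
     * Eager migration, as it might run in an operator's open method:
     * 1. if stateV2 is non-empty, do nothing;
     * 2. otherwise iterate over all keys and migrate stateV1 to stateV2.
     * Returns the number of migrated keys.
     */
    int migrateAllOnOpen() {
        if (!stateV2.isEmpty()) {
            return 0;
        }
        // Copy the key set first so entries can be removed while iterating.
        List<String> keys = new ArrayList<>(stateV1.keySet());
        for (String key : keys) {
            stateV2.put(key, convert(stateV1.remove(key)));
        }
        return keys.size();
    }

    /**
     * Lazy migration, for when all keys cannot be enumerated:
     * a key's old state is converted the first time that key is accessed.
     */
    Long getLazily(String key) {
        Long current = stateV2.get(key);
        if (current == null && stateV1.containsKey(key)) {
            current = convert(stateV1.remove(key));
            stateV2.put(key, current);
        }
        return current;
    }

    public static void main(String[] args) {
        StateMigrationSketch eager = new StateMigrationSketch();
        eager.stateV1.put("a", "1");
        eager.stateV1.put("b", "2");
        System.out.println("migrated keys: " + eager.migrateAllOnOpen());

        StateMigrationSketch lazy = new StateMigrationSketch();
        lazy.stateV1.put("c", "3");
        System.out.println("lazy read of c: " + lazy.getLazily("c"));
    }
}
```

In a real job the eager variant needs access to all keys, which per the thread means writing a custom operator, while the lazy variant only touches the key of the element currently being processed and so fits inside a function.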
