Thank you all.  This discussion is very helpful.  Given our development status, 
it sounds like I can wait for 1.6.

Michael

> On Jun 8, 2018, at 1:08 PM, David Anderson <da...@data-artisans.com> wrote:
> 
> Hi all,
> 
> I think I see a way to eagerly do full state migration without writing your 
> own Operator, but it's kind of hacky and may have flaws I'm not aware of. 
> 
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to 
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant 
> because in the processBroadcastElement() method you can supply a 
> KeyedStateFunction to the Context.applyToKeyedState() method, and this 
> KeyedStateFunction will be applied to every item of keyed state associated with 
> the state descriptor you specify. I've been doing some experiments with this, 
> and it's quite powerful in cases where it's useful to operate on all of your 
> application's state.
> 
> I believe this was intended for cases where an update to an item of broadcast 
> state has implications for associated keyed state, but I see nothing that 
> prevents you from essentially ignoring the broadcast stream and using this 
> mechanism to implement keyed state migration.
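The pattern David describes can be modelled in plain Java without any Flink dependency. This is only a rough sketch of the idea, not Flink code: the map stands in for keyed state, the applyToKeyedState method here is a hypothetical stand-in for Context.applyToKeyedState(), and the "v2:" value format is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of "ignore the broadcast element, use it only as a trigger
// to rewrite every item of keyed state". Nothing here is a Flink API.
class BroadcastTriggeredMigrationSketch {
    // Stand-in for the keyed state registered under one state descriptor.
    final Map<String, String> keyedState = new HashMap<>();
    boolean migrated = false;

    // Hypothetical stand-in for Context.applyToKeyedState(): applies the
    // function to the state of every key and stores the result.
    void applyToKeyedState(BiFunction<String, String, String> fn) {
        keyedState.replaceAll(fn);
    }

    // The broadcast element's content is ignored; its arrival is the trigger.
    void processBroadcastElement(Object ignored) {
        if (migrated) {
            return; // later broadcast elements must not migrate twice
        }
        // Assumed conversion: tag each old value with a "v2:" prefix.
        applyToKeyedState((key, value) ->
                value.startsWith("v2:") ? value : "v2:" + value);
        migrated = true;
    }

    public static void main(String[] args) {
        BroadcastTriggeredMigrationSketch fn = new BroadcastTriggeredMigrationSketch();
        fn.keyedState.put("user-a", "3");
        fn.keyedState.put("user-b", "7");
        fn.processBroadcastElement("migrate");
        System.out.println(fn.keyedState);
    }
}
```

In a real job the function would run once per parallel instance and see only the keys held by that instance, which this single-map model does not capture.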
> 
> David
> 
> 
> 
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> Yes, it should be feasible. As I said before, Flink 1.6 will bring a 
> better way to migrate state, but for now you either need to convert the 
> state lazily, or iterate over the keys and do the job manually.
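Lazy conversion, the first of the two options, might look roughly like the following. This is a plain-Java model under stated assumptions, not Flink code: the two maps stand in for the old and new keyed state, and the string-to-long conversion and the per-key counter are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of lazy state conversion. The maps are stand-ins for two
// keyed states; they are NOT Flink APIs.
class LazyMigrationSketch {
    // Old layout (hypothetical): a count stored as a decimal string.
    final Map<String, String> stateV1 = new HashMap<>();
    // New layout (hypothetical): the same count stored as a long.
    final Map<String, Long> stateV2 = new HashMap<>();

    // Called once per incoming element; only that element's key is touched.
    long processElement(String key) {
        Long count = stateV2.get(key);
        if (count == null) {
            // First event for this key since the upgrade: convert its old
            // entry if one exists, otherwise start from zero.
            String old = stateV1.remove(key);
            count = (old == null) ? 0L : Long.parseLong(old);
        }
        count = count + 1;
        stateV2.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        LazyMigrationSketch fn = new LazyMigrationSketch();
        fn.stateV1.put("user-a", "3");
        fn.stateV1.put("user-b", "7");
        System.out.println(fn.processElement("user-a"));
        System.out.println(fn.stateV1.keySet());
    }
}
```

The trade-off is that a key which never receives another event stays in the old format indefinitely, which is why converting everything at once requires iterating over the keys.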
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>> 
>> Hi Piotrek,
>> 
>> So my question is: is it feasible to migrate state from `ProcessFunction` 
>> to my own operator, then use `getKeyedStateBackend()` to migrate the state?
>> If yes, is there anything I need to be careful about? If no, why not, and 
>> could it become available in the future? Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>> Hi,
>> 
>> Oh, I see now. Yes, indeed, getKeyedStateBackend() is not exposed to the 
>> function, so you cannot migrate your state that way.
>> 
>> As far as I know, yes: at the moment, in order to convert everything at once 
>> you would have to write your own operator (without getKeys() you can still 
>> implement lazy conversion).
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> I used `ProcessFunction` to implement it, but it seems that I can't call 
>>> `getKeyedStateBackend()` the way `WindowOperator` does.
>>> I found that `getKeyedStateBackend()` is a method of 
>>> `AbstractStreamOperator`, which the `ProcessFunction` API doesn't extend.
>>> Does that mean I can't look up all keys and migrate all of the previous 
>>> state to the new state in `ProcessFunction#open()`?
>>> As I said, do I need to port my `ProcessFunction` to `KeyedProcessOperator` 
>>> to migrate state in the manner shown in `WindowOperator`?
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>> What function are you implementing and how are you using it?
>>> 
>>> Usually it’s enough if your function implements RichFunction (or rather 
>>> extends AbstractRichFunction); you can then use RichFunction#open 
>>> in a similar manner to the code I posted in my previous message. 
>>> In many places Flink performs instanceof checks like the one in 
>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>> 
>>> // Calls open() only when the function is a RichFunction.
>>> public static void openFunction(Function function, Configuration parameters)
>>>       throws Exception {
>>>    if (function instanceof RichFunction) {
>>>       RichFunction richFunction = (RichFunction) function;
>>>       richFunction.open(parameters);
>>>    }
>>> }
>>> 
>>> Piotrek
>>> 
>>> 
>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>> 
>>>> Hi Piotrek,
>>>> 
>>>> It seems that this was implemented with the `Operator` API, which is a 
>>>> lower-level API than the `Function` API.
>>>> Since at the `Function` API level we can only migrate state when an event 
>>>> triggers it, it is more convenient to migrate state this way, by iterating 
>>>> over all keys in the `open()` method.
>>>> If I implemented my stateful operator with the `ProcessFunction` API, is it 
>>>> possible to port it to `KeyedProcessOperator` and do the state migration 
>>>> that you mentioned?
>>>> And are there any concerns or difficulties that could cause state 
>>>> restoration to fail, or lead to other problems? Thank you!
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>> Hi,
>>>> 
>>>> A general solution for state/schema migration is under development and 
>>>> might be released with Flink 1.6.0.
>>>> 
>>>> Before that, you need to handle the state migration manually in your 
>>>> operator’s open method. Let's assume that your OperatorV1 has a state field 
>>>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible 
>>>> with the previous version. What you can do is add logic to the open method 
>>>> that checks:
>>>> 1. If “stateV2” is non-empty, do nothing.
>>>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>>>> migrate “stateV1” to “stateV2”.
>>>> 
>>>> In your OperatorV3 you could drop the support for “stateV1”.
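The two-step check described above can be sketched in plain Java as follows. This is a minimal model under stated assumptions, not Flink code: the maps stand in for the keyed states “stateV1” and “stateV2”, the string-to-long conversion is invented for illustration, and clearing the old state after conversion is a choice of this sketch rather than something the steps require.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of eager migration in open(). The maps are stand-ins for
// the keyed states "stateV1" and "stateV2"; they are NOT Flink APIs.
class EagerMigrationSketch {
    // Old layout (hypothetical): a count stored as a decimal string.
    final Map<String, String> stateV1 = new HashMap<>();
    // New layout (hypothetical): the same count stored as a long.
    final Map<String, Long> stateV2 = new HashMap<>();

    // Models the check made in the operator's open() method.
    // Returns the number of keys that were migrated.
    int open() {
        // 1. "stateV2" is non-empty: migration already happened, do nothing.
        if (!stateV2.isEmpty()) {
            return 0;
        }
        // 2. No "stateV2": iterate over all keys and convert each entry.
        int migrated = 0;
        for (Map.Entry<String, String> e : stateV1.entrySet()) {
            stateV2.put(e.getKey(), Long.parseLong(e.getValue()));
            migrated++;
        }
        // Assumption of this sketch: drop the old state once converted.
        stateV1.clear();
        return migrated;
    }

    public static void main(String[] args) {
        EagerMigrationSketch op = new EagerMigrationSketch();
        op.stateV1.put("user-a", "3");
        op.stateV1.put("user-b", "7");
        System.out.println(op.open()); // first start after the upgrade
        System.out.println(op.open()); // later restart
        System.out.println(op.stateV2);
    }
}
```

Because step 1 makes the check idempotent, the same open() logic is safe to run on every restart, not only on the first start after the upgrade.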
>>>> 
>>>> I have once implemented something like that here:
>>>> 
>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>> 
>>>> Hope that helps!
>>>> 
>>>> Piotrek
>>>> 
>>>> 
>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>> 
>>>>> We are still pretty new to Flink and I have a conceptual / DevOps 
>>>>> question.
>>>>> 
>>>>> When a job is modified and we want to deploy the new version, what is the 
>>>>> preferred method?  Our jobs have a lot of keyed state.
>>>>> 
>>>>> If we use snapshots we have old state that may no longer apply to the new 
>>>>> pipeline.
>>>>> If we start a new job we can reprocess historical data from Kafka, but 
>>>>> that can be very resource heavy for a while.
>>>>> 
>>>>> Is there an option I am missing?  Are there facilities to “patch” or 
>>>>> “purge” selectively the keyed state?
>>>>> 
>>>>> Michael
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> 
> -- 
> David Anderson | Training Coordinator
