Hi,

Yes, it should be feasible. As I said before, with Flink 1.6 there will be a better way to migrate state, but for now you need to either convert the state lazily or iterate over the keys and do the migration manually.
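To illustrate the two options, here is a minimal sketch in plain Java. It is only a model: the two maps stand in for keyed state, and the class name, the method names and the String-to-Long conversion are all hypothetical, not Flink API. In a real job the eager variant would run in your own operator's open method, where the keyed state backend is accessible, and the lazy variant would run whenever an element for a given key is processed.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the two migration strategies. The maps stand in for
// Flink keyed state ("stateV1" / "stateV2"); nothing here is Flink API.
final class StateMigrationSketch {
    // Old layout (hypothetical): a counter stored as a String.
    private final Map<String, String> stateV1 = new HashMap<>();
    // New layout (hypothetical): the same counter stored as a Long.
    private final Map<String, Long> stateV2 = new HashMap<>();

    StateMigrationSketch(Map<String, String> restoredV1) {
        stateV1.putAll(restoredV1);
    }

    // Hypothetical conversion from the old format to the new one.
    private static Long convert(String old) {
        return Long.parseLong(old);
    }

    // Lazy conversion: a key is migrated the first time it is accessed.
    Long readLazily(String key) {
        Long current = stateV2.get(key);
        if (current == null) {
            String old = stateV1.remove(key);
            if (old != null) {
                current = convert(old);
                stateV2.put(key, current);
            }
        }
        return current; // null if the key exists in neither version
    }

    // Eager conversion: iterate over all remaining old keys once.
    // Keys that already have a new-format value are left untouched.
    void migrateAll() {
        for (Map.Entry<String, String> e : stateV1.entrySet()) {
            stateV2.putIfAbsent(e.getKey(), convert(e.getValue()));
        }
        stateV1.clear();
    }

    // Number of keys that still only exist in the old format.
    int remainingV1Keys() {
        return stateV1.size();
    }

    public static void main(String[] args) {
        Map<String, String> restored = new HashMap<>();
        restored.put("a", "1");
        restored.put("b", "2");
        StateMigrationSketch s = new StateMigrationSketch(restored);
        System.out.println(s.readLazily("a")); // migrates only key "a"
        s.migrateAll();                        // migrates whatever is left
        System.out.println(s.remainingV1Keys());
    }
}
```

With lazy conversion, old state stays around until each key is touched again, so the old state definition has to be kept in the job for as long as unconverted keys may exist. With eager conversion the old state can be dropped in the next version.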
Piotrek

> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> So my question is: is it feasible to migrate state from `ProcessFunction`
> to my own operator and then use `getKeyedStateBackend()` to migrate the states?
> If yes, is there anything I need to be careful with? If not, why, and could it
> become available in the future? Thank you.
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
> Hi,
>
> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
> function, and you cannot migrate your state that way.
>
> As far as I know, yes: at the moment, in order to convert everything at once
> (without getKeys you can still implement lazy conversion) you would have to
> write your own operator.
>
> Piotrek
>
>
>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotrek,
>>
>> I used `ProcessFunction` to implement it, but it seems that I can't call
>> `getKeyedStateBackend()` like `WindowOperator` did.
>> I found that `getKeyedStateBackend()` is a method of
>> `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it.
>> Does that mean I can't look up all keys and migrate the entire previous
>> state to the new state in `ProcessFunction#open()`?
>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to
>> migrate state in the manner shown in `WindowOperator`?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>> What function are you implementing and how are you using it?
>>
>> Usually it’s enough if your function implements RichFunction (or rather
>> extends AbstractRichFunction), and then you could use RichFunction#open
>> in a similar manner as in the code that I posted in my previous message.
>>
>> Flink in many places performs instanceof checks like:
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>
>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>     if (function instanceof RichFunction) {
>>         RichFunction richFunction = (RichFunction) function;
>>         richFunction.open(parameters);
>>     }
>> }
>>
>> Piotrek
>>
>>
>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> It seems that this was implemented with the `Operator` API, which is a
>>> lower-level API than the `Function` API.
>>> Since at the `Function` API level we can only migrate state when triggered
>>> by an event, it is more convenient to migrate state this way, by iterating
>>> over all keys in the `open()` method.
>>> If I implemented my stateful operator with the `ProcessFunction` API, is it
>>> possible to port it to `KeyedProcessOperator` and do the state migration
>>> that you mentioned?
>>> And are there any concerns or difficulties that could lead to a failed
>>> state restore or other problems? Thank you!
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>> Hi,
>>>
>>> A general solution for state/schema migration is under development and it
>>> might be released with Flink 1.6.0.
>>>
>>> Before that, you need to handle the state migration manually in your
>>> operator’s open method. Let's assume that your OperatorV1 has a state field
>>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
>>> with the previous version. What you can do is add logic to the open method
>>> to check:
>>> 1. If “stateV2” is non-empty, do nothing
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>> migrate “stateV1” to “stateV2”
>>>
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>
>>> I have once implemented something like that here:
>>>
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>
>>> Hope that helps!
>>>
>>> Piotrek
>>>
>>>
>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>
>>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>>>
>>>> When a job is modified and we want to deploy the new version, what is the
>>>> preferred method? Our jobs have a lot of keyed state.
>>>>
>>>> If we use snapshots we have old state that may no longer apply to the new
>>>> pipeline.
>>>> If we start a new job we can reprocess historical data from Kafka, but
>>>> that can be very resource heavy for a while.
>>>>
>>>> Is there an option I am missing? Are there facilities to “patch” or
>>>> “purge” selectively the keyed state?
>>>>
>>>> Michael