Thank you all. This discussion is very helpful. It sounds like I can wait for 1.6, though, given our development status.
Michael

> On Jun 8, 2018, at 1:08 PM, David Anderson <da...@data-artisans.com> wrote:
>
> Hi all,
>
> I think I see a way to eagerly do full state migration without writing your
> own Operator, but it's kind of hacky and may have flaws I'm not aware of.
>
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
> because in the processBroadcastElement() method you can supply a
> KeyedStateFunction to the Context.applyToKeyedState() method, and this
> KeyedStateFunction will be applied to every item of keyed state associated
> with the state descriptor you specify. I've been doing some experiments with
> this, and it's quite powerful in cases where it's useful to operate on all
> of your application's state.
>
> I believe this was intended for cases where an update to an item of
> broadcast state has implications for associated keyed state, but I see
> nothing that prevents you from essentially ignoring the broadcast stream and
> using this mechanism to implement keyed state migration.
>
> David
>
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> Yes, it should be feasible. As I said before, with Flink 1.6 there will be a
> better way to migrate state, but for now you either need to lazily convert
> the state, or iterate over the keys and do the job manually.
>
> Piotrek
>
>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotrek,
>>
>> So my question is: is it feasible to migrate state from `ProcessFunction`
>> to my own operator, and then use `getKeyedStateBackend()` to migrate the
>> states? If yes, is there anything I need to be careful with? If no, why
>> not, and can it be made available in the future? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>> Hi,
>>
>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>> function, so you cannot migrate your state that way.
>>
>> As far as I know, yes: at the moment, in order to convert everything at
>> once (without access to the keys you can still implement lazy conversion)
>> you would have to write your own operator.
>>
>> Piotrek
>>
>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I used `ProcessFunction` to implement it, but it seems that I can't call
>>> `getKeyedStateBackend()` like `WindowOperator` did.
>>> I found that `getKeyedStateBackend()` is a method of
>>> `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it.
>>> Does that mean I can't look up all keys and migrate the entire previous
>>> state to the new state in `ProcessFunction#open()`?
>>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator`
>>> to migrate state in the manner shown in `WindowOperator`?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>> What function are you implementing and how are you using it?
>>>
>>> Usually it's enough if your function implements RichFunction (or rather
>>> extends AbstractRichFunction); then you can use RichFunction#open in a
>>> similar manner as in the code that I posted in the previous message.
>>> Flink in many places performs instanceof checks like
>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>>
>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>     if (function instanceof RichFunction) {
>>>         RichFunction richFunction = (RichFunction) function;
>>>         richFunction.open(parameters);
>>>     }
>>> }
>>>
>>> Piotrek
>>>
>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Piotrek,
>>>>
>>>> It seems that this was implemented with the `Operator` API, which is a
>>>> lower-level API compared to the `Function` API.
>>>> Since at the `Function` API level we can only migrate state when an
>>>> event is triggered, it is more convenient this way to migrate state by
>>>> iterating over all keys in the `open()` method.
>>>> If I implemented the state operator with the `ProcessFunction` API, is
>>>> it possible to port it to `KeyedProcessOperator` and do the state
>>>> migration that you mentioned?
>>>> And are there any concerns or difficulties that could lead to failures
>>>> in restoring state, or other problems? Thank you!
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>> Hi,
>>>>
>>>> A general solution for state/schema migration is under development and
>>>> it might be released with Flink 1.6.0.
>>>>
>>>> Before that, you need to manually handle the state migration in your
>>>> operator's open method. Let's assume that your OperatorV1 has a state
>>>> field "stateV1". Your OperatorV2 defines a field "stateV2", which is
>>>> incompatible with the previous version. What you can do is add logic in
>>>> the open method to check:
>>>> 1. If "stateV2" is non-empty, do nothing.
>>>> 2.
>>>> If there is no "stateV2", iterate over all of the keys and manually
>>>> migrate "stateV1" to "stateV2".
>>>>
>>>> In your OperatorV3 you could drop the support for "stateV1".
>>>>
>>>> I once implemented something like that here:
>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>>
>>>> Hope that helps!
>>>>
>>>> Piotrek
>>>>
>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>>
>>>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>>>> question.
>>>>>
>>>>> When a job is modified and we want to deploy the new version, what is
>>>>> the preferred method? Our jobs have a lot of keyed state.
>>>>>
>>>>> If we use snapshots, we have old state that may no longer apply to the
>>>>> new pipeline.
>>>>> If we start a new job, we can reprocess historical data from Kafka, but
>>>>> that can be very resource-heavy for a while.
>>>>>
>>>>> Is there an option I am missing? Are there facilities to "patch" or
>>>>> "purge" the keyed state selectively?
>>>>>
>>>>> Michael
>
> --
> David Anderson | Training Coordinator
> https://data-artisans.com/
> Follow us @dataArtisans <https://twitter.com/dataArtisans>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
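Piotr's eager approach (a custom operator that iterates over all keys in `open()`, as in the linked `WindowOperator` code) might look roughly like the sketch below. All names here (`StateMigratingOperator`, the `Long` → `String` conversion, the `"migrated:"` prefix) are illustrative assumptions, not code from the thread; the relevant backend method is `KeyedStateBackend#applyToAllKeys`. The sketch collects old values first and writes afterwards, to avoid mutating state while the backend is iterating over it:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Illustrative operator that eagerly migrates "stateV1" (Long) to "stateV2" (String). */
public class StateMigratingOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private final ValueStateDescriptor<Long> oldDesc =
            new ValueStateDescriptor<>("stateV1", Long.class);
    private final ValueStateDescriptor<String> newDesc =
            new ValueStateDescriptor<>("stateV2", String.class);

    @Override
    public void open() throws Exception {
        super.open();

        // 1) Visit every key that still has "stateV1" and remember its value.
        //    (For very large state you would want a more incremental scheme.)
        final Map<Object, Long> oldValues = new HashMap<>();
        getKeyedStateBackend().applyToAllKeys(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                oldDesc,
                new KeyedStateFunction<Object, ValueState<Long>>() {
                    @Override
                    public void process(Object key, ValueState<Long> state) throws Exception {
                        Long v1 = state.value();
                        if (v1 != null) {
                            oldValues.put(key, v1);
                        }
                    }
                });

        // 2) Write converted values into "stateV2" and clear "stateV1".
        for (Map.Entry<Object, Long> e : oldValues.entrySet()) {
            setCurrentKey(e.getKey());
            ValueState<String> v2 = getKeyedStateBackend().getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, newDesc);
            if (v2.value() == null) {                    // step 1: non-empty => do nothing
                v2.update("migrated:" + e.getValue());   // step 2: convert v1 -> v2
            }
            ValueState<Long> v1 = getKeyedStateBackend().getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, oldDesc);
            v1.clear();
        }
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        // normal processing against "stateV2" goes here
    }
}
```

As the thread notes, this requires porting the logic from a `ProcessFunction` to an operator, since `getKeyedStateBackend()` lives on `AbstractStreamOperator`.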
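David's 1.5-era workaround (ignore the broadcast stream's normal purpose and use `Context.applyToKeyedState()` to touch every key) might be sketched as follows. Again, the class and the `"migrate"` signal are hypothetical; the assumption that `getRuntimeContext()` state accesses inside `process()` are scoped to the key being visited is exactly the kind of thing David warns may have flaws, so verify it before relying on it:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Illustrative: the broadcast stream only carries a "migrate" signal; on
 * receiving it, every item of "stateV1" keyed state is rewritten to "stateV2".
 */
public class BroadcastMigrationFunction
        extends KeyedBroadcastProcessFunction<String, String, String, String> {

    private final ValueStateDescriptor<Long> oldDesc =
            new ValueStateDescriptor<>("stateV1", Long.class);
    private final ValueStateDescriptor<String> newDesc =
            new ValueStateDescriptor<>("stateV2", String.class);

    @Override
    public void processBroadcastElement(String signal, Context ctx, Collector<String> out)
            throws Exception {
        if ("migrate".equals(signal)) {
            // Applied to every key that has "stateV1" state.
            ctx.applyToKeyedState(oldDesc, new KeyedStateFunction<String, ValueState<Long>>() {
                @Override
                public void process(String key, ValueState<Long> oldState) throws Exception {
                    Long v1 = oldState.value();
                    if (v1 != null) {
                        ValueState<String> v2 = getRuntimeContext().getState(newDesc);
                        if (v2.value() == null) {
                            v2.update("migrated:" + v1);
                        }
                        oldState.clear();
                    }
                }
            });
        }
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        out.collect(value);  // normal keyed processing
    }
}
```

The upside over the custom-operator route is that this stays at the `Function` API level; the downside is the hacky dependence on a broadcast signal to trigger the pass.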
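The lazy-conversion alternative Piotr mentions (convert each key the first time it is touched) needs no operator at all and could look like this sketch, applied to a `KeyedStream`. Names and the conversion are again illustrative:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Illustrative lazy migration: each key is converted the first time an
 * element for that key arrives. Keys that never receive another element
 * keep their old state, which is the main drawback of this approach.
 */
public class LazilyMigratingFunction extends ProcessFunction<String, String> {

    private transient ValueState<Long> stateV1;
    private transient ValueState<String> stateV2;

    @Override
    public void open(Configuration parameters) {
        stateV1 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("stateV1", Long.class));
        stateV2 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("stateV2", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // Migrate this key on first touch.
        if (stateV2.value() == null && stateV1.value() != null) {
            stateV2.update("migrated:" + stateV1.value());
            stateV1.clear();
        }
        // ... normal processing against stateV2 ...
        out.collect(value);
    }
}
```

This is the "for now" option in the thread: simplest to write, but the old state lingers for cold keys until the general migration support lands.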