Tony,

You are right; I hadn't thought this through. The KeyedStateFunction only has access to one piece of state at a time: the state for one specific key under one specific state descriptor. You can fetch, update, or clear that piece of state, but as far as I know you can't work with two state descriptors at the same time, so there doesn't appear to be any way to effect a migration.
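
That leaves the lazy conversion Piotrek mentioned earlier in the thread: keep both the old and the new state in the function and convert each key the first time an element for it arrives. Below is a rough sketch of that idea in plain Java, using your string-to-int example. It is not Flink code: the two HashMaps are only stand-ins for the old and new keyed state, and the class and method names (LazyMigrationSketch, restoreV1, processElement, hasV1) are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model of lazy, per-key state migration.
 * The maps are hypothetical stand-ins for keyed state, not Flink APIs.
 */
final class LazyMigrationSketch {
    // Stand-in for the old keyed state ("stateV1"): counts stored as strings.
    private final Map<String, String> stateV1 = new HashMap<>();
    // Stand-in for the new keyed state ("stateV2"): counts stored as ints.
    private final Map<String, Integer> stateV2 = new HashMap<>();

    /** Seeds old-format state, as if it had been restored from a savepoint. */
    void restoreV1(String key, String value) {
        stateV1.put(key, value);
    }

    /** Handles one element for the given key, migrating that key on first touch. */
    int processElement(String key) {
        Integer count = stateV2.get(key);
        if (count == null) {
            // No new-format state yet: convert the old value (if any) and clear it.
            String old = stateV1.remove(key);
            count = (old == null) ? 0 : Integer.parseInt(old);
        }
        count = count + 1;
        stateV2.put(key, count);
        return count;
    }

    /** True while the key still has un-migrated old-format state. */
    boolean hasV1(String key) {
        return stateV1.containsKey(key);
    }

    public static void main(String[] args) {
        LazyMigrationSketch op = new LazyMigrationSketch();
        op.restoreV1("a", "41");
        System.out.println(op.processElement("a")); // prints 42 (migrated, then incremented)
        System.out.println(op.processElement("b")); // prints 1 (no old state for this key)
    }
}
```

The obvious drawback is that keys which never receive another element are never converted, so the old state cannot be dropped with certainty.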

Sorry for that,
David

On Tue, Jun 12, 2018 at 9:56 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi David,
>
> I have read the documentation for `Context.applyToKeyedState()`, but I still
> have some questions about using it to implement keyed state migration.
> `Context.applyToKeyedState()` can only be called in
> `processBroadcastElement()`, so it won't have any key information.
> It looks like I can use `KeyedStateFunction` to get, update or clear my
> keyed states. Am I right?
> If I want to migrate to a different type, e.g. change `string` type to `int`
> type, how do I achieve that by using this functionality?
> It seems that I can't use the `key` parameter in `KeyedStateFunction` to
> access the other state, generated by another state descriptor.
> Please correct me if I misunderstood. Thank you.
>
> Best Regards,
> Tony Wei
>
> 2018-06-09 9:45 GMT+08:00 TechnoMage <mla...@technomage.com>:
>
>> Thank you all. This discussion is very helpful. It sounds like I can
>> wait for 1.6 though given our development status.
>>
>> Michael
>>
>>
>> On Jun 8, 2018, at 1:08 PM, David Anderson <da...@data-artisans.com>
>> wrote:
>>
>> Hi all,
>>
>> I think I see a way to eagerly do full state migration without writing
>> your own Operator, but it's kind of hacky and may have flaws I'm not aware
>> of.
>>
>> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
>> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
>> because in the processBroadcastElement() method you can supply a
>> KeyedStateFunction to the Context.applyToKeyedState() method, and this
>> KeyedStateFunction will be applied to every item of keyed state associated
>> with the state descriptor you specify. I've been doing some experiments
>> with this, and it's quite powerful in cases where it's useful to operate
>> on all of your application's state.
>>
>> I believe this was intended for cases where an update to an item of
>> broadcast state has implications for associated keyed state, but I see
>> nothing that prevents you from essentially ignoring the broadcast stream
>> and using this mechanism to implement keyed state migration.
>>
>> David
>>
>>
>> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, it should be feasible. As I said before, with Flink 1.6 there will
>>> be a better way of migrating state, but for now you either need to lazily
>>> convert the state, or iterate over the keys and do the job manually.
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> So my question is: is it feasible to migrate state from
>>> `ProcessFunction` to my own operator and then use
>>> `getKeyedStateBackend()` to migrate the states?
>>> If yes, is there anything I need to be careful with? If no, why not, and
>>> can it be made available in the future? Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>>>> function and you cannot migrate your state that way.
>>>>
>>>> As far as I know yes, at the moment in order to convert everything at
>>>> once (without getKeys you can still implement lazy conversion) you would
>>>> have to write your own operator.
>>>>
>>>> Piotrek
>>>>
>>>>
>>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Piotrek,
>>>>
>>>> I used `ProcessFunction` to implement it, but it seems that I can't
>>>> call `getKeyedStateBackend()` like `WindowOperator` did.
>>>> I found that `getKeyedStateBackend()` is a method in
>>>> `AbstractStreamOperator` and the `ProcessFunction` API doesn't extend it.
>>>> Does that mean I can't look up all keys and migrate the entire previous
>>>> states to the new states in `ProcessFunction#open()`?
>>>> As I said, do I need to port `ProcessFunction` to
>>>> `KeyedProcessOperator` to migrate state in the manner shown in
>>>> `WindowOperator`?
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>>
>>>>> What function are you implementing and how are you using it?
>>>>>
>>>>> Usually it’s enough if your function implements RichFunction (or
>>>>> rather extends AbstractRichFunction) and then you could use
>>>>> RichFunction#open in a similar manner as in the code that I posted in
>>>>> my previous message. Flink in many places performs instanceof checks like:
>>>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>>>>
>>>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>>>     if (function instanceof RichFunction) {
>>>>>         RichFunction richFunction = (RichFunction) function;
>>>>>         richFunction.open(parameters);
>>>>>     }
>>>>> }
>>>>>
>>>>> Piotrek
>>>>>
>>>>>
>>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>> Hi Piotrek,
>>>>>
>>>>> It seems that this was implemented with the `Operator` API, which is a
>>>>> lower-level API than the `Function` API.
>>>>> Since at the `Function` API level we can only migrate state when
>>>>> triggered by an event, it is more convenient to migrate state this way,
>>>>> by iterating over all keys in the `open()` method.
>>>>> If I implemented my stateful operator with the `ProcessFunction` API, is
>>>>> it possible to port it to `KeyedProcessOperator` and do the state
>>>>> migration that you mentioned?
>>>>> And are there any concerns or difficulties that could lead to a
>>>>> failure to restore state, or to other problems? Thank you!
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A general solution for state/schema migration is under development and
>>>>>> it might be released with Flink 1.6.0.
>>>>>>
>>>>>> Before that, you need to manually handle the state migration in your
>>>>>> operator’s open method. Let’s assume that your OperatorV1 has a state
>>>>>> field “stateV1”. Your OperatorV2 defines a field “stateV2”, which is
>>>>>> incompatible with the previous version. What you can do is add logic
>>>>>> to the open method to check:
>>>>>> 1. If “stateV2” is non-empty, do nothing
>>>>>> 2. If there is no “stateV2”, iterate over all of the keys and
>>>>>> manually migrate “stateV1” to “stateV2”
>>>>>>
>>>>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>>>>
>>>>>> I once implemented something like that here:
>>>>>>
>>>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>>>>
>>>>>> Hope that helps!
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>
>>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>>>
>>>>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>>>>> question.
>>>>>>
>>>>>> When a job is modified and we want to deploy the new version, what is
>>>>>> the preferred method? Our jobs have a lot of keyed state.
>>>>>>
>>>>>> If we use snapshots we have old state that may no longer apply to the
>>>>>> new pipeline.
>>>>>> If we start a new job we can reprocess historical data from Kafka,
>>>>>> but that can be very resource heavy for a while.
>>>>>>
>>>>>> Is there an option I am missing? Are there facilities to selectively
>>>>>> “patch” or “purge” the keyed state?
>>>>>>
>>>>>> Michael
>>
>> --
>> David Anderson | Training Coordinator
>>
>> <https://data-artisans.com/>
>>
>> Follow us @dataArtisans <https://twitter.com/dataArtisans>
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time

--
*David Anderson* | Training Coordinator | data Artisans