Thanks, I'll check it out.

On Fri, Oct 21, 2022, 18:20 Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> Yes and no. The StateProcessor API can read any Flink state, but you have to
> describe the state you want it to access. Take a look at the example in the
> docs [1].
>
> First you have an example of a theoretical production function,
> `StatefulFunctionWithTime`, whose state you want to modify. Note the
> `ValueState` and `ListState` fields and their descriptors. That's the state
> of that particular function. Descriptors determine how the state is
> serialised. Usually they are pretty simple.
> Below it is the `ReaderFunction` that you want to use to access/modify the
> state via the StateProcessor API. To do so, you have to specify the state
> you want to access and effectively mimic/copy-paste the state descriptors
> from the production code.
>
> If you want to modify the state of a source/sink function, you would
> first have to look at the source code of that connector to know what
> to modify, and copy its descriptors. Also note that for sources/sinks the
> state is most likely non-keyed.
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state
>
> On Fri, 21 Oct 2022 at 14:37, Sriram Ganesh <srigns...@gmail.com> wrote:
>
>> I have a question on this. Different connectors can have different
>> serialisation and deserialisation techniques, right? Wouldn't that have
>> an impact? If I use the StateProcessor API, would it be agnostic to all
>> the sources and sinks?
>>
>> On Fri, Oct 21, 2022, 18:00 Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Oops
>>>
>>> > Alternatively, you can modify the code of the function/operator
>>> > whose state you want to modify. For example, in the
>>> > `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>>> > method you could add some code that migrates your old state
>>> > to a new one.
>>> > And you can drop such code later, in the next savepoint.
>>>
>>> That was not entirely true. This would work for the non-keyed state. For
>>> the keyed state there is no easy alternative (you would have to iterate
>>> through all of the keys, which I think is not exposed via the public API),
>>> so it is best to use the StateProcessor API.
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh <srigns...@gmail.com> wrote:
>>>
>>>> Thanks! Will try this.
>>>>
>>>> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Sriram,
>>>>>
>>>>> You can read and modify savepoints using the StateProcessor API [1].
>>>>>
>>>>> Alternatively, you can modify the code of the function/operator
>>>>> whose state you want to modify. For example, in the
>>>>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>>>>> method you could add some code that migrates your old state
>>>>> to a new one.
>>>>>
>>>>> ```
>>>>> private transient ValueState<Foo> oldState;
>>>>> private transient ValueState<Foo> newState;
>>>>> (...)
>>>>> initializeState(...) {
>>>>>   (...)
>>>>>   // migrate only once: the new state is still empty and the old one is set
>>>>>   if (newState.value() == null && oldState.value() != null) {
>>>>>     // code to migrate from old to new one
>>>>>     newState.update(migrate(oldState.value()));
>>>>>     // clear the old state so the migration is not repeated
>>>>>     oldState.update(null);
>>>>>   }
>>>>> }
>>>>> ```
>>>>>
>>>>> And you can drop such code later, in the next savepoint.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>>>>>
>>>>> On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh <srigns...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am working on a scenario where I need to modify the operator state
>>>>>> in an existing savepoint, e.g. to remove some offsets from the
>>>>>> savepoint.
>>>>>>
>>>>>> What is the best practice for such scenarios? Could you please
>>>>>> point me to an example?
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> --
>>>>>> *Sriram G*
>>>>>> *Tech*
>>>>>>
>>>>
>>>> --
>>>> *Sriram G*
>>>> *Tech*
>>>>
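
The migrate-once guard from the `initializeState` snippet quoted above can be tried out without a Flink cluster. What follows is only a minimal sketch under stated assumptions: `SimpleState` is a hypothetical in-memory holder standing in for a Flink state handle (it is not a Flink class), and `migrateIfNeeded` and the upper-casing `migrate` function are made up for illustration. It shows only that the guard moves the value once and does nothing on a second call. It does not touch real savepoints, and, as noted in the thread, the in-operator approach applies to non-keyed state.

```java
import java.util.function.UnaryOperator;

public class StateMigrationSketch {

    /** Hypothetical in-memory stand-in for a state handle; NOT a Flink class. */
    static final class SimpleState<T> {
        private T value;

        T value() {
            return value;
        }

        void update(T newValue) {
            value = newValue;
        }
    }

    /**
     * Moves the value from oldState to newState if newState is still empty
     * and oldState holds something. Returns true if a migration happened.
     */
    static <T> boolean migrateIfNeeded(
            SimpleState<T> oldState, SimpleState<T> newState, UnaryOperator<T> migrate) {
        if (newState.value() == null && oldState.value() != null) {
            newState.update(migrate.apply(oldState.value()));
            oldState.update(null); // clear so the migration is not repeated
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SimpleState<String> oldState = new SimpleState<>();
        SimpleState<String> newState = new SimpleState<>();
        oldState.update("offset=42");

        // First call migrates; second call finds nothing left to migrate.
        boolean first = migrateIfNeeded(oldState, newState, s -> s.toUpperCase());
        boolean second = migrateIfNeeded(oldState, newState, s -> s.toUpperCase());

        // prints "true false OFFSET=42 null"
        System.out.println(first + " " + second + " " + newState.value() + " " + oldState.value());
    }
}
```

Because the guard is a no-op after the first run, the migration code can stay in place across restarts until a savepoint containing only the new state has been taken.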