Hi Marc,

thanks for clarifying; I had misunderstood some parts.
Unfortunately, I don't think there is a way to update keyed state outside
of a keyed context (let alone for multiple keys at once).

I will ask whether someone else has an idea, but allow me one
counter-question first: have you actually run tests to verify that the
custom state solution is more efficient than regularly using Flink's keyed
state? In the end, such a performance test would also have to include the
cost of the state synchronization. Efficient stateful stream processing is
one of Flink's key features, and you are essentially trying to override a
specific piece of it with custom logic.

Best regards,
Nico

On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER <maleger...@gmail.com> wrote:

> Hello Nicolaus,
>
> Thank you for your quick feedback, and sorry if I was not clear enough.
> Actually, in the documented example, the state that is updated in the
> snapshotState method is operator state, not keyed state:
>
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     [...]
>     countPerPartition = context.getOperatorStateStore().getOperatorState(
>         new ListStateDescriptor<>("perPartitionCount", Long.class));
>     [...]
> }
>
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     [...]
>     countPerPartition.add(localCount);
> }
>
> It seems that this method is only called once per parallel operator
> instance (subtask), not once per key.
> On my side, I have two keyed states with the same key (e.g., userId) in a
> CoFlatMapFunction:
>
> // Control state partitioned by userId
> private ValueState<Control> controlState;
> // Data state partitioned by userId, coming from the ser/deserialization
> // of a custom system that has a partitioned state
> private ValueState<byte[]> dataState;
>
> and I would like to do something like this to update dataState in a keyed
> context for every key at every checkpoint:
>
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     // Not a keyed context here! FunctionSnapshotContext has no getKey(),
>     // so this line only expresses what I would like to do.
>     dataState.update(customSystem.getSnapshot(context.getKey()));
> }
>
> instead of saving dataState in the flatMap2 function for every received
> event:
>
> public void flatMap1(Control control, Collector<ProcessedEvent> out) throws Exception {
>     controlState.update(control);
> }
>
> public void flatMap2(Event event, Collector<ProcessedEvent> out) throws Exception {
>     // Perform some event transformations based on controlState
>     ProcessedEvent result = customSystem.process(controlState.value(), event);
>     // Save the internal custom system state after processing:
>     // this can be costly at high event throughput
>     dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
>     // Output the processed event
>     out.collect(result);
> }
>
> So basically, I want to synchronize the partitioned state of my custom
> system with the checkpoints taken by Flink.
>
>
> Best Regards,
> Marc
>
> On Wed, Oct 6, 2021 at 12:11, Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Marc,
>>
>> I think you can just use keyed state in a
>> CheckpointedFunction. FunctionInitializationContext gives you access to
>> both keyed state and operator state (your stream needs to be keyed, of
>> course). So you could just update your local custom state on regular
>> invocations and update keyed state on calls to snapshotState.
>> Check out the example in [1] where both types of state are used.
>>
>> Does that help? Not sure if I understood the problem correctly.
>>
>> Best regards,
>> Nico
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>>
>> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER <maleger...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is there any method in a RichFunction that Flink calls within a keyed
>>> context each time a checkpoint is triggered?
>>>
>>> It seems that the CheckpointedFunction interface provides such a feature
>>> (the snapshotState method), but only for operator state, and it is called
>>> in a non-keyed context.
>>>
>>> Specifically, I am implementing a CoFlatMapFunction with:
>>> - a keyed state (state1) for a "control" stream (stream1) that is rarely
>>> updated,
>>> - a keyed state (state2) for a "data" stream (stream2) with high
>>> throughput, relying on a custom solution for internal state snapshots
>>> that may have a performance impact.
>>>
>>> Consequently, for efficiency reasons, I don't want to trigger a state2
>>> update for every event received on stream2, but rather update state2
>>> based on the checkpoints triggered by Flink.
>>>
>>> Best Regards,
>>> Marc
>>>
>>>
