Hi Gopi,

You can implement CheckpointedFunction and use the method 
snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
 
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
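A minimal sketch of that idea, assuming Flink 1.14 (flink-streaming-java) on the classpath. ExternalCacheClient and UploadingCounter are hypothetical names standing in for your service client and your operator: the operator counts records per key and, whenever snapshotState is called, uploads a copy of the counts tagged with the checkpoint ID.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/** Hypothetical client for the external caching service (not part of Flink). */
interface ExternalCacheClient extends Serializable {
    /** Uploads a copy of the local state and returns a handle to the cached upload. */
    String upload(long checkpointId, Map<String, Long> state);
}

/** Counts records per key and uploads the counts on every checkpoint. */
class UploadingCounter extends RichMapFunction<String, String> implements CheckpointedFunction {
    private final ExternalCacheClient client;
    private transient Map<String, Long> counts;
    private transient String lastHandle; // handle returned by the most recent upload

    UploadingCounter(ExternalCacheClient client) {
        this.client = client;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Sketch only: no Flink state is registered, so counts start empty after a restore.
        counts = new HashMap<>();
    }

    @Override
    public String map(String key) {
        counts.merge(key, 1L, Long::sum); // update local state
        return key;                       // pass the record through unchanged
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Runs as part of the checkpoint; a slow upload here delays this task.
        lastHandle = client.upload(context.getCheckpointId(), new HashMap<>(counts));
    }

    String lastHandle() {
        return lastHandle;
    }
}
```

snapshotState receives no Collector, so this sketch only keeps the returned handle; forwarding it to the sink needs a separate mechanism.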

Make sure you don’t have unaligned checkpointing enabled.

What it is: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
How to configure: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
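For reference, a sketch of keeping checkpoints aligned in code, assuming Flink 1.14 and the DataStream API; the class name and the 60 s interval are only examples. Unaligned checkpoints are off by default, so the explicit call mainly documents the choice (the matching configuration option is execution.checkpointing.unaligned).

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedCheckpointsExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once mode aligns barriers; at-least-once mode does not.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // Keep checkpoints aligned: each operator snapshots only after the barrier
        // has arrived on all of its inputs.
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        // ... build the source -> operator -> sink pipeline and call env.execute() here.
    }
}
```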

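Note that CheckpointedFunction#snapshotState is called synchronously as part of 
each checkpoint, so any delay introduced there (for example, a slow upload) 
blocks the task and can slow down the job.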

You can also get the checkpoint ID from the FunctionSnapshotContext via 
getCheckpointId(). Maybe that removes the need for the extra logic that matches 
handles to the same checkpoint?
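If each operator tags its handle with that checkpoint ID, the sink can group handles by ID. A plain-Java sketch of that grouping, assuming the sink knows how many operator instances report per checkpoint (for example, the upstream parallelism); CheckpointHandleCollector is a hypothetical name, and inside Flink the pending map would also have to live in checkpointed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Groups upload handles by checkpoint ID until all operator instances have reported. */
class CheckpointHandleCollector {
    private final int expectedHandles; // e.g. parallelism of the upstream operator
    private final Map<Long, List<String>> pending = new HashMap<>();

    CheckpointHandleCollector(int expectedHandles) {
        if (expectedHandles <= 0) {
            throw new IllegalArgumentException("expectedHandles must be positive");
        }
        this.expectedHandles = expectedHandles;
    }

    /** Adds one handle; returns all handles of the checkpoint once it is complete. */
    Optional<List<String>> add(long checkpointId, String handle) {
        List<String> handles = pending.computeIfAbsent(checkpointId, id -> new ArrayList<>());
        handles.add(handle);
        if (handles.size() < expectedHandles) {
            return Optional.empty();
        }
        pending.remove(checkpointId);
        return Optional.of(handles);
    }

    /** Number of checkpoints still waiting for handles. */
    int pendingCheckpoints() {
        return pending.size();
    }

    public static void main(String[] args) {
        CheckpointHandleCollector collector = new CheckpointHandleCollector(2);
        collector.add(7L, "handle-a");   // first operator instance, checkpoint 7
        collector.add(8L, "handle-x");   // a handle for checkpoint 8 arrives in between
        Optional<List<String>> ready = collector.add(7L, "handle-b");
        // Here the sink would call the external service with the complete list.
        System.out.println(ready.get()); // prints [handle-a, handle-b]
    }
}
```

Duplicate handles (for example, from an operator that re-sends after a restart) are not filtered in this sketch.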

Does this help?

Best regards,
Niklas


> On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
> 
> Hi,
> In my Flink operators, I need to connect to an external service to update 
> state. I was thinking that the updates to the external service can be 
> synchronized via checkpoint barriers. 
> 
> The topology of the stream is a source, then a single stage of operator 
> replicas handling different partitions, then all joining in a single sink.
> 
> Each operator will contact the external service when it receives a checkpoint 
> barrier and upload its local state (the service caches the uploads and returns 
> a handle).
> 
> After the upload, it forwards the cache handle to the sink. Once the sink 
> receives handles from all such operators, it calls the external service with 
> the list of received handles. This helps ensure that all handles are from the 
> same checkpoint barrier.
> 
> Is it possible to achieve this in a Flink application?
> 
> Thanks,
> Gopi
