Hi Gopi,

You can implement CheckpointedFunction and use its snapshotState(FunctionSnapshotContext) method to upload state on each checkpoint:
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html

Make sure you don’t have unaligned checkpointing enabled.

What it is: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
How to configure: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints

Note that any delay introduced in CheckpointedFunction#snapshotState can slow down the job.

You can also get the checkpoint ID from the FunctionSnapshotContext (getCheckpointId()). Maybe that removes the need for the source logic?

Does this help?

Best regards,
Niklas

> On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>
> Hi,
> In my flink operators, I need to connect to an external service to update
> state. I was thinking that the updates to the external service can be
> synchronized via checkpoint barriers.
>
> The topology of the stream is a source, then a single stage of operator
> replicas handling different partitions, then all joining in a single sink.
>
> Each operator will contact the external service when it receives a checkpoint
> barrier and uploads local state (which caches the uploads and returns a
> handle).
>
> After upload, it forwards the cache handle to the sink. Once sink receives
> handles from all such operators, it calls the external service with a list of
> handles received. This helps ensure that all handles are from the same
> checkpoint barrier.
>
> Is it possible to achieve this in a flink application?
>
> Thanks,
> Gopi
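
For the sink-side step in the question (wait for one handle from every operator, then call the external service with the full list), here is a rough sketch of the bookkeeping in plain Java, grouped by checkpoint ID. HandleCollector, onHandle and pendingCheckpoints are hypothetical names and not part of Flink. The sketch assumes each operator replica forwards exactly one handle per checkpoint, together with the ID it read from FunctionSnapshotContext#getCheckpointId(). Wiring it into an actual sink and calling the external service are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not a Flink class): groups upload handles by checkpoint ID.
class HandleCollector {
    // Number of operator replicas expected to report per checkpoint (assumed known up front).
    private final int expectedOperators;
    // Handles received so far, keyed by checkpoint ID.
    private final Map<Long, List<String>> pending = new HashMap<>();

    HandleCollector(int expectedOperators) {
        if (expectedOperators <= 0) {
            throw new IllegalArgumentException("expectedOperators must be positive");
        }
        this.expectedOperators = expectedOperators;
    }

    // Records one handle. Returns the complete list once every operator has
    // reported for this checkpoint ID, and an empty Optional before that.
    Optional<List<String>> onHandle(long checkpointId, String handle) {
        List<String> handles = pending.computeIfAbsent(checkpointId, id -> new ArrayList<>());
        handles.add(handle);
        if (handles.size() < expectedOperators) {
            return Optional.empty();
        }
        pending.remove(checkpointId);
        return Optional.of(handles);
    }

    // Number of checkpoints still waiting for handles.
    int pendingCheckpoints() {
        return pending.size();
    }
}
```

Keying by checkpoint ID means handles from two overlapping checkpoints are never mixed, which is the guarantee the question asks for. Handling of aborted checkpoints (entries that never complete) is not covered here.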