On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler <nik...@ververica.com> wrote:
> So, you want to send basically the last message before the barrier?

Yes.

> Can you not instead send the first message after the barrier? From a first
> glance this sounds easier.

I'm not sure this would help me synchronize the sink with the same barrier.

> Can you share what you are trying to accomplish?

Here's the objective I'm trying to achieve:
https://github.com/gopik/storage-reading-list/blob/main/RealtimeAnalytics.md#streaming-update-using-flink

Basically, I want to capture DB changes via CDC and update a Parquet table (in
Delta format) consistently at each checkpoint. The data is first partitioned by
primary key, and each task handling a set of keys writes new files. The sink
receives the file names from every task, each followed by that task's barrier,
and waits until the barrier has arrived from all tasks. It then updates the
Delta table via a transaction and only then consumes the barrier.

> Best regards,
> Niklas
>
>
> > On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com> wrote:
> >
> > Thanks Niklas! This helps with synchronizing uploads across partitioned
> > tasks. The next step is to pass the handle to this upload to the sink which
> > should be part of the same checkpoint. Is it possible to do the following:
> >
> > 1. Keep reducing the events to keyedStore.
> > 2. On snapshotState: upload the events and get the handle. Generate this
> > handle as the output for the sink to consume.
> > 3. Return from snapshotState.
> >
> > Basically I want to ensure that the handle output is received by the
> > next stage before this checkpoint barrier.
> >
> > On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com> wrote:
> > Hi Gopi,
> >
> > You can implement CheckpointedFunction and use the method
> > snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
> >
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
> >
> > Make sure, you don’t have unaligned checkpointing enabled.
> >
> > What it is:
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
> > How to configure:
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
> >
> > Note that delays introduced in CheckpointedFunction#snapshotState can
> > slow down the job.
> >
> > You can also get the snapshot id from the FunctionSnapshotContext. Maybe
> > that removes the need for the source logic?
> >
> > Does this help?
> >
> > Best regards,
> > Niklas
> >
> >
> > > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
> > >
> > > Hi,
> > > In my flink operators, I need to connect to an external service to
> > > update state. I was thinking that the updates to the external service can
> > > be synchronized via checkpoint barriers.
> > >
> > > The topology of the stream is a source, then a single stage of
> > > operator replicas handling different partitions, then all joining in a
> > > single sink.
> > >
> > > Each operator will contact the external service when it receives a
> > > checkpoint barrier and uploads local state (which caches the uploads and
> > > returns a handle).
> > >
> > > After upload, it forwards the cache handle to the sink. Once sink
> > > receives handles from all such operators, it calls the external service
> > > with a list of handles received. This helps ensure that all handles are
> > > from the same checkpoint barrier.
> > >
> > > Is it possible to achieve this in a flink application?
> > >
> > > Thanks,
> > > Gopi
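
For reference, the sink-side handshake discussed in this thread can be modeled in plain Java. This is only a rough sketch and uses no Flink classes: the class name BarrierAlignedSink and the methods onHandle, onBarrier and commits are hypothetical, and it assumes that each handle is tagged with the checkpoint id (for example the one available from FunctionSnapshotContext) and that every task emits its handle before its barrier. Whether a handle can actually be emitted ahead of the barrier from snapshotState is the open question above, so the sketch takes that ordering as given.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the sink side of the handshake; it uses no Flink classes. */
class BarrierAlignedSink {
    private final int upstreamTasks;
    // Handles received so far, grouped by the checkpoint they belong to.
    private final Map<Long, List<String>> pendingHandles = new HashMap<>();
    // Number of upstream tasks whose barrier has arrived, per checkpoint.
    private final Map<Long, Integer> barriersSeen = new HashMap<>();
    // One entry per committed checkpoint, standing in for the external transaction.
    private final List<String> commits = new ArrayList<>();

    public BarrierAlignedSink(int upstreamTasks) {
        this.upstreamTasks = upstreamTasks;
    }

    /** Assumption: a task emits its upload handle before forwarding its barrier. */
    public void onHandle(long checkpointId, String handle) {
        pendingHandles.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(handle);
    }

    /** Called when the barrier of one upstream task arrives. */
    public void onBarrier(long checkpointId) {
        int seen = barriersSeen.merge(checkpointId, 1, Integer::sum);
        if (seen < upstreamTasks) {
            return; // still waiting for barriers from the other tasks
        }
        barriersSeen.remove(checkpointId);
        List<String> handles = pendingHandles.remove(checkpointId);
        if (handles == null) {
            handles = new ArrayList<>(); // no task uploaded anything for this checkpoint
        }
        // Stand-in for the single call to the external service / table transaction.
        commits.add("checkpoint " + checkpointId + " -> " + handles);
    }

    public List<String> commits() {
        return commits;
    }

    public static void main(String[] args) {
        BarrierAlignedSink sink = new BarrierAlignedSink(2);
        sink.onHandle(1L, "handle-a");
        sink.onBarrier(1L);            // one of two barriers: nothing committed yet
        sink.onHandle(1L, "handle-b");
        sink.onBarrier(1L);            // second barrier: both handles committed together
        System.out.println(sink.commits()); // prints [checkpoint 1 -> [handle-a, handle-b]]
    }
}
```

The point of the model is that the commit is triggered by the last barrier rather than by the last handle, so a task that has nothing to upload for a checkpoint does not block the transaction.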