Hi Gyula,

I tried your example in a local mini-cluster. It seems the source can take
checkpoints, but the checkpoint then gets stuck at the downstream
`AsyncWaitOperator`. IIUC, an unaligned checkpoint barrier still has to
wait until the current `processElement` call finishes. In your example, the
element queue of `AsyncWaitOperator` fills up and `processElement` blocks
in `addToWorkQueue`. Although it calls `mailboxExecutor.yield();` while
waiting, the checkpoint barrier stays unprocessed because its priority is
-1, which is lower than the priority `yield()` handles. I verified this
with single-step debugging.
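
To make the priority filtering concrete, here is a toy model in plain Java.
It is only a sketch and not Flink's actual mailbox code: the class
`MailboxPrioritySketch`, the `Mail` type, and the methods `tryYield(int)`
and `simulate` are hypothetical names, and the operator priority of 0 is an
assumption (only the barrier priority of -1 comes from the debugging
session above).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a priority-filtered mailbox; NOT Flink's implementation. */
final class MailboxPrioritySketch {

    /** A queued action and the priority it was enqueued with (hypothetical type). */
    static final class Mail {
        final String name;
        final int priority;

        Mail(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    static final int BARRIER_PRIORITY = -1;  // value observed in the debugger
    static final int OPERATOR_PRIORITY = 0;  // assumed priority of the operator's executor

    private final List<Mail> mailbox = new ArrayList<>();
    private final List<String> processed = new ArrayList<>();

    void put(Mail mail) {
        mailbox.add(mail);
    }

    /** Runs the first mail with priority >= minPriority; lower ones stay queued. */
    boolean tryYield(int minPriority) {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                processed.add(mail.name);
                return true;
            }
        }
        return false;
    }

    /**
     * Simulates a processElement call stuck on a full element queue.
     * An async result mail arrives at spin completionArrivesAt (negative = never).
     * Returns the names of the mails that were handled, in order.
     */
    static List<String> simulate(int completionArrivesAt, int maxSpins) {
        MailboxPrioritySketch box = new MailboxPrioritySketch();
        box.put(new Mail("checkpoint-barrier", BARRIER_PRIORITY));
        boolean queueFull = true;
        for (int spin = 0; spin < maxSpins && queueFull; spin++) {
            if (spin == completionArrivesAt) {
                box.put(new Mail("async-result", OPERATOR_PRIORITY));
            }
            // Inside processElement only mails at the operator's priority are eligible.
            if (box.tryYield(OPERATOR_PRIORITY)) {
                queueFull = false; // a completed element frees a slot
            }
        }
        if (!queueFull) {
            // processElement has returned, so the main loop may take any mail.
            while (box.tryYield(Integer.MIN_VALUE)) {
                // drain the remaining mails, including the barrier
            }
        }
        return box.processed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(-1, 5)); // no completion: barrier never runs
        System.out.println(simulate(2, 5));  // completion arrives: barrier runs after it
    }
}
```

In this model the barrier is handled only after some async result lets the
blocked call return, which matches what I saw in the debugger.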

Once one element finishes its async I/O, the checkpoint barrier can be
processed. For example:
```
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
env.getConfig().setParallelism(1);
AsyncDataStream.orderedWait(
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
        new AsyncFunction<Long, Long>() {
            boolean first = true;

            @Override
            public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
                if (first) {
                    Executors.newSingleThreadExecutor().execute(() -> {
                        try {
                            // process after 20s, only for the first one.
                            Thread.sleep(20000);
                        } catch (Throwable e) {}
                        LOG.info("Complete one");
                        resultFuture.complete(Collections.singleton(1L));
                    });
                    first = false;
                }
            }
        },
        24,
        TimeUnit.HOURS,
        1)
    .print();
```
Checkpoint 1 completes normally once the "Complete one" log line is
printed.

I guess users currently have no means to work around this problem. We
might optimize this later.


Best,
Zakelly

On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey all!
>
> I encountered a strange and unexpected behaviour when trying to use
> unaligned checkpoints with AsyncIO.
>
> If the async operation queue is full and backpressures the pipeline
> completely, then unaligned checkpoints cannot be completed. To me this
> sounds counterintuitive because one of the benefits of the AsyncIO would be
> that we can simply checkpoint the queue and not have to wait for the
> completion.
>
> To repro you can simply run:
>
> AsyncDataStream.orderedWait(
>     env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>     new AsyncFunction<Long, Long>() {
>         @Override
>         public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>     },
>     24,
>     TimeUnit.HOURS,
>     1)
>     .print();
>
> This pipeline completely backpressures the source, and checkpoints do
> not progress even though they are unaligned. It seems that even the source
> cannot take a checkpoint, which surprises me because this is using the
> new source interface.
>
> Does anyone know why this happens and if there may be a solution?
>
> Thanks
> Gyula
>
