Hi Gyula,
Well, I tried your example in a local mini-cluster, and it seems the source
can take checkpoints, but the checkpoint then blocks in the downstream AsyncWaitOperator.
IIUC, the unaligned checkpoint barrier has to wait until the current
`processElement` call finishes. In your example, the element queue of
`AsyncWaitOperator` ends up full, so `processElement` blocks in
`addToWorkQueue`. Although it calls `mailboxExecutor.yield()` while
waiting, the checkpoint barrier is still left unprocessed: the barrier's
mail has priority -1, which is lower than the priorities this `yield()`
handles. I verified this by stepping through it in a debugger.
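To make the priority filtering concrete, here is a toy model. It is a simplified sketch, not Flink's actual mailbox implementation: the class, the `Mail` record and the priority of the async completion (0) are illustrative assumptions; only the barrier's priority of -1 comes from what I saw above. `tryTake(minPriority)` stands in for what `yield()` does: it picks the oldest mail whose priority is at least the executor's own priority, so the barrier mail is skipped.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Simplified, hypothetical model of a mailbox whose yield() only takes mails at or above a priority. */
public class PriorityYieldModel {
    record Mail(String name, int priority) {}

    private final Deque<Mail> queue = new ArrayDeque<>();

    void put(Mail mail) {
        queue.addLast(mail);
    }

    /** Removes and returns the oldest mail whose priority is >= minPriority, or null if there is none. */
    Mail tryTake(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail m = it.next();
            if (m.priority() >= minPriority) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PriorityYieldModel mailbox = new PriorityYieldModel();
        // Assumed priorities: barrier at -1, async completion at the operator's priority (0 here).
        mailbox.put(new Mail("checkpoint-barrier", -1));
        mailbox.put(new Mail("async-completion", 0));

        int operatorPriority = 0;
        System.out.println("yield() ran: " + mailbox.tryTake(operatorPriority).name());
        // The barrier is still queued, but yield() at priority 0 never selects it.
        System.out.println("yield() ran again: " + mailbox.tryTake(operatorPriority));
    }
}
```

Running it prints `yield() ran: async-completion` followed by `yield() ran again: null`; only a take at priority -1 would return the barrier.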
Once one element finishes its async I/O, the checkpoint barrier can be
processed. For example:
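```
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Inside main(String[] args) throws Exception; LOG is the enclosing class's logger.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setCheckpointInterval(10000); // 10s interval
env.getConfig().setParallelism(1);
AsyncDataStream.orderedWait(
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
        new AsyncFunction<Long, Long>() {
            boolean first = true;

            @Override
            public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
                // Only the first element is ever completed; all others stay pending.
                if (first) {
                    Executors.newSingleThreadExecutor().execute(() -> {
                        try {
                            Thread.sleep(20000); // complete after 20s, only for the first one
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        LOG.info("Complete one");
                        resultFuture.complete(Collections.singleton(1L));
                    });
                    first = false;
                }
            }
        },
        24,
        TimeUnit.HOURS,
        1) // queue capacity 1: full as soon as the first element is in flight
    .print();
env.execute();
```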
Checkpoint 1 completes normally right after the "Complete one" log line
is printed.
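The ordering behind this can be sketched as a single-threaded model. Again this is hypothetical, not Flink code: `processElement` here keeps yielding at the operator's priority while the capacity-1 queue is full, so only the (assumed priority 0) completion mail can run. Once it frees a slot, the element is added, `processElement` returns, and the main loop, which accepts any priority, finally handles the barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-threaded model of why the barrier waits for the first async result. */
public class BlockedBarrierModel {
    record Mail(String name, int priority, Runnable action) {}

    static final List<String> log = new ArrayList<>();
    static final Deque<Mail> mailbox = new ArrayDeque<>();
    static final Deque<Long> workQueue = new ArrayDeque<>();
    static final int CAPACITY = 1;

    /** Runs the oldest mail with priority >= minPriority; returns false if there is none. */
    static boolean yieldOnce(int minPriority) {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            Mail m = it.next();
            if (m.priority() >= minPriority) {
                it.remove();
                m.action().run();
                return true;
            }
        }
        return false;
    }

    /** Models addToWorkQueue: while the queue is full, yield to mails at the operator priority. */
    static void processElement(long element, int operatorPriority) {
        while (workQueue.size() >= CAPACITY) {
            if (!yieldOnce(operatorPriority)) {
                throw new IllegalStateException("would block forever");
            }
        }
        workQueue.addLast(element);
        log.add("added " + element);
    }

    public static void main(String[] args) {
        workQueue.addLast(1L); // first element in flight; the capacity-1 queue is full
        mailbox.addLast(new Mail("barrier", -1, () -> log.add("barrier processed")));
        // Stands in for the delayed completion of the first element.
        mailbox.addLast(new Mail("complete", 0, () -> {
            workQueue.pollFirst();
            log.add("Complete one");
        }));

        processElement(2L, 0); // yields until "complete" frees a slot; skips the barrier
        yieldOnce(-1);         // back in the main loop, any priority may run
        System.out.println(log);
    }
}
```

This prints `[Complete one, added 2, barrier processed]`; removing the completion mail makes `processElement` hit the "would block forever" case, which matches the fully stuck pipeline in your original example.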
I don't think users currently have any means to work around this; we
might optimize it later.
Best,
Zakelly
On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra wrote:
> Hey all!
>
> I encountered a strange and unexpected behaviour when trying to use
> unaligned checkpoints with AsyncIO.
>
> If the async operation queue is full and backpressures the pipeline
> completely, then unaligned checkpoints cannot be completed. This seems
> counterintuitive to me, because one of the benefits of AsyncIO should be
> that we can simply checkpoint the queue without waiting for the in-flight
> requests to complete.
>
> To repro you can simply run:
>
> AsyncDataStream.orderedWait(
>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>         new AsyncFunction<Long, Long>() {
>             @Override
>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>         },
>         24,
>         TimeUnit.HOURS,
>         1)
>     .print();
>
> This pipeline completely backpressures the source, and checkpoints do
> not progress even though they are unaligned. It seems that even the source
> cannot take a checkpoint, which surprises me because it uses the new
> source interface.
>
> Does anyone know why this happens and if there may be a solution?
>
> Thanks
> Gyula
>