Thank you for the detailed analysis, Zakelly.

I think we should consider whether `yield()` should process checkpoint
barriers, because the current behaviour puts quite a serious limitation on
unaligned checkpoints in these cases.
Do you know the reason behind the current priority setting? Is there a
problem with processing the barrier here?
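
To make the question concrete, here is a minimal standalone sketch of the
behaviour described in the quoted analysis. It does not use Flink classes:
`ToyMailbox`, `tryYield` and `simulate` are hypothetical names, and the
rule "yield only runs mails at or above its own priority" is my simplified
reading of the analysis, not the actual mailbox implementation. The
priorities -1 (barrier) and 0 (operator) are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy model of a priority mailbox; hypothetical, not Flink's TaskMailbox. */
public class ToyMailbox {

    /** A mail is a priority plus the action to run. */
    private static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    public void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /** Runs the first queued mail with priority >= minPriority, if any. */
    public boolean tryYield(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }

    /** Replays the scenario from the thread and returns the event log. */
    public static List<String> simulate() {
        List<String> log = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();

        // The checkpoint barrier arrives with priority -1 (assumed value).
        mailbox.put(-1, () -> log.add("barrier"));

        // processElement is blocked on a full queue and yields at priority 0:
        // the barrier is skipped because -1 < 0.
        log.add("yield ran mail: " + mailbox.tryYield(0));

        // One async result completes; its mail is visible to yield.
        mailbox.put(0, () -> log.add("async result"));
        mailbox.tryYield(0);

        // processElement returns; the main loop drains every priority.
        while (mailbox.tryYield(Integer.MIN_VALUE)) {
            // keep draining
        }
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this toy model the barrier only runs after an async result frees
`processElement`, which matches what you observed. If `yield()` ran mails
of every priority, the barrier would be handled while the operator is
still blocked, which is what I am asking about.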

Gyula

On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Gyula,
>
> Well, I tried your example in a local mini-cluster, and it seems the
> source can take checkpoints, but the checkpoint then blocks in the
> downstream AsyncWaitOperator.
> IIUC, the unaligned checkpoint barrier has to wait until the current
> `processElement` finishes its execution. In your example, the element queue
> of `AsyncWaitOperator` ends up full and `processElement` blocks at
> `addToWorkQueue`. Even though it calls `mailboxExecutor.yield();`, the
> checkpoint barrier is still left unprocessed, since the priority of the
> barrier is -1, lower than the priority `yield()` handles. I verified this
> using single-step debugging.
>
> Once one element finishes its async I/O, the checkpoint barrier can be
> processed. For example:
> ```
> env.getCheckpointConfig().enableUnalignedCheckpoints();
> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
> env.getConfig().setParallelism(1);
> AsyncDataStream.orderedWait(
>                 env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>                 new AsyncFunction<Long, Long>() {
>                     boolean first = true;
>                     @Override
>                     public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>                         if (first) {
>                             Executors.newSingleThreadExecutor().execute(() -> {
>                                 try {
>                                     // Process after 20s, only for the first one.
>                                     Thread.sleep(20000);
>                                 } catch (Throwable e) {}
>                                 LOG.info("Complete one");
>                                 resultFuture.complete(Collections.singleton(1L));
>                             });
>                             first = false;
>                         }
>                     }
>                 },
>                 24,
>                 TimeUnit.HOURS,
>                 1)
>         .print();
> ```
> Checkpoint 1 completes normally after the "Complete one" log line is
> printed.
>
> I guess users have no way to work around this problem; we might optimize
> this later.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hey all!
>>
>> I encountered a strange and unexpected behaviour when trying to use
>> unaligned checkpoints with AsyncIO.
>>
>> If the async operation queue is full and backpressures the pipeline
>> completely, then unaligned checkpoints cannot be completed. To me this
>> sounds counterintuitive because one of the benefits of the AsyncIO would be
>> that we can simply checkpoint the queue and not have to wait for the
>> completion.
>>
>> To repro you can simply run:
>>
>> AsyncDataStream.orderedWait(
>>     env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>     new AsyncFunction<Long, Long>() {
>>         @Override
>>         public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>>     },
>>     24,
>>     TimeUnit.HOURS,
>>     1)
>>     .print();
>>
>> This pipeline will completely backpressure the source, and checkpoints do
>> not progress even though they are unaligned. It seems that even the source
>> cannot take a checkpoint, which I find surprising because this is using
>> the new source interface.
>>
>> Does anyone know why this happens and if there may be a solution?
>>
>> Thanks
>> Gyula
>>
>
