Hi Gyula,

Processing a checkpoint halfway through `processElement` is problematic. The
current element would not be included in the in-flight input data, and we
cannot assume that user code has already applied its effect to the state. So
the best way is to treat `processElement` as an 'atomic' operation. I guess
that's why the priority of the cp barrier is set low.
However, the AsyncWaitOperator is a special case: we know that an element
blocked at `addToWorkQueue` has not yet triggered the userFunction.
Thus I'd suggest putting the element in the queue when the cp barrier
arrives and taking a snapshot of the whole queue afterwards. That would
solve the problem, but this approach also requires some code changes in
the mailbox executor.
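
To make the priority behaviour discussed in this thread concrete, here is a
minimal toy model. It is not Flink's actual mailbox code: `ToyMailbox`,
`Mail`, `simulate` and the operator priority of 0 are hypothetical names and
values chosen for illustration. Only the barrier priority of -1 comes from
the debugging described earlier in the thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Toy model only: hypothetical classes, NOT Flink's mailbox implementation.
class MailboxPrioritySketch {

    /** A queued action with a priority, loosely modelled on a mailbox "mail". */
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    /** Mailbox whose tryYield only runs mails at or above the caller's priority. */
    static final class ToyMailbox {
        private final List<Mail> mails = new ArrayList<>();

        void put(int priority, Runnable action) {
            mails.add(new Mail(priority, action));
        }

        /** Runs the oldest mail with priority >= minPriority; false if there is none. */
        boolean tryYield(int minPriority) {
            for (Iterator<Mail> it = mails.iterator(); it.hasNext(); ) {
                Mail mail = it.next();
                if (mail.priority >= minPriority) {
                    it.remove();
                    mail.action.run();
                    return true;
                }
            }
            return false;
        }
    }

    static final int BARRIER_PRIORITY = -1;  // value reported in this thread
    static final int OPERATOR_PRIORITY = 0;  // assumed value, for illustration

    /** Replays the blocked-processElement scenario and returns the event order. */
    static List<String> simulate() {
        List<String> log = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();
        Deque<Long> elementQueue = new ArrayDeque<>();
        int capacity = 1;
        elementQueue.add(1L); // the queue is already full with an in-flight element

        // The checkpoint barrier arrives as a low-priority mail.
        mailbox.put(BARRIER_PRIORITY, () -> log.add("barrier processed"));

        // processElement(2L) finds the queue full and yields at operator priority.
        if (!mailbox.tryYield(OPERATOR_PRIORITY)) {
            log.add("yield skipped barrier");
        }

        // Later the async result for element 1 arrives and frees a slot.
        mailbox.put(OPERATOR_PRIORITY, () -> {
            elementQueue.poll();
            log.add("element 1 completed");
        });
        while (elementQueue.size() >= capacity) {
            mailbox.tryYield(OPERATOR_PRIORITY);
        }
        elementQueue.add(2L);
        log.add("element 2 enqueued");

        // Back in the task's main loop, mails of any priority are processed.
        mailbox.tryYield(BARRIER_PRIORITY);
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model the barrier mail is only processed after element 2 has been
enqueued, which mirrors the observation that the checkpoint can only proceed
once one async result completes.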


Best,
Zakelly

On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thank you for the detailed analysis Zakelly.
>
> I think we should consider whether yield should process checkpoint
> barriers, because the current behaviour puts quite a serious limitation on
> unaligned checkpoints in these cases.
> Do you know the reason behind the current priority setting? Is
> there a problem with processing the barrier here?
>
> Gyula
>
> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Well, I tried your example in a local mini-cluster, and it seems the source
>> can take checkpoints, but the checkpoint then blocks in the downstream
>> AsyncWaitOperator.
>> IIUC, the unaligned checkpoint barrier has to wait until the current
>> `processElement` finishes its execution. In your example, the element queue
>> of `AsyncWaitOperator` will end up full and `processElement` will be
>> blocked at `addToWorkQueue`. Even though it calls
>> `mailboxExecutor.yield();`, the checkpoint barrier is still left
>> unprocessed, since the priority of the barrier is -1, lower than the
>> priority that `yield()` handles. I verified this using single-step
>> debugging.
>>
>> If one element finishes its async I/O, the cp barrier can be
>> processed afterwards. For example:
>> ```
>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>> env.getConfig().setParallelism(1);
>> AsyncDataStream.orderedWait(
>>                 env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>                 new AsyncFunction<Long, Long>() {
>>                     boolean first = true;
>>                     @Override
>>                     public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>                         if (first) {
>>                             Executors.newSingleThreadExecutor().execute(() -> {
>>                                 try {
>>                                     // process after 20s, only for the first one.
>>                                     Thread.sleep(20000);
>>                                 } catch (Throwable e) {}
>>                                 LOG.info("Complete one");
>>                                 resultFuture.complete(Collections.singleton(1L));
>>                             });
>>                             first = false;
>>                         }
>>                     }
>>                 },
>>                 24,
>>                 TimeUnit.HOURS,
>>                 1)
>>         .print();
>> ```
>> Checkpoint 1 finishes normally once the "Complete one" log line is
>> printed.
>>
>> I guess users have no means to work around this problem; we might optimize
>> this later.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hey all!
>>>
>>> I encountered a strange and unexpected behaviour when trying to use
>>> unaligned checkpoints with AsyncIO.
>>>
>>> If the async operation queue is full and backpressures the pipeline
>>> completely, then unaligned checkpoints cannot be completed. To me this
>>> sounds counterintuitive because one of the benefits of the AsyncIO would be
>>> that we can simply checkpoint the queue and not have to wait for the
>>> completion.
>>>
>>> To repro you can simply run:
>>>
>>> AsyncDataStream.orderedWait(
>>>     env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>     new AsyncFunction<Long, Long>() {
>>>         @Override
>>>         public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>>>     },
>>>     24,
>>>     TimeUnit.HOURS,
>>>     1)
>>>     .print();
>>>
>>> This pipeline will completely backpressure the source, and checkpoints do
>>> not progress even though they are unaligned. It seems that even the source
>>> cannot take a checkpoint, which surprises me because this is using the
>>> new source interface.
>>>
>>> Does anyone know why this happens and if there may be a solution?
>>>
>>> Thanks
>>> Gyula
>>>
>>
