Hi Jing,
Thanks for the reply and follow-up.

> What is the benefit for users to build a chain of mails instead of just one 
> mail(it is still async)?

Just to make sure we're on the same page, let me paraphrase your question:
each `then()` call is encapsulated as a callback mail, so you are asking
whether we should call `then()` as little as possible to reduce the
overhead of encapsulating callbacks into mails.

In general, whether to call `then()` depends on the user's data
dependencies. The operations in a chain of `then()` are strictly
ordered.



The following example has no data dependencies. Written as a `then()`
chain, it looks like this:
stateA.update(1).then(stateB.update(2).then(stateC.update(3)));

The execution order is:
```
stateA update 1 -> stateB update 2 -> stateC update 3
```

If written without `then()` calls, all three requests are issued from
the same mail (or from `mailboxDefaultAction`), and each state request
is still executed asynchronously:
```
stateA.update(1);
stateB.update(2);
stateC.update(3);
```

Their execution order is undefined and may be any of:
```
- stateA update 1 -> stateB update 2 -> stateC update 3
- stateB update 2 -> stateC update 3 -> stateA update 1
- stateC update 3 -> stateA update 1 -> stateB update 2
...
```
In every case the final result is "stateA = 1, stateB = 2, stateC = 3",
so the two ways of writing the code are equivalent here.



If there are data dependencies, for example:
```
stateA.update(1).then(stateA.update(2))
```

Then the execution order is:
```
stateA update 1 -> stateA update 2
```

If written in the form without `then()` call:
```
stateA.update(1);
stateA.update(2);
```

Their execution order is undefined and may be either of:
```
- stateA update 1 -> stateA update 2
- stateA update 2 -> stateA update 1
```
The final result may be "stateA = 1" *OR* "stateA = 2". In this case,
the version without a `then()` chain does not constrain the execution
order, so the result may be wrong.
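
To make the difference concrete, here is a minimal sketch that uses plain
`CompletableFuture` as a stand-in for `StateFuture`. The `update()` helper,
the thread pool and the `AtomicInteger` state are assumptions made only for
illustration; they are not the FLIP's API, and `thenCompose()` merely plays
the role of `then()` here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ChainedUpdateSketch {
    // Hypothetical stand-in for an asynchronous state update.
    static CompletableFuture<Void> update(AtomicInteger state, int value, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> state.set(value), pool);
    }

    // Chained: the second update is only submitted after the first has completed.
    static int chained() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            AtomicInteger stateA = new AtomicInteger();
            update(stateA, 1, pool)
                .thenCompose(ignored -> update(stateA, 2, pool))
                .join();
            return stateA.get();
        } finally {
            pool.shutdown();
        }
    }

    // Unchained: both updates are submitted independently, so either may run last.
    static int unchained() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            AtomicInteger stateA = new AtomicInteger();
            CompletableFuture<Void> first = update(stateA, 1, pool);
            CompletableFuture<Void> second = update(stateA, 2, pool);
            CompletableFuture.allOf(first, second).join();
            return stateA.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("chained:   stateA = " + chained());
        System.out.println("unchained: stateA = " + unchained());
    }
}
```

The chained variant always ends with stateA = 2, while the unchained
variant may end with either value depending on scheduling.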

In summary, how many mails are encapsulated depends on how the user
writes the code, and how the user writes the code depends on their
data dependencies. [1][2] may be helpful for asynchronous programming
practice.


> I was wondering if exceptions in the mail chain would have an impact on the 
> reference counting?

We will catch the exceptions that can be handled, so they have no impact
on the reference counting.
For exceptions that cannot be handled, we will fail the job directly.
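
As a rough illustration of why a failing callback does not have to skip the
"-1", here is a sketch with plain `CompletableFuture`. It is not the FLIP's
implementation: the `refCount` field and the `tracked()` helper are
hypothetical names, and the sketch only assumes that the release is attached
in a stage that runs on both normal and exceptional completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {
    // Hypothetical per-record reference counter; the name is illustrative only.
    static final AtomicInteger refCount = new AtomicInteger();

    // Retains before the callback runs and releases in whenComplete(),
    // which fires on both normal and exceptional completion.
    static CompletableFuture<Integer> tracked(CompletableFuture<Integer> source, boolean fail) {
        refCount.incrementAndGet();
        return source
            .thenApply(v -> {
                if (fail) {
                    throw new IllegalStateException("callback failed");
                }
                return v + 1;
            })
            .whenComplete((result, error) -> refCount.decrementAndGet());
    }

    public static void main(String[] args) {
        // Happy path: the callback succeeds.
        tracked(CompletableFuture.completedFuture(1), false).join();
        // Unhappy path: the callback throws, but the counter is still released.
        CompletableFuture<Integer> failed = tracked(CompletableFuture.completedFuture(1), true);
        System.out.println("failed exceptionally: " + failed.isCompletedExceptionally());
        System.out.println("refCount = " + refCount.get());
    }
}
```

A UT for the happy and unhappy paths could assert that the counter returns
to 0 in both cases.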

> Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy paths, 
> would make sense.

Nice suggestion, we will add UTs to cover those cases.


[1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
[2] https://www.codingjunkie.net/completable-futures-part1/

Jing Ge <j...@ververica.com.invalid> wrote on Wed, Mar 13, 2024 at 07:05:
>
> Hi Yanfei,
>
> Thanks for your clarification! Now I got a much clear picture and I am
> still trying to understand your thoughts for some of those questions:
>
>
> > How many mails are encapsulated depends on how the user writes the
> > code. The statements in a `then()` will be wrapped into a mail.
> > StateFuture is a restricted version of CompletableFuture, their basic
> > semantics are consistent.
> >
>
> Conceptually, users can write a chain of many async calls, i.e. many then()
> calls. And all these calls for Record A must be executed in order, while
> Record B should stay at the Blocking buffer. What is the benefit for users
> to build a chain of mails instead of just one mail(it is still async)? Is
> there any best practices or guidelines to teach/tell users when and how
> many async calls in a chain could/should be built?
>
> > The challenge arises in determining when all the processing logic
> associated with Record A is fully executed. To address this, we have
> adopted a reference counting mechanism that tracks ongoing operations
> (either processing input or executing a callback) related to a single
> record.
>
> > We describe this in the "Error handling"[2] section. This FLIP also
> > adopts the design from FLIP-368, ensuring that all state interfaces
> > throw unchecked exceptions and, consequently, do not declare any
> > exceptions in their signatures. In cases where an exception occurs
> > while accessing the state, the job should fail.
> >
>
> My question was not about how exceptions will be defined. I am not sure how
> unchecked exceptions handling will be implemented. I was wondering if
> exceptions in the mail chain would have an impact on the reference
> counting? E.g. in Fig 5, if an exception happened in the value(),
> update(int), or function within then(), any -1 counting might be missed?
> Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy paths,
> would make sense.
>
> Best regards,
> Jing
>
> On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com> wrote:
>
> > Hi Jing,
> >
> > Thanks for your thoughtful feedback!
> >
> > > does it mean that there will be three mails for Read, Update, and Output
> > ?
> >
> > With this example, there are two mails. The Read is processed by
> > `mailboxDefaultAction`[1], and the Update and Output are encapsulated
> > as mail.
> >
> > > does it make sense to encapsulate one mail instead of 3 mails with more
> > overhead?
> >
> >
>
> > How many mails are encapsulated depends on how the user writes the
> > code. The statements in a `then()` will be wrapped into a mail.
> > StateFuture is a restricted version of CompletableFuture, their basic
> > semantics are consistent.
> >
> >
>
> > > Would you like to add more description for cases when exceptions
> > happened? E.g. when reading or/and updating State throws IOExceptions.
> >
> >
>
> > We describe this in the "Error handling"[2] section. This FLIP also
> > adopts the design from FLIP-368, ensuring that all state interfaces
> > throw unchecked exceptions and, consequently, do not declare any
> > exceptions in their signatures. In cases where an exception occurs
> > while accessing the state, the job should fail.
> >
>
>
>
> > > Is it correct to understand that AEC is stateless?
> >
> > Great perspective, yes, it can be understood that way.
> > AEC is a task-level component. When the job fails or is restarted, the
> > runtime status in AEC will be reset.
> > In fact, we have considered taking a snapshot of the status in AEC and
> > storing it in a checkpoint like "unaligned checkpoint", but since
> > callback cannot be serialized, this idea is not feasible for the time
> > being.
> >
> > > would you like to add Pseudo-code for the inFilghtReocordNum decrement
> > to help us understand the logic better?
> >
> > This part of the code is a bit scattered, we will try to abstract a
> > pseudo-code. You can first refer to the RecordContext-related code [3]
> > in the PoC to understand it.
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > [3]
> > https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> >
> >
> > Best,
> > Yanfei
> >
> > Jing Ge <j...@ververica.com.invalid> wrote on Sun, Mar 10, 2024 at 23:47:
> > >
> > > Hi Yanfei,
> > >
> > > Thanks for your proposal! The FLIP contains a lot of great new ideas. I'd
> > > like to ask some questions to make sure we are on the same page.
> > >
> > > > For the asynchronous interface, Record A should run with Read, Update
> > and
> > > Output, while Record B should stay at the Blocking buffer.
> > >
> > > 1. With this example, does it mean that there will be three mails for
> > Read,
> > > Update, and Output ?
> > > 2. If yes, since the Read, Update, and Output have to be executed before
> > > Record B, does it make sense to encapsulate one mail instead of 3 mails
> > > with more overhead? There must be some thoughts behind the design. Look
> > > forward to it.
> > >
> > > > The challenge arises in determining when all the processing logic
> > > associated with Record A is fully executed. To address this, we have
> > > adopted a reference counting mechanism that tracks ongoing operations
> > > (either processing input or executing a callback) related to a single
> > > record.
> > >
> > > The design reminds me of the JVM reference counting for GC. Would you
> > like
> > > to add more description for cases when exceptions happened? E.g. when
> > > reading or/and updating State throws IOExceptions.
> > >
> > > > In more detail, AEC uses a inFilghtReocordNum  variable to trace the
> > > current number of records in progress. Every time the AEC receives a new
> > > record, the inFilghtReocordNum  increases by 1; when all processing and
> > > callback for this record have completed, the inFilghtReocordNum
> > decreases
> > > by 1. When processing one checkpoint mail, the current task thread will
> > > give up the time slice through the yield() method of the mailbox
> > executor,
> > > so that the ongoing state request’s callback and the blocking state
> > > requests will be drained first until inFlightRecordNum reduces to 0.
> > >
> > > 1. Speaking of draining, is it correct to understand that AEC is
> > stateless?
> > > E.g. AEC could be easily scaled out if it became a bottleneck.
> > > 2. There are Pseudo-code for the inFilghtReocordNum increment, would you
> > > like to add Pseudo-code for the inFilghtReocordNum decrement to help us
> > > understand the logic better?
> > >
> > > The FLIP shows overall a great design! +1 for it! Looking forward to your
> > > thoughts, thanks!
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion on FLIP-425: Asynchronous Execution
> > > > Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State Storage
> > > > and Management[2].
> > > >
> > > > FLIP-425 introduces a non-blocking execution model leveraging the
> > > > asynchronous APIs introduced in FLIP-424[3].
> > > > For the whole story please read the FLIP-423[2], and this thread is
> > > > aimed to discuss the details of "FLIP-425: Asynchronous Execution
> > > > Model".
> > > >
> > > > Regarding the details of this FLIP, there have been some discussions
> > > > here[4], mainly focusing on framework overhead profiling, watermark
> > > > processing, etc. Please see link[4] for the context.
> > > >
> > > > Looking forward to hearing from you!
> > > >
> > > >
> > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > [4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > >
> > > > Best,
> > > > Yanfei
> > > >
> >



-- 
Best,
Yanfei
