Hi Yanfei,

Thanks for driving the discussion! I have a few questions from the SQL
perspective (and may have some follow-ups).

1. Currently, there are some mini-batch optimizations[1][2] in SQL that
update the state during the prepare-snapshot phase and explicitly call
setCurrentKey in the operator. Could you provide more details on how
these operators should adapt to the new API?

2. Correct me if I'm wrong, but given that 'execution.async-state.enabled'
is true by default, I think SQL users who write UDFs that use the state
backend should also migrate to the new API.
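
To make question 1 concrete, the pattern I have in mind looks roughly like
the sketch below. It is plain Java with hypothetical names (MiniBatchSketch,
ToyKeyedState, addInput, finishBundle) rather than the actual Flink operator
code, and the state backend is modelled as a simple map. The point is only
that the operator buffers records and later switches the key itself before
touching state:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a mini-batch aggregate; none of these names are Flink APIs. */
final class MiniBatchSketch {

    /** Stand-in for a keyed state backend: access is scoped by the current key. */
    static final class ToyKeyedState {
        private final Map<String, Long> store = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) {
            this.currentKey = key;
        }

        long value() {
            return store.getOrDefault(currentKey, 0L);
        }

        void update(long newValue) {
            store.put(currentKey, newValue);
        }
    }

    private final Map<String, List<Long>> bundle = new LinkedHashMap<>();
    final ToyKeyedState state = new ToyKeyedState();

    /** Buffers the record in memory; state is not touched on this path. */
    void addInput(String key, long value) {
        bundle.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Flushes the bundle (for example before a snapshot) into keyed state. */
    void finishBundle() {
        for (Map.Entry<String, List<Long>> entry : bundle.entrySet()) {
            // Explicit key switch done by the operator, not by the framework.
            state.setCurrentKey(entry.getKey());
            long acc = state.value();
            for (long v : entry.getValue()) {
                acc += v;
            }
            state.update(acc);
        }
        bundle.clear();
    }
}
```

In this toy version the key switch and the state access are synchronous and
happen outside normal record processing, which is the part I am unsure how to
map onto the new API.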

[1] MiniBatchGroupAggFunction.java#L174
<https://github.com/apache/flink/blob/fcf61b9108942a892aa93765178a81f2c0e787c5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L174>
[2] MiniBatchStreamingJoinOperator.java#L226
<https://github.com/apache/flink/blob/fcf61b9108942a892aa93765178a81f2c0e787c5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java#L226>

Best,
Jane


On Mon, Mar 11, 2024 at 4:00 PM Yanfei Lei <fredia...@gmail.com> wrote:

> Hi Gyula,
>
> Thanks for your comments.
>
> "Record Context" is a concept similar to the current "Key Context"[1],
> which is mutable.
>
> > If yes, does this mean that the context itself becomes "immutable" or
> > the context is switched in the background?
>
> "Record Context" is switched in the background, in the same way as
> "Key Context"[2]. The framework is responsible for the switching, and
> no additional switching is required from the user.
>
> > One example use-case would be that you get something from the state and
> > based on the value you register a timer.
>
> For this example, users need to extend KeyedProcessFunction[3] to
> access the timer service under the current synchronous state API.
> Similarly, under the asynchronous API, users can access the timer
> service (as an outer-scoped variable) in the callback function by
> extending KeyedProcessFunction.
>
> > In other words can they access all functionality from the callback?
>
> Back to the original question: the answer is yes. We can make these
> functionalities accessible to the callback in the form of outer-scoped
> variables.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
> [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L510
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/process_function/#example
>
>
> Best,
> Yanfei
>
> Gyula Fóra <gyula.f...@gmail.com> wrote on Mon, Mar 11, 2024 at 13:41:
> >
> > Hey!
> >
> > I may have missed this in the FLIP but did not find it spelled out
> > explicitly.
> >
> > Can the user use the entire record processing context in their async
> > execution?
> > In other words can they access all functionality from the callback? For
> > example can they simply reference the keyed context and register a new
> > timer, get the timestamp etc?
> >
> > If yes, does this mean that the context itself becomes "immutable" or the
> > context is switched in the background?
> >
> > One example use-case would be that you get something from the state and
> > based on the value you register a timer.
> >
> > Thanks,
> > Gyula
> >
> > On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for your thoughtful feedback!
> > >
> > > > does it mean that there will be three mails for Read, Update, and
> > > > Output?
> > >
> > > With this example, there are two mails. The Read is processed by
> > > `mailboxDefaultAction`[1], and the Update and Output are each
> > > encapsulated as a mail.
> > >
> > > > does it make sense to encapsulate one mail instead of 3 mails with
> > > > more overhead?
> > >
> > > How many mails are encapsulated depends on how the user writes the
> > > code. The statements in a `then()` will be wrapped into a mail.
> > > StateFuture is a restricted version of CompletableFuture, and their
> > > basic semantics are consistent.
> > >
> > > > Would you like to add more description for cases when exceptions
> > > > happened? E.g. when reading or/and updating State throws IOExceptions.
> > >
> > > We describe this in the "Error handling"[2] section. This FLIP also
> > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > throw unchecked exceptions and, consequently, do not declare any
> > > exceptions in their signatures. In cases where an exception occurs
> > > while accessing the state, the job should fail.
> > >
> > > > Is it correct to understand that AEC is stateless?
> > >
> > > Great perspective, yes, it can be understood that way.
> > > AEC is a task-level component. When the job fails or is restarted, the
> > > runtime status in AEC will be reset.
> > > In fact, we have considered taking a snapshot of the status in AEC and
> > > storing it in a checkpoint like "unaligned checkpoint", but since
> > > callbacks cannot be serialized, this idea is not feasible for the time
> > > being.
> > >
> > > > would you like to add Pseudo-code for the inFlightRecordNum
> > > > decrement to help us understand the logic better?
> > >
> > > This part of the code is a bit scattered; we will try to distill it
> > > into pseudo-code. In the meantime, you can refer to the
> > > RecordContext-related code [3] in the PoC to understand it.
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > > [3]
> > > https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> > >
> > >
> > > Best,
> > > Yanfei
> > >
> > > Jing Ge <j...@ververica.com.invalid> wrote on Sun, Mar 10, 2024 at 23:47:
> > > >
> > > > Hi Yanfei,
> > > >
> > > > Thanks for your proposal! The FLIP contains a lot of great new
> > > > ideas. I'd like to ask some questions to make sure we are on the
> > > > same page.
> > > >
> > > > > For the asynchronous interface, Record A should run with Read,
> > > > > Update and Output, while Record B should stay at the Blocking buffer.
> > > >
> > > > 1. With this example, does it mean that there will be three mails for
> > > > Read, Update, and Output?
> > > > 2. If yes, since the Read, Update, and Output have to be executed
> > > > before Record B, does it make sense to encapsulate them in one mail
> > > > instead of three mails with more overhead? There must be some thoughts
> > > > behind the design. Looking forward to hearing them.
> > > >
> > > > > The challenge arises in determining when all the processing logic
> > > > > associated with Record A is fully executed. To address this, we have
> > > > > adopted a reference counting mechanism that tracks ongoing operations
> > > > > (either processing input or executing a callback) related to a single
> > > > > record.
> > > >
> > > > The design reminds me of the JVM reference counting for GC. Would you
> > > > like to add more description for cases when exceptions occur? E.g. when
> > > > reading and/or updating State throws IOExceptions.
> > > >
> > > > > In more detail, AEC uses an inFlightRecordNum variable to trace the
> > > > > current number of records in progress. Every time the AEC receives a
> > > > > new record, the inFlightRecordNum increases by 1; when all processing
> > > > > and callback for this record have completed, the inFlightRecordNum
> > > > > decreases by 1. When processing one checkpoint mail, the current task
> > > > > thread will give up the time slice through the yield() method of the
> > > > > mailbox executor, so that the ongoing state request’s callback and the
> > > > > blocking state requests will be drained first until inFlightRecordNum
> > > > > reduces to 0.
> > > >
> > > > 1. Speaking of draining, is it correct to understand that AEC is
> > > > stateless? E.g. AEC could be easily scaled out if it became a bottleneck.
> > > > 2. There is pseudo-code for the inFlightRecordNum increment. Would you
> > > > like to add pseudo-code for the inFlightRecordNum decrement to help us
> > > > understand the logic better?
> > > >
> > > > Overall, the FLIP shows a great design! +1 for it! Looking forward to
> > > > your thoughts, thanks!
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion on FLIP-425: Asynchronous Execution
> > > > > Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State
> > > > > Storage and Management[2].
> > > > >
> > > > > FLIP-425 introduces a non-blocking execution model leveraging the
> > > > > asynchronous APIs introduced in FLIP-424[3].
> > > > > For the whole story, please read FLIP-423[2]; this thread aims to
> > > > > discuss the details of "FLIP-425: Asynchronous Execution
> > > > > Model".
> > > > >
> > > > > Regarding the details of this FLIP, there have been some
> > > > > discussions here[4], mainly focusing on framework overhead
> > > > > profiling, watermark processing, etc. Please see link[4] for the
> > > > > context.
> > > > >
> > > > > Looking forward to hearing from you!
> > > > >
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > [4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > > >
> > > > > Best,
> > > > > Yanfei
> > > > >
> > >
>
