Just to reiterate Piotr's point: MailboxExecutor is pretty much an
Executor [1] with named lambdas. Apart from the name MailboxExecutor,
nothing hints at a specific threading model.

Currently, we expose it on the StreamOperator API. AFAIK, the idea is to make
StreamOperator internal and beef up ProcessFunction, but for several use
cases (e.g., AsyncIO) we need to expose the executor anyway.

We could rename MailboxExecutor to avoid exposing the name of the threading
model. For example, we could rename it to TaskThreadExecutor (but that's
pretty much the same), to CooperativeExecutor (again implies Mailbox), to
o.a.f.Executor, to DeferredExecutor... Ideas are welcome.

We could also simply use Java's Executor interface. However, when working
with that interface, I found that the missing context of asynchronously
executed lambdas made debugging much harder. That's why I designed
MailboxExecutor to force the user to give a debug string to each
invocation. In the sink context, we could, however, use an adapter from
MailboxExecutor to Java's Executor and simply bind the sink name to the
invocations.
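
To make the adapter idea concrete, here is a minimal sketch. It is not
Flink code: `DescribedExecutor` is a hypothetical, simplified stand-in for
MailboxExecutor (an executor whose submit method also takes a description),
and `NamedExecutorAdapter` and `RecordingExecutor` are names invented for
this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for MailboxExecutor: every action carries a description. */
interface DescribedExecutor {
    void execute(Runnable command, String description);
}

/** Adapts a DescribedExecutor to java.util.concurrent.Executor by binding a fixed sink name. */
final class NamedExecutorAdapter implements Executor {
    private final DescribedExecutor delegate;
    private final String sinkName;

    NamedExecutorAdapter(DescribedExecutor delegate, String sinkName) {
        this.delegate = Objects.requireNonNull(delegate);
        this.sinkName = Objects.requireNonNull(sinkName);
    }

    @Override
    public void execute(Runnable command) {
        // The caller only sees Executor; the debug string is attached here.
        delegate.execute(command, "async callback of sink " + sinkName);
    }
}

/** Toy implementation that runs actions inline and records their descriptions. */
final class RecordingExecutor implements DescribedExecutor {
    final List<String> descriptions = new ArrayList<>();

    @Override
    public void execute(Runnable command, String description) {
        descriptions.add(description);
        command.run();
    }
}
```

A sink writer would then receive only the `Executor` view, while every
enqueued action still shows up with the sink name when debugging.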

[1]
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html

On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> Regarding the question whether to expose the MailboxExecutor or not:
> 1. We have plans on exposing it in the ProcessFunction (in short, we want to
> make the StreamOperator API private/internal only, and move all of its extra
> functionality in one way or another to the ProcessFunction). I don't
> remember, and I'm not sure, whether *Dawid* had a different idea about this
> (do not expose Mailbox but wrap it somehow?)
> 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how
> helpful it will be for keeping backward compatibility in the future.
> `MailboxExecutor` is already a very generic interface that doesn't expose
> much about the current threading model. Note that the previous threading
> model (multi threaded with checkpoint lock), should be easy to implement
> using the `MailboxExecutor` interface (use a direct executor that acquires
> checkpoint lock).
>
> Having said that, I haven't spent too much time thinking about whether it's
> better to enrich AsyncIO or provide the AsyncSink. If we can just as
> efficiently provide the same functionality using the existing/enhanced
> AsyncIO API, that may be a good idea if it indeed reduces our
> maintenance costs.
>
> Piotrek
>
> On Fri, Jul 23, 2021 at 12:55 Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi, Arvid
> >
> > >>>The main question here is what do you think is the harm of exposing
> > Mailbox? Is it the complexity or the maintenance overhead?
> >
> > I think that exposing the internal threading model might be risky. In case
> > the threading model changes, it will affect the user's API and add to the
> > burden of internal modifications. (Of course, you may have more say in how
> > the Mailbox model will develop in the future.) Therefore, I think that if an
> > alternative solution can be found, these possible risks can be avoided,
> > for example through AsyncIO.
> >
> > >>>>AsyncIO has no user state, so we would be quite limited in implementing
> > at-least-once sinks especially when it comes to retries. Chaining two
> > AsyncIO would make it even harder to reason about the built-in state. We
> > would also give up any chance to implement exactly once async sinks (even
> > though I'm not sure if it's possible at all).
> >
> > 1. Why would we need to use the state when retrying (maybe I am missing
> > something)? If a batch of asynchronous requests fails, I think it is enough
> > to retry directly in the callback. Or we could extend AsyncIO to give it the
> > ability to retry (XXXFuture.fail(Exception)). In addition, the client in
> > general has its own retry mechanism; at least the Kinesis producer says so
> > in its documentation. Of course, I am not opposed to retrying; I just want
> > to find a more obvious example to support the need to do so.
> >
> > 2. I don't think using AsyncIO will prevent exactly once in the future.
> > Both solutions need to be rewritten unless Exactly Once is required from
> > the beginning.
> >
> > >>>Users will have a hard time to discover SinkUtil.sinkTo compared to the
> > expected stream.sinkTo. We have seen that on other occasions already (e.g.,
> > reinterpretAsKeyedStream).
> > In fact, I think this is the most important issue. We lack support for
> > sub-topologies at the API layer, which is very inconvenient. For
> > example, stream.sinkTo(AsyncSinkTopologyBuilder); what do you think?
> > ```java
> > class AsyncSinkTopologyBuilder {
> >     // Pseudocode: builds the sub-topology that implements the sink.
> >     void build(inputStream) {
> >         inputStream.flatMap().async()…
> >     }
> > }
> > ```
> > In general, I want to know what principles will guide when to expose more
> > implementations to users, and when to combine with existing UDFs.
> >
> > Best,
> > Guowei
> >
> >
> > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Guowei,
> > >
> > > I think the whole discussion here started as a means to avoid exposing
> > > MailboxExecutor to the user. Your preferred way would be to improve
> > AsyncIO
> > > to support batching or implement AsyncSink as batching+AsyncIO.  Here
> are
> > > some thoughts.
> > >
> > > 1) We should take a step back and note that we actually already expose
> > > MailboxExecutor on the Operator API in particular to implement AsyncIO.
> > > (Now, we could say that we actually want to hide Operator API in the
> > future
> > > and could make MailboxExecutor internal again)
> > >
> > > 2) Exposing MailboxExecutor in general is a means for users to access
> the
> > > task thread in a cooperative way. That would allow certain
> communication
> > > patterns beyond simply async requests. For example, the user could
> > > re-enqueue retries or even effectively block input processing to induce
> > > backpressure by enqueuing a waiting mail. So the main question is if we
> > > want to empower sink devs to access the task thread or not. Note that
> > > indirectly, sink devs have that option already through timers. So in
> > > theory, they could also enqueue a MIN_TIME timer and use that to
> > implement
> > > any kind of async processing.
> > >
> > > The main question here is what do you think is the harm of exposing
> > > Mailbox? Is it the complexity or the maintenance overhead?
> > >
> > > 3) Simulating AsyncSink through AsyncIO is possible but has some
> > > downsides.
> > > a) AsyncIO has no user state, so we would be quite limited in
> > implementing
> > > at-least-once sinks especially when it comes to retries. Chaining two
> > > AsyncIO would make it even harder to reason about the built-in state.
> We
> > > would also give up any chance to implement exactly once async sinks
> (even
> > > though I'm not sure if it's possible at all).
> > > b) Users will have a hard time to discover SinkUtil.sinkTo compared to
> > the
> > > expected stream.sinkTo. We have seen that on other occasions already
> > (e.g.,
> > > reinterpretAsKeyedStream).
> > > c) AsyncIO is optimized to feed back the result into the main task
> > thread.
> > > That's completely unneeded for sinks.
> > > d) You probably know it better than me, but imho the batch behavior of
> a
> > > proper sink would be much better than an AsyncIO simulation (I have not
> > > tracked the latest discussion in FLIP-147).
> > >
> > > Note that if we absolutely don't want to expose MailboxExecutor, we can
> > > also try to get it through some casts to internal interfaces. So the
> > > Sink.InitContext could have a Sink.InitContextInternal subinterface that
> > > includes the mailbox executor. In AsyncSink, we could cast to the internal
> > > interface and would be the only ones that can access the MailboxExecutor
> > > legitimately (of course, connector devs could do the same cast, but then
> > > they use an internal class and need to expect breaking changes).
> > >
> > > For your question:
> > >
> > >> 2. By the way, I have a little question. Why not directly reduce the
> > >> queue size to control the in-flight query, for example, according to
> > your
> > >> example,
> > >> Is a queue size such as 150 enough? In fact, there are caches in many
> > >> places in the entire topology, such as buffers on the network stack.
> > >>
> > > It's a bit different. Let's take Kinesis as an example. The sink collects
> > > 500 elements and puts them in a single request (500 is the maximum number
> > > of records per batch request). Now it sends the request out. At the same
> > > time, the next 500 elements are collected. So the in-flight query size
> > > refers to the number of parallel requests (500 elements each).
> > > If we first batch 500*numberOfInFlightRequests, and then send out all
> > > numberOfInFlightRequests at the same time, we get worse latency.
> > >
> > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:
> > >
> > >> Hi, Steffen
> > >>
> > >> Thank you for your detailed explanation.
> > >>
> > >> >>>But whether a sink is overloaded not only depends on the queue
> size.
> > >> It also depends on the number of in-flight async requests
> > >>
> > >> 1. How about chaining two AsyncIOs? One is for controlling the size of
> > >> the buffer elements; The other is for controlling the in-flight async
> > >> requests. I think this way might do it and would not need to expose
> the
> > >> MailboxExecutor.
> > >> 2. By the way, I have a little question. Why not directly reduce the
> > >> queue size to control the in-flight query, for example, according to
> > your
> > >> example,
> > >> Is a queue size such as 150 enough? In fact, there are caches in many
> > >> places in the entire topology, such as buffers on the network stack.
> > >>
> > >> >>> I’m not sure whether I’m understanding who you are referring to by
> > >> user.
> > >> Personally I mean the sink developer.
> > >>
> > >>
> > >> Best,
> > >> Guowei
> > >>
> > >>
> > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
> > >> <shau...@amazon.de.invalid> wrote:
> > >>
> > >>> Hey,
> > >>>
> > >>> We are using the `MailboxExecutor` to block calls to `write` in case the
> > >>> sink is somehow overloaded. Overloaded basically means that the sink
> > >>> cannot persist messages quickly enough into the respective destination.
> > >>>
> > >>> But whether a sink is overloaded not only depends on the queue size. It
> > >>> also depends on the number of in-flight async requests, which must not
> > >>> grow too large either [1, 2]. We also need to support use cases where the
> > >>> destination can only ingest x messages per second or a total throughput
> > >>> of y per second. We are also planning to support timeouts so that data is
> > >>> persisted into the destination at least every n seconds by means of the
> > >>> `ProcessingTimeService`. The batching configuration will be part of the
> > >>> constructor, which has only been indicated in the current prototype but
> > >>> is not implemented yet [3].
> > >>>
> > >>> I’m not sure whether I understand who you are referring to by user.
> > >>> People who are using a concrete sink, e.g., to send messages into a
> > >>> Kinesis stream, will not be exposed to the `MailboxExecutor`. They are
> > >>> just using the sink and pass in the batching configuration from above
> > >>> [4]. The `MailboxExecutor` and `ProcessingTimeService` are only relevant
> > >>> for sink authors who want to create support for a new destination. I
> > >>> would expect that only a few experts add support for new destinations,
> > >>> and that they are capable of understanding and using the advanced
> > >>> constructs properly.
> > >>>
> > >>> Hope that helps to clarify our thinking.
> > >>>
> > >>> Cheers, Steffen
> > >>>
> > >>>
> > >>> [1]
> > >>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> > >>> [2]
> > >>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> > >>> [3]
> > >>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> > >>> [4]
> > >>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> > >>>
> > >>>
> > >>>
> > >>> From: Arvid Heise <ar...@apache.org>
> > >>> Date: Tuesday, 20. July 2021 at 23:03
> > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> > >>>
> > >>>
> > >>>
> > >>> Hi Guowei,
> > >>>
> > >>> 1. your idea is quite similar to FLIP-171 [1]. The question is if we
> > >>> implement FLIP-171 based on public interfaces (which would require
> > exposing
> > >>> MailboxExecutor as described here in FLIP-177) or if it's better to
> > >>> implement it internally and hide it.
> > >>> The first option is an abstract base class; your second option would
> be
> > >>> an abstract interface that has matching implementation internally
> > >>> (similarly to AsyncIO).
> > >>> There is an example for option 1 in [2]; I think the idea was to
> > >>> additionally specify the batch size and batch timeout in the ctor.
> > >>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
> > >>>
> > >>> 2. I guess your question is if current AsyncIO is not sufficient
> > already
> > >>> if exactly-once is not needed? The answer is currently no, because
> > AsyncIO
> > >>> is not doing any batching. The user could do batching before that but
> > >>> that's quite a bit of code. However, we should really think if
> AsyncIO
> > >>> should also support batching.
> > >>> I would also say that the scope of AsyncIO and AsyncSink is quite
> > >>> different: the first one is for application developers and the second
> > one
> > >>> is for connector developers and would be deemed an implementation
> > detail by
> > >>> the application developer. Of course, advanced users may fall in both
> > >>> categories, so the distinction does not always hold.
> > >>>
> > >>> Nevertheless, there is some overlap between both approaches and it's
> > >>> important to think if the added complexity warrants the benefit. It
> > would
> > >>> be interesting to hear how other devs see that.
> > >>>
> > >>> [1]
> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > >>> [2]
> > >>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> > >>>
> > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> > >>> Hi, Arvid
> > >>> Thank you, Arvid, for perfecting Sink through this FLIP. I have two
> > >>> little questions.
> > >>>
> > >>> 1. What do you think of us directly providing an interface as
> follows?
> > In
> > >>> this way, there may be no need to expose the Mailbox to the user. We
> > can
> > >>> implement an `AsyncSinkWriterOperator` to control the length of the
> > >>> queue.
> > >>> If it is too long, do not call SinkWriter::write.
> > >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> > >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT,
> > >>> WriterStateT> { //// please ignore the name of Tuple2 and XXXFuture
> at
> > >>> first.
> > >>>     int getQueueLimit();
> > >>> }
> > >>>
> > >>> 2. Another question is: If users don't care about exactly once and
> the
> > >>> unification of stream and batch, how about letting users use
> > >>> `AsyncFunction` directly? I don’t have an answer either. I want to
> hear
> > >>> your suggestions.
> > >>>
> > >>> Best,
> > >>> Guowei
> > >>>
> > >>>
> > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> > >>>
> > >>> > Dear devs,
> > >>> >
> > >>> > today I'd like to start the discussion on the Sink API. I have
> > drafted
> > >>> a
> > >>> > FLIP [1] with an accompanying PR [2].
> > >>> >
> > >>> > This FLIP is a bit special as it's actually a few smaller
> Amend-FLIPs
> > >>> in
> > >>> > one. In this discussion, we should decide on the scope and cut out
> > too
> > >>> > invasive steps if we can't reach an agreement.
> > >>> >
> > >>> > Step 1 is to add a few more pieces of information to context
> objects.
> > >>> > That's non-breaking and needed for the async communication pattern
> in
> > >>> > FLIP-171 [3]. While we need to add a new Public API
> > (MailboxExecutor),
> > >>> I
> > >>> > think that this should entail the least discussions.
> > >>> >
> > >>> > Step 2 is to also offer the same context information to committers.
> > >>> Here we
> > >>> > can offer some compatibility methods to not break existing sinks.
> The
> > >>> main
> > >>> > use case would be some async exactly-once sink but I'm not sure if
> we
> > >>> would
> > >>> > use async communication patterns at all here (or simply wait for
> all
> > >>> async
> > >>> > requests to finish in a sync way). It may also help with async
> > cleanup
> > >>> > tasks though.
> > >>> >
> > >>> > While drafting Step 2, I noticed the big entanglement of the
> current
> > >>> API.
> > >>> > To figure out if there is a committer during the stream graph
> > >>> creation, we
> > >>> > actually need to create a committer which can have unforeseen
> > >>> consequences.
> > >>> > Thus, I spiked if we can disentangle the interface and have
> separate
> > >>> > interfaces for the different use cases. The resulting step 3 would
> > be a
> > >>> > completely breaking change and thus is probably controversial.
> > >>> However, I'd
> > >>> > also see the disentanglement as a way to prepare to make Sinks more
> > >>> > expressive (write and commit coordinator) without completely
> > >>> overloading
> > >>> > the main interface.
> > >>> >
> > >>> > [1]
> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > >>> > [2] https://github.com/apache/flink/pull/16399
> > >>> > [3]
> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > >>> >
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> >
>
