Hi Guowei,

I think the whole discussion here started as a means to avoid exposing
MailboxExecutor to the user. Your preferred way would be to improve AsyncIO
to support batching or implement AsyncSink as batching+AsyncIO.  Here are
some thoughts.

1) We should take a step back and note that we actually already expose
MailboxExecutor on the Operator API in particular to implement AsyncIO.
(Now, we could say that we actually want to hide Operator API in the future
and could make MailboxExecutor internal again)

2) Exposing MailboxExecutor in general is a means for users to access the
task thread in a cooperative way. That would allow certain communication
patterns beyond simply async requests. For example, the user could
re-enqueue retries or even effectively block input processing to induce
backpressure by enqueuing a waiting mail. So the main question is if we
want to empower sink devs to access the task thread or not. Note that
indirectly, sink devs have that option already through timers. So in
theory, they could also enqueue a MIN_TIME timer and use that to implement
any kind of async processing.

The main question here is what do you think is the harm of exposing
Mailbox? Is it the complexity or the maintenance overhead?

3) Simulating AsyncSink through AsyncIO is possible but has some downsides.
a) AsyncIO has no user state, so we would be quite limited in implementing
at-least-once sinks especially when it comes to retries. Chaining two
AsyncIO would make it even harder to reason about the built-in state. We
would also give up any chance to implement exactly once async sinks (even
though I'm not sure if it's possible at all).
b) Users will have a hard time to discover SinkUtil.sinkTo compared to the
expected stream.sinkTo. We have seen that on other occasions already (e.g.,
reinterpretAsKeyedStream).
c) AsyncIO is optimized to feed back the result into the main task thread.
That's completely unneeded for sinks.
d) You probably know it better than me, but imho the batch behavior of a
proper sink would be much better than an AsyncIO simulation (I have not
tracked the latest discussion in FLIP-147).

Note that if we absolutely don't want to expose MailboxExecutor, we can
also try to get it through some casts to internal interfaces. So the
Sink.InitContext could have a Sink.InitContextInternal subinterface that
includes the mailbox executor. In AsyncSink, we could cast to the internal
interface and are the only ones that can access the MailboxExecutor
legimately (of course, connector devs could do the same cast but then use
an internal class and need to expect breaking changes).

For your question:

> 2. By the way, I have a little question. Why not directly reduce the queue
> size to control the in-flight query, for example, according to your
> example,
> Is a queue size such as 150 enough? In fact, there are caches in many
> places in the entire topology, such as buffers on the network stack.
>
It's a bit different. Let's take kinesis as an example. The sink collects
500 elements and puts them in a single request (500 is max number of
records per batch request). Now it sends the request out. At the same time,
the next 500 elements are collected. So the in-flight query size refers to
the number of parallel requets (500 elements each).
If we first batch 500*numberOfInFlightRequests, and then send out all
numberOfInFlightRequests at the same time, we get worse latency.

On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Steffen
>
> Thank you for your detailed explanation.
>
> >>>But whether a sink is overloaded not only depends on the queue size. It
> also depends on the number of in-flight async requests
>
> 1. How about chaining two AsyncIOs? One is for controlling the size of the
> buffer elements; The other is for controlling the in-flight async requests.
> I think this way might do it and would not need to expose the
> MailboxExecutor.
> 2. By the way, I have a little question. Why not directly reduce the queue
> size to control the in-flight query, for example, according to your
> example,
> Is a queue size such as 150 enough? In fact, there are caches in many
> places in the entire topology, such as buffers on the network stack.
>
> >>> I’m not sure whether I’m understanding who you are referring to by
> user.
> Personally I mean the sink developer.
>
>
> Best,
> Guowei
>
>
> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
> <shau...@amazon.de.invalid> wrote:
>
>> Hey,
>>
>> We are using the `MailboxExecutor` to block calls to `write` in case the
>> sink is somehow overloaded. Overloaded basically means that the sink cannot
>> persist messages quickly enough into the respective destination.
>>
>> But whether a sink is overloaded not only depends on the queue size. It
>> also depends on the number of in-flight async requests that must not grow
>> too large either [1, 2]. We also need to support use cases where the
>> destination can only ingest x messages per second or a total throughput of
>> y per second. We are also planning to support time outs so that data is
>> persisted into the destination at least every n seconds by means of the
>> `ProcessingTimeService`. The batching configuration will be part of the
>> constructor, which has only been indicated in the current prototype but is
>> not implemented, yet [3].
>>
>> I’m not sure whether I’m understanding who you are referring to by user.
>> People who are using a concrete sink, eg, to send messages into a Kinesis
>> stream, will not be exposed to the `MailboxExecutor`. They are just using
>> the sink and pass in the batching configuration from above [4]. The
>> `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink
>> authors who want to create support for a new destination. I would expect
>> that there are only few experts who are adding support for new
>> destinations, who are capable to understand and use the advanced constructs
>> properly.
>>
>> Hope that helps to clarify our thinking.
>>
>> Cheers, Steffen
>>
>>
>> [1]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
>> [2]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
>> [3]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
>> [4]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
>>
>>
>>
>> From: Arvid Heise <ar...@apache.org>
>> Date: Tuesday, 20. July 2021 at 23:03
>> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
>>
>>
>> CAUTION: This email originated from outside of the organization. Do not
>> click links or open attachments unless you can confirm the sender and know
>> the content is safe.
>>
>>
>> Hi Guowei,
>>
>> 1. your idea is quite similar to FLIP-171 [1]. The question is if we
>> implement FLIP-171 based on public interfaces (which would require exposing
>> MailboxExecutor as described here in FLIP-177) or if it's better to
>> implement it internally and hide it.
>> The first option is an abstract base class; your second option would be
>> an abstract interface that has matching implementation internally
>> (similarly to AsyncIO).
>> There is an example for option 1 in [2]; I think the idea was to
>> additionally specify the batch size and batch timeout in the ctor.
>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
>>
>> 2. I guess your question is if current AsyncIO is not sufficient already
>> if exactly-once is not needed? The answer is currently no, because AsyncIO
>> is not doing any batching. The user could do batching before that but
>> that's quite a bit of code. However, we should really think if AsyncIO
>> should also support batching.
>> I would also say that the scope of AsyncIO and AsyncSink is quite
>> different: the first one is for application developers and the second one
>> is for connector developers and would be deemed an implementation detail by
>> the application developer. Of course, advanced users may fall in both
>> categories, so the distinction does not always hold.
>>
>> Nevertheless, there is some overlap between both approaches and it's
>> important to think if the added complexity warrants the benefit. It would
>> be interesting to hear how other devs see that.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>> [2]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>>
>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com<mailto:
>> guowei....@gmail.com>> wrote:
>> Hi, Avrid
>> Thank you Avrid for perfecting Sink through this FLIP. I have two little
>> questions
>>
>> 1. What do you think of us directly providing an interface as follows? In
>> this way, there may be no need to expose the Mailbox to the user. We can
>> implement an `AsyncSinkWriterOperator` to control the length of the queue.
>> If it is too long, do not call SinkWriter::write.
>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT,
>> WriterStateT> { //// please ignore the name of Tuple2 and XXXFuture at
>> first.
>>     int getQueueLimit();
>> }
>>
>> 2. Another question is: If users don't care about exactly once and the
>> unification of stream and batch, how about letting users use
>> `AsyncFunction` directly? I don’t have an answer either. I want to hear
>> your suggestions.
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org<mailto:
>> ar...@apache.org>> wrote:
>>
>> > Dear devs,
>> >
>> > today I'd like to start the discussion on the Sink API. I have drafted a
>> > FLIP [1] with an accompanying PR [2].
>> >
>> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
>> > one. In this discussion, we should decide on the scope and cut out too
>> > invasive steps if we can't reach an agreement.
>> >
>> > Step 1 is to add a few more pieces of information to context objects.
>> > That's non-breaking and needed for the async communication pattern in
>> > FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
>> > think that this should entail the least discussions.
>> >
>> > Step 2 is to also offer the same context information to committers.
>> Here we
>> > can offer some compatibility methods to not break existing sinks. The
>> main
>> > use case would be some async exactly-once sink but I'm not sure if we
>> would
>> > use async communication patterns at all here (or simply wait for all
>> async
>> > requests to finish in a sync way). It may also help with async cleanup
>> > tasks though.
>> >
>> > While drafting Step 2, I noticed the big entanglement of the current
>> API.
>> > To figure out if there is a committer during the stream graph creation,
>> we
>> > actually need to create a committer which can have unforeseen
>> consequences.
>> > Thus, I spiked if we can disentangle the interface and have separate
>> > interfaces for the different use cases. The resulting step 3 would be a
>> > completely breaking change and thus is probably controversial. However,
>> I'd
>> > also see the disentanglement as a way to prepare to make Sinks more
>> > expressive (write and commit coordinator) without completely overloading
>> > the main interface.
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
>> > [2] https://github.com/apache/flink/pull/16399
>> > [3]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>> >
>>
>>
>>
>> Amazon Web Services EMEA SARL
>> 38 avenue John F. Kennedy, L-1855 Luxembourg
>> Sitz der Gesellschaft: L-1855 Luxemburg
>> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>>
>> Amazon Web Services EMEA SARL, Niederlassung Deutschland
>> Marcel-Breuer-Str. 12, D-80807 Muenchen
>> Sitz der Zweigniederlassung: Muenchen
>> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB
>> 242240, USt-ID DE317013094
>>
>>
>>
>>

Reply via email to