Hey,

We are using the `MailboxExecutor` to block calls to `write` when the sink 
is overloaded, that is, when the sink cannot persist messages into the 
destination quickly enough.

Whether a sink is overloaded depends not only on the queue size but also on 
the number of in-flight async requests, which must not grow too large either 
[1, 2]. We also need to support use cases where the destination can only 
ingest x messages per second or a total throughput of y per second. We are also 
planning to support timeouts, by means of the `ProcessingTimeService`, so that 
data is persisted into the destination at least every n seconds. The batching 
configuration will be part of the constructor, which is only indicated in 
the current prototype and not implemented yet [3].
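
To make the conditions above a bit more concrete, here is a minimal sketch in 
plain Java. It is not the prototype's code: `BatchingConfig`, `OverloadTracker` 
and all field and method names are hypothetical and only illustrate how queue 
size, in-flight requests, and a buffer timeout could be combined. The 
per-second rate limits are left out, and the caller is assumed to pass in the 
current time (in Flink this would come from the `ProcessingTimeService`).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical batching configuration; names are illustrative only. */
final class BatchingConfig {
    final int maxBatchSize;         // flush once this many elements are buffered
    final int maxBufferedElements;  // at or beyond this, write() should block
    final int maxInFlightRequests;  // cap on concurrent async requests
    final long maxTimeInBufferMs;   // flush at least this often

    BatchingConfig(int maxBatchSize, int maxBufferedElements,
                   int maxInFlightRequests, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBufferedElements = maxBufferedElements;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }
}

/** Tracks buffer and in-flight state (assumed design, not the AsyncSinkWriter). */
final class OverloadTracker<T> {
    private final BatchingConfig config;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int inFlightRequests = 0;
    private long oldestElementMs = -1;

    OverloadTracker(BatchingConfig config) {
        this.config = config;
    }

    /** Buffers one element and remembers when the buffer became non-empty. */
    void add(T element, long nowMs) {
        if (buffer.isEmpty()) {
            oldestElementMs = nowMs;
        }
        buffer.add(element);
    }

    /** True if write() should block: buffer full or too many requests in flight. */
    boolean isOverloaded() {
        return buffer.size() >= config.maxBufferedElements
                || inFlightRequests >= config.maxInFlightRequests;
    }

    /** True if a batch is full or the oldest element has waited too long. */
    boolean shouldFlush(long nowMs) {
        if (buffer.isEmpty()) {
            return false;
        }
        return buffer.size() >= config.maxBatchSize
                || nowMs - oldestElementMs >= config.maxTimeInBufferMs;
    }

    /** Removes up to maxBatchSize elements and counts one new in-flight request. */
    List<T> startRequest(long nowMs) {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < config.maxBatchSize) {
            batch.add(buffer.poll());
        }
        // Simplification: leftover elements restart their wait time at nowMs.
        oldestElementMs = buffer.isEmpty() ? -1 : nowMs;
        inFlightRequests++;
        return batch;
    }

    /** Called when an async request finishes, successfully or not. */
    void completeRequest() {
        inFlightRequests--;
    }
}
```

In such a design, the sink author would check `isOverloaded()` before accepting 
another element in `write` and yield to the mailbox until a request completes, 
while a timer would call `shouldFlush` periodically.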

I’m not sure I understand whom you are referring to by "user". People 
who use a concrete sink, e.g., to send messages into a Kinesis stream, will 
not be exposed to the `MailboxExecutor`. They just use the sink and pass 
in the batching configuration from above [4]. The `MailboxExecutor` and 
`ProcessingTimeService` are only relevant for sink authors who want to add 
support for a new destination. I would expect that only a few experts add 
support for new destinations, and that they are capable of understanding and 
using these advanced constructs properly.

Hope that helps to clarify our thinking.

Cheers, Steffen


[1] 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
[2] 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
[3] 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
[4] 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45



From: Arvid Heise <ar...@apache.org>
Date: Tuesday, 20. July 2021 at 23:03
To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API




Hi Guowei,

1. Your idea is quite similar to FLIP-171 [1]. The question is whether we 
implement FLIP-171 based on public interfaces (which would require exposing 
MailboxExecutor as described here in FLIP-177) or whether it's better to 
implement it internally and hide it.
The first option is an abstract base class; your second option would be an 
interface that has a matching internal implementation (similar to 
AsyncIO).
There is an example for option 1 in [2]; I think the idea was to additionally 
specify the batch size and batch timeout in the constructor. @Hausmann, Steffen 
knows more.

2. I guess your question is whether the current AsyncIO is already sufficient 
if exactly-once is not needed? The answer is currently no, because AsyncIO does 
not do any batching. The user could do batching before that, but that's quite a 
bit of code. However, we should really think about whether AsyncIO should also 
support batching.
I would also say that the scope of AsyncIO and AsyncSink is quite different: 
the first one is for application developers and the second one is for connector 
developers and would be deemed an implementation detail by the application 
developer. Of course, advanced users may fall in both categories, so the 
distinction does not always hold.

Nevertheless, there is some overlap between both approaches and it's important 
to consider whether the benefit warrants the added complexity. It would be 
interesting to hear how other devs see that.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[2] 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java

On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
Hi Arvid,
Thank you for improving the Sink API through this FLIP. I have two small
questions.

1. What do you think of us directly providing an interface as follows? This
way, there may be no need to expose the Mailbox to the user. We can
implement an `AsyncSinkWriterOperator` to control the length of the queue.
If the queue is too long, the operator does not call SinkWriter::write.
// Please ignore the names Tuple2 and XXXFuture for now.
public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
        extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
    int getQueueLimit();
}

2. Another question is: If users don't care about exactly once and the
unification of stream and batch, how about letting users use
`AsyncFunction` directly? I don’t have an answer either. I want to hear
your suggestions.

Best,
Guowei


On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:

> Dear devs,
>
> today I'd like to start the discussion on the Sink API. I have drafted a
> FLIP [1] with an accompanying PR [2].
>
> This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
> one. In this discussion, we should decide on the scope and cut out too
> invasive steps if we can't reach an agreement.
>
> Step 1 is to add a few more pieces of information to context objects.
> That's non-breaking and needed for the async communication pattern in
> FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
> think that this should entail the least discussions.
>
> Step 2 is to also offer the same context information to committers. Here we
> can offer some compatibility methods to not break existing sinks. The main
> use case would be some async exactly-once sink but I'm not sure if we would
> use async communication patterns at all here (or simply wait for all async
> requests to finish in a sync way). It may also help with async cleanup
> tasks though.
>
> While drafting Step 2, I noticed how entangled the current API is.
> To figure out whether there is a committer during stream graph creation, we
> actually need to create a committer, which can have unforeseen consequences.
> Thus, I ran a spike to see whether we can disentangle the interface and have
> separate interfaces for the different use cases. The resulting Step 3 would
> be a completely breaking change and thus is probably controversial. However,
> I'd also see the disentanglement as a way to prepare to make Sinks more
> expressive (write and commit coordinator) without completely overloading
> the main interface.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> [2] https://github.com/apache/flink/pull/16399
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>


