Hey,

We are using the `MailboxExecutor` to block calls to `write` in case the sink is somehow overloaded. Overloaded basically means that the sink cannot persist messages into the respective destination quickly enough.
But whether a sink is overloaded does not only depend on the queue size. It also depends on the number of in-flight async requests, which must not grow too large either [1, 2]. We also need to support use cases where the destination can only ingest x messages per second or a total throughput of y per second. We are also planning to support timeouts, so that data is persisted into the destination at least every n seconds, by means of the `ProcessingTimeService`. The batching configuration will be part of the constructor; this has only been indicated in the current prototype and is not implemented yet [3].

I'm not sure I understand who you are referring to by "user". People who use a concrete sink, e.g., to send messages into a Kinesis stream, will not be exposed to the `MailboxExecutor`. They just use the sink and pass in the batching configuration from above [4]. The `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink authors who want to add support for a new destination. I would expect that only a few experts will be adding support for new destinations, and that they are capable of understanding and using these advanced constructs properly.

Hope that helps to clarify our thinking.
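To make the overload and flush conditions above a bit more concrete, here is a rough sketch in plain Java. All names (`BatchingConfig`, `OverloadCheck`, etc.) are made up for illustration; this is not the actual AsyncSinkWriter code from [1]–[3]:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical batching configuration as it might be passed to the ctor.
class BatchingConfig {
    final int maxBufferedRequests;   // queue size limit
    final int maxInFlightRequests;   // async requests that have not completed yet
    final int maxBatchSize;          // flush once this many records are buffered
    final long maxBatchAgeMillis;    // flush at least every n milliseconds

    BatchingConfig(int maxBufferedRequests, int maxInFlightRequests,
                   int maxBatchSize, long maxBatchAgeMillis) {
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBatchSize = maxBatchSize;
        this.maxBatchAgeMillis = maxBatchAgeMillis;
    }
}

// Sketch of the checks a writer could perform before accepting more input.
class OverloadCheck {
    private final BatchingConfig config;
    private final Deque<String> buffered = new ArrayDeque<>();
    private int inFlight;

    OverloadCheck(BatchingConfig config) { this.config = config; }

    void buffer(String record) { buffered.add(record); }
    void requestStarted() { inFlight++; }
    void requestCompleted() { inFlight--; }

    // 'write' should block (yield to the mailbox) while this returns true.
    boolean isOverloaded() {
        return buffered.size() >= config.maxBufferedRequests
            || inFlight >= config.maxInFlightRequests;
    }

    // A batch is due when it is full or the oldest buffered record is too old
    // (the latter is what the ProcessingTimeService timer would check).
    boolean batchReady(long oldestRecordAgeMillis) {
        return buffered.size() >= config.maxBatchSize
            || (!buffered.isEmpty() && oldestRecordAgeMillis >= config.maxBatchAgeMillis);
    }
}
```

The point of the sketch is only that "overloaded" is a disjunction of several limits, not the queue size alone.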
Cheers,
Steffen

[1] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
[2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
[3] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
[4] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45

From: Arvid Heise <ar...@apache.org>
Date: Tuesday, 20. July 2021 at 23:03
To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API

Hi Guowei,

1. Your idea is quite similar to FLIP-171 [1]. The question is whether we implement FLIP-171 based on public interfaces (which would require exposing MailboxExecutor, as described here in FLIP-177) or whether it is better to implement it internally and hide it. The first option is an abstract base class; your second option would be an abstract interface with a matching internal implementation (similar to AsyncIO). There is an example for option 1 in [2]; I think the idea was to additionally specify the batch size and batch timeout in the ctor. @Hausmann, Steffen knows more.

2. I guess your question is whether the current AsyncIO is not already sufficient if exactly-once is not needed? The answer is currently no, because AsyncIO does not do any batching.
The user could do batching before that, but that's quite a bit of code. However, we should really think about whether AsyncIO should also support batching. I would also say that the scope of AsyncIO and AsyncSink is quite different: the first is for application developers, while the second is for connector developers and would be deemed an implementation detail by the application developer. Of course, advanced users may fall into both categories, so the distinction does not always hold. Nevertheless, there is some overlap between the two approaches, and it's important to consider whether the added complexity warrants the benefit. It would be interesting to hear how other devs see that.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java

On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi Arvid,

Thank you for perfecting the Sink through this FLIP. I have two little questions:

1. What do you think of us directly providing an interface as follows? In this way, there may be no need to expose the Mailbox to the user. We can implement an `AsyncSinkWriterOperator` to control the length of the queue. If it is too long, do not call SinkWriter::write.

    public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
            extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
        // please ignore the names of Tuple2 and XXXFuture at first
        int getQueueLimit();
    }

2. Another question is: if users don't care about exactly-once and the unification of stream and batch, how about letting users use `AsyncFunction` directly? I don't have an answer either; I want to hear your suggestions.
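For illustration, such an operator might enforce `getQueueLimit()` roughly like this. This is a hypothetical sketch, not actual Flink code; `QueueLimitingOperator` and `tryAdmit` are invented names:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an operator that tracks in-flight request futures
// and applies backpressure, so the writer never sees the MailboxExecutor.
class QueueLimitingOperator {
    private final int queueLimit;
    private final Deque<CompletableFuture<?>> inFlight = new ArrayDeque<>();

    QueueLimitingOperator(int queueLimit) { this.queueLimit = queueLimit; }

    // Called before forwarding an element to SinkWriter::write;
    // returns false to signal "queue too long, do not call write yet".
    boolean tryAdmit(CompletableFuture<?> requestFuture) {
        inFlight.removeIf(CompletableFuture::isDone); // drop completed requests
        if (inFlight.size() >= queueLimit) {
            return false; // backpressure
        }
        inFlight.add(requestFuture);
        return true;
    }

    int pendingRequests() {
        return inFlight.size();
    }
}
```

The writer would then only implement `getQueueLimit()` and return futures; the blocking decision stays inside the runtime.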
Best,
Guowei

On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> Dear devs,
>
> today I'd like to start the discussion on the Sink API. I have drafted a
> FLIP [1] with an accompanying PR [2].
>
> This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
> one. In this discussion, we should decide on the scope and cut out too
> invasive steps if we can't reach an agreement.
>
> Step 1 is to add a few more pieces of information to context objects.
> That's non-breaking and needed for the async communication pattern in
> FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
> think that this should entail the least discussions.
>
> Step 2 is to also offer the same context information to committers. Here we
> can offer some compatibility methods to not break existing sinks. The main
> use case would be some async exactly-once sink, but I'm not sure if we would
> use async communication patterns at all here (or simply wait for all async
> requests to finish in a sync way). It may also help with async cleanup
> tasks, though.
>
> While drafting Step 2, I noticed the big entanglement of the current API.
> To figure out whether there is a committer during stream graph creation, we
> actually need to create a committer, which can have unforeseen consequences.
> Thus, I spiked whether we can disentangle the interface and have separate
> interfaces for the different use cases. The resulting step 3 would be a
> completely breaking change and thus is probably controversial. However, I'd
> also see the disentanglement as a way to prepare to make Sinks more
> expressive (write and commit coordinator) without completely overloading
> the main interface.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> [2] https://github.com/apache/flink/pull/16399
> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink