Sure, thanks for the pointers.


On Fri, Jul 16, 2021 at 6:19 PM Hausmann, Steffen <>

> Hi Till,
> You are right, I’ve left out some implementation details, which have
> actually changed a couple of time as part of the ongoing discussion. You
> can find our current prototype here [1] and a sample implementation of the
> KPL free Kinesis sink here [2].
> I plan to update the FLIP. But I think would it be make sense to wait
> until the implementation has stabilized enough before we update the FLIP to
> the final state.
> Does that make sense?
> Cheers, Steffen
> [1]
> [2]
> From: Till Rohrmann <>
> Date: Friday, 16. July 2021 at 18:10
> To: Piotr Nowojski <>
> Cc: Steffen Hausmann <>, "" <
>>, Arvid Heise <>
> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
> Hi Steffen,
> I've taken another look at the FLIP and I stumbled across a couple of
> inconsistencies. I think it is mainly because of the lacking code. For
> example, it is not fully clear to me based on the current FLIP how we
> ensure that there are no in-flight requests when
> AsyncSinkWriter.snapshotState is called. Also the concrete implementation
> of the AsyncSinkCommitter could be helpful for understanding how the
> AsyncSinkWriter works in the end. Do you plan to update the FLIP
> accordingly?
> Cheers,
> Till
> On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski <
> <>> wrote:
> Thanks for addressing this issue :)
> Best, Piotrek
> wt., 29 cze 2021 o 17:58 Hausmann, Steffen <<mailto:
>>> napisał(a):
> Hey Poitr,
> I've just adapted the FLIP and changed the signature for the
> `submitRequestEntries` method:
> protected abstract void submitRequestEntries(List<RequestEntryT>
> requestEntries, ResultFuture<?> requestResult);
> In addition, we are likely to use an AtomicLong to track the number of
> outstanding requests, as you have proposed in 2b). I've already indicated
> this in the FLIP, but it's not fully fleshed out. But as you have said,
> that seems to be an implementation detail and the important part is the
> change of the `submitRequestEntries` signature.
> Thanks for your feedback!
> Cheers, Steffen
> On 25.06.21, 17:05, "Hausmann, Steffen" <> wrote:
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>     Hi Piotr,
>     I’m happy to take your guidance on this. I need to think through your
> proposals and I’ll follow-up on Monday with some more context so that we
> can close the discussion on these details. But for now, I’ll close the vote.
>     Thanks, Steffen
>     From: Piotr Nowojski <<
> >>
>     Date: Friday, 25. June 2021 at 14:48
>     To: Till Rohrmann <<>>
>     Cc: Steffen Hausmann <<>>, "
><>" <
> <>>, Arvid Heise <<mailto:
>     Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>     Hey,
>     I've just synced with Arvid about a couple of more remarks from my
> side and he shared mine concerns.
>     1. I would very strongly recommend ditching `CompletableFuture<?> `
> from the  `protected abstract CompletableFuture<?>
> submitRequestEntries(List<RequestEntryT> requestEntries);`  in favor of
> something like
> `org.apache.flink.streaming.api.functions.async.ResultFuture` interface.
> `CompletableFuture<?>` would partially make the threading model of the
> `AsyncSincWriter` part of the public API and it would tie our hands.
> Regardless how `CompletableFuture<?>` is used, it imposes performance
> overhead because it's synchronisation/volatile inside of it. On the other
> hand something like:
>     protected abstract void submitRequestEntries(List<RequestEntryT>
> requestEntries, ResultFuture<?> requestResult);
>     Would allow us to implement the threading model as we wish.
> `ResultFuture` could be backed via `CompletableFuture<?>` underneath, but
> it could also be something more efficient.  I will explain what I have in
> mind in a second.
>     2. It looks to me that proposed `AsyncSinkWriter` Internals are not
> very efficient and maybe the threading model hasn't been thought through?
> Especially private fields:
>     private final BlockingDeque<RequestEntryT> bufferedRequestEntries;
>     private BlockingDeque<CompletableFuture<?>> inFlightRequests;
>     are a bit strange to me. Why do we need two separate thread safe
> collections? Why do we need a `BlockingDeque` of `CompletableFuture<?>`s?
> If we are already using a fully synchronised collection, there should be no
> need for another layer of thread safe `CompletableFuture<?>`.
>     As I understand, the threading model of the `AsyncSinkWriter` is very
> similar to that of the `AsyncWaitOperator`, with very similar requirements
> for inducing backpressure. How I would see it implemented is for example:
>     a) Having a single lock, that would encompass the whole
> `AsyncSinkWriter#flush()` method. `flush()` would be called from the task
> thread (mailbox). To induce backpressure, `#flush()` would just call
> `lock.wait()`. `ResultFuture#complete(...)` called from an async thread,
> would also synchronize on the same lock, and mark some of the inflight
> requests as completed and call `lock.notify()`.
>     b) More efficient solution. On the hot path we would have for example
> only `AtomicLong numberOfInFlightRequests`. Task thread would be bumping
> it, `ResultFuture#complete()` would be decreasing it. If the task thread
> when bumping `numberOfInFlightRequests` exceeds a threshold, he goes to
> sleep/wait on a lock or some `CompletableFuture`. If
> `ResultFuture#complete()` when decreasing the count goes below the
> threshold, it would wake up the task thread.  Compared to the option a),
> on the hot path, option b) would have only AtomicLong.increment overhead
>     c) We could use mailbox, the same way as AsyncWaitOperator is doing.
> In this case `ResultFuture#complete()` would be enquing mailbox action,
> which is thread safe on it's own.
>     Either of those options would be more efficient and simpler (from the
> threading model perspective) than having two `BlockingQueues` and
> `CompletableFuture<?>`. Also as you can see, neither of those solutions
> require the overhead of ` CompletableFuture<?>
> submitRequestEntries(List<RequestEntryT> requestEntries)`. Each one of
> those could use a more efficient and custom implementation of
> `ResultFuture.complete(...)`.
>     Whether we use a), b) or c) I think should be an implementation
> detail. But to allow this to truly be an implementation detail, we would
> need to agree on 1. Nevertheless I think that the change I proposed in 1.
> is small enough that I think there is no need to cancel the current vote on
> the FLIP.
>     WDYT?
>     Piotrek
>     wt., 22 cze 2021 o 11:42 Till Rohrmann <<mailto:
>>>> napisał(a):
>     Adding the InterruptException to the write method would make it
> explicit that the write call can block but must react to interruptions
> (e.g. when Flink wants to cancel the operation). I think this makes the
> contract a bit clearer.
>     I think starting simple and then extending the API as we see the need
> is a good idea.
>     Cheers,
>     Till
>     On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <
> <><<mailto:
>>>> wrote:
>     Hey,
>     Agreed on starting with a blocking `write`. I've adapted the FLIP
> accordingly.
>     For now I've chosen to add the `InterruptedException` to the `write`
> method signature as I'm not fully understanding the implications of
> swallowing the exception. Depending on the details of  the code that is
> calling the write method, it may cause event loss. But this seems more of
> an implementation detail, that we can revisit once we are actually
> implementing the sink.
>     Unless there are additional comments, does it make sense to start the
> voting process in the next day or two?
>     Cheers, Steffen
>     On 21.06.21, 14:51, "Piotr Nowojski" <<mailto:
>>>> wrote:
>         CAUTION: This email originated from outside of the organization.
> Do not click links or open attachments unless you can confirm the sender
> and know the content is safe.
>         Hi,
>         Thanks Steffen for the explanations. I think it makes sense to me.
>         Re Arvid/Steffen:
>         - Keep in mind that even if we choose to provide a non blocking
> API using
>         the `isAvailable()`/`getAvailableFuture()` method, we would still
> need to
>         support blocking inside the sinks. For example at the very least,
> emitting
>         many records at once (`flatMap`) or firing timers are scenarios
> when output
>         availability would be ignored at the moment by the runtime. Also I
> would
>         imagine writing very large (like 1GB) records would be blocking on
>         something as well.
>         - Secondly, exposing availability to the API level might not be
> that
>         easy/trivial. The availability pattern as defined in
> `AvailabilityProvider`
>         class is quite complicated and not that easy to implement by a
> user.
>         Both of those combined with lack of a clear motivation for adding
>         `AvailabilityProvider` to the sinks/operators/functions,  I would
> vote on
>         just starting with blocking `write` calls. This can always be
> extended in
>         the future with availability if needed/motivated properly.
>         That would be aligned with either Arvid's option 1 or 2. I don't
> know what
>         are the best practices with `InterruptedException`, but I'm always
> afraid
>         of it, so I would feel personally safer with option 2.
>         I'm not sure what problem option 3 is helping to solve? Adding
> `wakeUp()`
>         would sound strange to me.
>         Best,
>         Piotrek
>         pon., 21 cze 2021 o 12:15 Arvid Heise <<mailto:
> napisał(a):
>         > Hi Piotr,
>         >
>         > to pick up this discussion thread again:
>         > - This FLIP is about providing some base implementation for
> FLIP-143 sinks
>         > that make adding new implementations easier, similar to the
>         > SourceReaderBase.
>         > - The whole availability topic will most likely be a separate
> FLIP. The
>         > basic issue just popped up here because we currently have no way
> to signal
>         > backpressure in sinks except by blocking `write`. This feels
> quite natural
>         > in sinks with sync communication but quite unnatural in async
> sinks.
>         >
>         > Now we have a couple of options. In all cases, we would have
> some WIP
>         > limit on the number of records/requests being able to be
> processed in
>         > parallel asynchronously (similar to asyncIO).
>         > 1. We use some blocking queue in `write`, then we need to handle
>         > interruptions. In the easiest case, we extend `write` to throw
> the
>         > `InterruptedException`, which is a small API change.
>         > 2. We use a blocking queue, but handle interrupts and
> swallow/translate
>         > them. No API change.
>         > Both solutions block the task thread, so any RPC message /
> unaligned
>         > checkpoint would be processed only after the backpressure is
> temporarily
>         > lifted. That's similar to the discussions that you linked.
> Cancellation may
>         > also be a tad harder on 2.
>         > 3. We could also add some `wakeUp` to the `SinkWriter` similar to
>         > `SplitFetcher` [1]. Basically, you use a normal queue with a
> completeable
>         > future on which you block. Wakeup would be a clean way to
> complete it next
>         > to the natural completion through finished requests.
>         > 4. We add availability to the sink. However, this API change
> also requires
>         > that we allow operators to be available so it may be a bigger
> change with
>         > undesired side-effects. On the other hand, we could also use the
> same
>         > mechanism for asyncIO.
>         >
>         > For users of FLIP-171, none of the options are exposed. So we
> could also
>         > start with a simple solution (add `InterruptedException`) and
> later try to
>         > add availability. Option 1+2 would also not require an
> additional FLIP; we
>         > could add it as part of this FLIP.
>         >
>         > Best,
>         >
>         > Arvid
>         >
>         > [1]
>         >
>         > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
>         > <> wrote:
>         >
>         >> Hey Piotrek,
>         >>
>         >> Thanks for your comments on the FLIP. I'll address your second
> question
>         >> first, as I think it's more central to this FLIP. Just looking
> at the AWS
>         >> ecosystem, there are several sinks with overlapping
> functionality. I've
>         >> chosen AWS sinks here because I'm most familiar with those, but
> a similar
>         >> argument applies more generically for destination that support
> async ingest.
>         >>
>         >> There is, for instance, a sink for Amazon Kinesis Data Streams
> that is
>         >> part of Apache Flink [1], a sink for Amazon Kinesis Data
> Firehose [2], a
>         >> sink for Amazon DynamoDB [3], and a sink for Amazon Timestream
> [4]. All
>         >> these sinks have implemented their own mechanisms for batching,
> persisting,
>         >> and retrying events. And I'm not sure if all of them properly
> participate
>         >> in checkpointing. [3] even seems to closely mirror [1] as it
> contains
>         >> references to the Kinesis Producer Library, which is unrelated
> to Amazon
>         >> DynamoDB.
>         >>
>         >> These sinks predate FLIP-143. But as batching, persisting, and
> retrying
>         >> capabilities do not seem to be part of FLIP-143, I'd argue that
> we would
>         >> end up with similar duplication, even if these sinks were
> rewritten today
>         >> based on FLIP-143. And that's the idea of FLIP-171: abstract
> away these
>         >> commonly required capabilities so that it becomes easy to
> create support
>         >> for a wide range of destination without having to think about
> batching,
>         >> retries, checkpointing, etc. I've included an example in the
> FLIP [5] that
>         >> shows that it only takes a couple of lines of code to implement
> a sink with
>         >> exactly-once semantics. To be fair, the example is lacking
> robust failure
>         >> handling and some more advanced capabilities of [1], but I
> think it still
>         >> supports this point.
>         >>
>         >> Regarding your point on the isAvailable pattern. We need some
> way for the
>         >> sink to propagate backpressure and we would also like to
> support time based
>         >> buffering hints. There are two options I currently see and
> would need
>         >> additional input on which one is the better or more desirable
> one. The
>         >> first option is to use the non-blocking isAvailable pattern.
> Internally,
>         >> the sink persists buffered events in the snapshot state which
> avoids having
>         >> to flush buffered record on a checkpoint. This seems to align
> well with the
>         >> non-blocking isAvailable pattern. The second option is to make
> calls to
>         >> `write` blocking and leverage an internal thread to trigger
> flushes based
>         >> on time based buffering hints. We've discussed these options
> with Arvid and
>         >> suggested to assumed that the `isAvailable` pattern will become
> available
>         >> for sinks through and additional FLIP.
>         >>
>         >> I think it is an important discussion to have. My understanding
> of the
>         >> implications for Flink in general are very naïve, so I'd be
> happy to get
>         >> further guidance. However, I don't want to make this discussion
> part of
>         >> FLIP-171. For FLIP-171 we'll use whatever is available.
>         >>
>         >> Does that make sense?
>         >>
>         >> Cheers, Steffen
>         >>
>         >>
>         >> [1]
>         >>
>         >> [2]
>         >> [3]
>         >> [4]
>         >> [5]
>         >>
>         >>
>         >>
>         >> On 09.06.21, 19:44, "Piotr Nowojski" <
> <><<mailto:
>>>> wrote:
>         >>
>         >>     CAUTION: This email originated from outside of the
> organization. Do
>         >> not click links or open attachments unless you can confirm the
> sender and
>         >> know the content is safe.
>         >>
>         >>
>         >>
>         >>     Hi Steffen,
>         >>
>         >>     Thanks for writing down the proposal. Back when the new
> Sink API was
>         >> being
>         >>     discussed, I was proposing to add our usual
> `CompletableFuture<Void>
>         >>     isAvailable()` pattern to make sinks non-blocking. You can
> see the
>         >>     discussion starting here [1], and continuing for a couple
> of more
>         >> posts
>         >>     until here [2]. Back then, the outcome was that it would
> give very
>         >> little
>         >>     benefit, at the expense of making the API more complicated.
> Could you
>         >> maybe
>         >>     relate your proposal to that discussion from last year?
>         >>
>         >>     I see that your proposal is going much further than just
> adding the
>         >>     availability method, could you also motivate this a bit
> further?
>         >> Could you
>         >>     maybe reference/show some sinks that:
>         >>     1. are already implemented using FLIP-143
>         >>     2. that have some code duplication...
>         >>     3. ...this duplication would be solved by FLIP-171
>         >>
>         >>     Best,
>         >>     Piotrek
>         >>
>         >>     [1]
>         >>
>         >>
>         >>     [2]
>         >>
>         >>
>         >>
>         >>     śr., 9 cze 2021 o 09:49 Hausmann, Steffen
> <>
>         >>     napisał(a):
>         >>
>         >>     > Hi there,
>         >>     >
>         >>     > We would like to start a discussion thread on "FLIP-171:
> Async
>         >> Sink" [1],
>         >>     > where we propose to create a common abstraction for
> destinations
>         >> that
>         >>     > support async requests. This abstraction will make it
> easier to add
>         >>     > destinations to Flink by implementing a lightweight shim,
> while it
>         >> avoids
>         >>     > maintaining dozens of independent sinks.
>         >>     >
>         >>     > Looking forward to your feedback.
>         >>     >
>         >>     > Cheers, Steffen
>         >>     >
>         >>     > [1]
>         >>     >
>         >>
>         >>     >
>         >>     >
>         >>     >
>         >>     > Amazon Web Services EMEA SARL
>         >>     > 38 avenue John F. Kennedy, L-1855 Luxembourg
>         >>     > Sitz der Gesellschaft: L-1855 Luxemburg
>         >>     > eingetragen im Luxemburgischen Handelsregister unter
> R.C.S. B186284
>         >>     >
>         >>     > Amazon Web Services EMEA SARL, Niederlassung Deutschland
>         >>     > Marcel-Breuer-Str. 12, D-80807 Muenchen
>         >>     > Sitz der Zweigniederlassung: Muenchen
>         >>     > eingetragen im Handelsregister des Amtsgerichts Muenchen
> unter HRB
>         >> 242240,
>         >>     > USt-ID DE317013094
>         >>     >
>         >>     >
>         >>     >
>         >>     >
>         >>
>         >>
>         >>
>         >>
>         >> Amazon Web Services EMEA SARL
>         >> 38 avenue John F. Kennedy, L-1855 Luxembourg
>         >> Sitz der Gesellschaft: L-1855 Luxemburg
>         >> eingetragen im Luxemburgischen Handelsregister unter R.C.S.
> B186284
>         >>
>         >> Amazon Web Services EMEA SARL, Niederlassung Deutschland
>         >> Marcel-Breuer-Str. 12, D-80807 Muenchen
>         >> Sitz der Zweigniederlassung: Muenchen
>         >> eingetragen im Handelsregister des Amtsgerichts Muenchen unter
>         >> 242240, USt-ID DE317013094
>         >>
>         >>
>         >>
>         >>
>     Amazon Web Services EMEA SARL
>     38 avenue John F. Kennedy, L-1855 Luxembourg
>     Sitz der Gesellschaft: L-1855 Luxemburg
>     eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>     Amazon Web Services EMEA SARL, Niederlassung Deutschland
>     Marcel-Breuer-Str. 12, D-80807 Muenchen
>     Sitz der Zweigniederlassung: Muenchen
>     eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB
> 242240, USt-ID DE317013094
>     Amazon Web Services EMEA SARL
>     38 avenue John F. Kennedy, L-1855 Luxembourg
>     Sitz der Gesellschaft: L-1855 Luxemburg
>     eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>     Amazon Web Services EMEA SARL, Niederlassung Deutschland
>     Marcel-Breuer-Str. 12, D-80807 Muenchen
>     Sitz der Zweigniederlassung: Muenchen
>     eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB
> 242240, USt-ID DE317013094
> Amazon Web Services EMEA SARL
> 38 avenue John F. Kennedy, L-1855 Luxembourg
> Sitz der Gesellschaft: L-1855 Luxemburg
> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
> Amazon Web Services EMEA SARL, Niederlassung Deutschland
> Marcel-Breuer-Str. 12, D-80807 Muenchen
> Sitz der Zweigniederlassung: Muenchen
> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
> USt-ID DE317013094
> Amazon Web Services EMEA SARL
> 38 avenue John F. Kennedy, L-1855 Luxembourg
> Sitz der Gesellschaft: L-1855 Luxemburg
> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
> Amazon Web Services EMEA SARL, Niederlassung Deutschland
> Marcel-Breuer-Str. 12, D-80807 Muenchen
> Sitz der Zweigniederlassung: Muenchen
> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
> USt-ID DE317013094

Reply via email to