Hey,

Agreed on starting with a blocking `write`. I've adapted the FLIP accordingly.

For now I've chosen to add the `InterruptedException` to the `write` method
signature, as I don't yet fully understand the implications of swallowing the
exception. Depending on the details of the code that calls the `write`
method, swallowing it may cause event loss. But this seems more of an
implementation detail that we can revisit once we are actually implementing
the sink.
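
To make the trade-off concrete, here is a minimal sketch of the two variants
under discussion. It is not the FLIP-171 or FLIP-143 API: the class names
`ThrowingWriter` and `TranslatingWriter`, the `buffered()` helper, and the use
of an `ArrayBlockingQueue` as the in-flight buffer are all assumptions made
for illustration. The first variant adds `InterruptedException` to the
`write` signature; the second keeps the signature unchanged and translates
the interrupt. The comment in the second variant shows one way a silently
swallowed interrupt could lose an event.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch only; these classes are not part of Flink or the FLIP. */
final class BlockingWriteSketch {

    /** Variant A: write declares InterruptedException and lets it propagate. */
    static final class ThrowingWriter<T> {
        private final BlockingQueue<T> buffer;

        ThrowingWriter(int capacity) {
            this.buffer = new ArrayBlockingQueue<>(capacity);
        }

        void write(T element) throws InterruptedException {
            // put() blocks while the buffer is full, which is how backpressure
            // reaches the caller; an interrupt surfaces as a checked exception.
            buffer.put(element);
        }

        int buffered() {
            return buffer.size();
        }
    }

    /** Variant B: signature unchanged; the interrupt is translated, not dropped. */
    static final class TranslatingWriter<T> {
        private final BlockingQueue<T> buffer;

        TranslatingWriter(int capacity) {
            this.buffer = new ArrayBlockingQueue<>(capacity);
        }

        void write(T element) {
            try {
                buffer.put(element);
            } catch (InterruptedException e) {
                // The element was NOT buffered. Returning normally here would
                // silently lose it, so restore the flag and fail loudly instead.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "interrupted before element was buffered", e);
            }
        }

        int buffered() {
            return buffer.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThrowingWriter<String> a = new ThrowingWriter<>(2);
        a.write("event-1");
        System.out.println("variant A buffered: " + a.buffered());

        TranslatingWriter<String> b = new TranslatingWriter<>(1);
        b.write("event-1");
        Thread.currentThread().interrupt(); // simulate a cancellation request
        try {
            b.write("event-2");
        } catch (IllegalStateException e) {
            System.out.println("variant B rejected the element; interrupt flag set: "
                    + Thread.interrupted());
        }
    }
}
```

Whether the caller can tolerate the unchecked exception in the second variant
depends on the runtime code around `write`, which is exactly the open
question above.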

Unless there are additional comments, does it make sense to start the voting 
process in the next day or two?

Cheers, Steffen


On 21.06.21, 14:51, "Piotr Nowojski" <pnowoj...@apache.org> wrote:

    Hi,

    Thanks Steffen for the explanations. I think it makes sense to me.

    Re Arvid/Steffen:

    - Keep in mind that even if we choose to provide a non-blocking API using
    the `isAvailable()`/`getAvailableFuture()` method, we would still need to
    support blocking inside the sinks. At the very least, emitting many
    records at once (`flatMap`) and firing timers are scenarios in which the
    runtime currently ignores output availability. I would also imagine that
    writing very large records (like 1GB) would block on something as well.
    - Secondly, exposing availability at the API level might not be that
    easy/trivial. The availability pattern as defined in the
    `AvailabilityProvider` class is quite complicated and not that easy for a
    user to implement.

    Given both of those, combined with the lack of a clear motivation for
    adding `AvailabilityProvider` to the sinks/operators/functions, I would
    vote for just starting with blocking `write` calls. This can always be
    extended in the future with availability if needed/motivated properly.

    That would be aligned with either Arvid's option 1 or 2. I don't know what
    the best practices with `InterruptedException` are, but I'm always afraid
    of it, so I would personally feel safer with option 2.

    I'm not sure what problem option 3 is helping to solve. Adding `wakeUp()`
    would sound strange to me.

    Best,
    Piotrek

    On Mon, Jun 21, 2021 at 12:15, Arvid Heise <ar...@apache.org> wrote:

    > Hi Piotr,
    >
    > to pick up this discussion thread again:
    > - This FLIP is about providing a base implementation for FLIP-143 sinks
    > that makes adding new implementations easier, similar to the
    > SourceReaderBase.
    > - The whole availability topic will most likely be a separate FLIP. The
    > basic issue just popped up here because we currently have no way to signal
    > backpressure in sinks except by blocking `write`. This feels quite natural
    > in sinks with sync communication but quite unnatural in async sinks.
    >
    > Now we have a couple of options. In all cases, we would have some WIP
    > limit on the number of records/requests that can be processed in
    > parallel asynchronously (similar to asyncIO).
    > 1. We use some blocking queue in `write`, then we need to handle
    > interruptions. In the easiest case, we extend `write` to throw the
    > `InterruptedException`, which is a small API change.
    > 2. We use a blocking queue, but handle interrupts and swallow/translate
    > them. No API change.
    > Both solutions block the task thread, so any RPC message / unaligned
    > checkpoint would be processed only after the backpressure is temporarily
    > lifted. That's similar to the discussions that you linked. Cancellation
    > may also be a tad harder with option 2.
    > 3. We could also add some `wakeUp` to the `SinkWriter` similar to
    > `SplitFetcher` [1]. Basically, you use a normal queue with a completable
    > future on which you block. Wakeup would be a clean way to complete it, in
    > addition to the natural completion through finished requests.
    > 4. We add availability to the sink. However, this API change also requires
    > that we allow operators to be available so it may be a bigger change with
    > undesired side-effects. On the other hand, we could also use the same
    > mechanism for asyncIO.
    >
    > For users of FLIP-171, none of the options are exposed. So we could also
    > start with a simple solution (add `InterruptedException`) and later try to
    > add availability. Option 1+2 would also not require an additional FLIP; we
    > could add it as part of this FLIP.
    >
    > Best,
    >
    > Arvid
    >
    > [1]
    > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
    > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
    > <shau...@amazon.de.invalid> wrote:
    >
    >> Hey Piotrek,
    >>
    >> Thanks for your comments on the FLIP. I'll address your second question
    >> first, as I think it's more central to this FLIP. Just looking at the AWS
    >> ecosystem, there are several sinks with overlapping functionality. I've
    >> chosen AWS sinks here because I'm most familiar with those, but a similar
    >> argument applies more generally to destinations that support async ingest.
    >>
    >> There is, for instance, a sink for Amazon Kinesis Data Streams that is
    >> part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a
    >> sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All
    >> these sinks have implemented their own mechanisms for batching,
    >> persisting, and retrying events. And I'm not sure if all of them properly
    >> participate in checkpointing. [3] even seems to closely mirror [1], as it
    >> contains references to the Kinesis Producer Library, which is unrelated to
    >> Amazon DynamoDB.
    >>
    >> These sinks predate FLIP-143. But as batching, persisting, and retrying
    >> capabilities do not seem to be part of FLIP-143, I'd argue that we would
    >> end up with similar duplication, even if these sinks were rewritten today
    >> based on FLIP-143. And that's the idea of FLIP-171: abstract away these
    >> commonly required capabilities so that it becomes easy to create support
    >> for a wide range of destinations without having to think about batching,
    >> retries, checkpointing, etc. I've included an example in the FLIP [5] that
    >> shows that it only takes a couple of lines of code to implement a sink
    >> with exactly-once semantics. To be fair, the example is lacking robust
    >> failure handling and some more advanced capabilities of [1], but I think
    >> it still supports this point.
    >>
    >> Regarding your point on the `isAvailable` pattern: we need some way for
    >> the sink to propagate backpressure, and we would also like to support
    >> time-based buffering hints. I currently see two options and would need
    >> additional input on which one is better or more desirable. The first
    >> option is to use the non-blocking `isAvailable` pattern. Internally, the
    >> sink persists buffered events in the snapshot state, which avoids having
    >> to flush buffered records on a checkpoint. This seems to align well with
    >> the non-blocking `isAvailable` pattern. The second option is to make calls
    >> to `write` blocking and leverage an internal thread to trigger flushes
    >> based on time-based buffering hints. We've discussed these options with
    >> Arvid, and the suggestion was to assume that the `isAvailable` pattern
    >> will become available for sinks through an additional FLIP.
    >>
    >> I think it is an important discussion to have. My understanding of the
    >> implications for Flink in general is very naïve, so I'd be happy to get
    >> further guidance. However, I don't want to make this discussion part of
    >> FLIP-171. For FLIP-171 we'll use whatever is available.
    >>
    >> Does that make sense?
    >>
    >> Cheers, Steffen
    >>
    >>
    >> [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
    >> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
    >> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
    >> [4] https://github.com/awslabs/amazon-timestream-tools/
    >> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
    >>
    >>
    >> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
    >>
    >>     Hi Steffen,
    >>
    >>     Thanks for writing down the proposal. Back when the new Sink API was
    >>     being discussed, I was proposing to add our usual
    >>     `CompletableFuture<Void> isAvailable()` pattern to make sinks
    >>     non-blocking. You can see the discussion starting here [1], and
    >>     continuing for a couple more posts until here [2]. Back then, the
    >>     outcome was that it would give very little benefit, at the expense of
    >>     making the API more complicated. Could you maybe relate your proposal
    >>     to that discussion from last year?
    >>
    >>     I see that your proposal is going much further than just adding the
    >>     availability method. Could you also motivate this a bit further?
    >>     Could you maybe reference/show some sinks that:
    >>     1. are already implemented using FLIP-143
    >>     2. have some code duplication...
    >>     3. ...where this duplication would be solved by FLIP-171
    >>
    >>     Best,
    >>     Piotrek
    >>
    >>     [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
    >>     [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
    >>
    >>     On Wed, Jun 9, 2021 at 09:49, Hausmann, Steffen
    >>     <shau...@amazon.de.invalid> wrote:
    >>
    >>     > Hi there,
    >>     >
    >>     > We would like to start a discussion thread on "FLIP-171: Async
    >>     > Sink" [1], where we propose to create a common abstraction for
    >>     > destinations that support async requests. This abstraction will make
    >>     > it easier to add destinations to Flink by implementing a lightweight
    >>     > shim, while it avoids maintaining dozens of independent sinks.
    >>     >
    >>     > Looking forward to your feedback.
    >>     >
    >>     > Cheers, Steffen
    >>     >
    >>     > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    >>     >
    >>     >
    >>     >
    >>     > Amazon Web Services EMEA SARL
    >>     > 38 avenue John F. Kennedy, L-1855 Luxembourg
    >>     > Registered office: L-1855 Luxembourg
    >>     > Registered in the Luxembourg Trade Register under R.C.S. B186284
    >>     >
    >>     > Amazon Web Services EMEA SARL, Niederlassung Deutschland (German branch)
    >>     > Marcel-Breuer-Str. 12, D-80807 Muenchen
    >>     > Branch office: Muenchen
    >>     > Registered in the commercial register of the Amtsgericht Muenchen
    >>     > (Munich District Court) under HRB 242240, VAT ID DE317013094
    >>     >
    >>     >
    >>     >
    >>     >






