Thanks for the detailed explanation Randall.

I think it highlights nicely how the common practice of overlapping
communication with computation (or other communication) in concurrent
systems can be useful and practical in this case.

I also agree with the amendment around `preCommit` and the guarantee that
failed records will have been reported before their respective offsets get
the chance to be committed. Aakash, with the addition of this final note in
the current KIP, from my point of view all the main objectives that were
surfaced in this thread have been fulfilled.

Best,
Konstantine


On Mon, May 18, 2020 at 4:24 PM Randall Hauch <rha...@gmail.com> wrote:

> Hi, Chris, Aakash, and others:
>
> First of all, apologies for the extremely long email. Secondly, thanks for
> the input on this KIP. The timing is unfortunately, but I do believe we're
> agreed on most points.
>
> Chris asked earlier:
>
> > I'm still unclear on how futures are going to provide any benefit to
> > developers, though.
>
>
> Let me rephrase this, because I think it's obvious that futures are just a
> way for the sink task to know, if needed, *when* the reporter has
> successfully recorded the report of a bad record:
>
> What is the benefit of an asynchronous `report(...)`, and is it worth the
> additional complexity?
>
>
> I would agree that this complexity is not worthwhile *if* every sink task
> implementation were expected to call `get()` on the future right after it's
> called. But I hope to show that the asynchronous API as described by
> KIP-610 (as of the time I sent this email) is valuable, simple to use,
> flexible, and not onerous for many sink task implementations, because many
> sink task implementations will not *have* to use the future -- as long as
> we add one *additional* guarantee to the current KIP (which I'll define
> later on).
>
> To show this, I'd like to walk through an example of what happens when a
> sink task runs. I'm going to describe a scenario that involves multiple
> `put(...)` followed by a single commit of offsets, since this is the
> pattern the WorkerSinkTask uses, and because the "commit" part is essential
> to the guarantee I believe we need to make. Specifically, consider one task
> for a sink connector consuming a single topic partition, and the calls made
> by WorkerSinkTask:
>
> 1. put(...) with records with offsets 1-1000
> 2. put(...) with records with offsets 1001-2000
> 3. put(...) with records with offsets 2001-3000
> 4. put(...) with records with offsets 3001-4000
> 5. preCommit(...) with offset 4000
>
> Now, let's say that the sink task calls `report(...)` on six records at
> offsets 10, 11, 1010, 1011, 2010, and 2011, and let's look at what happens
> when `report(...)` is synchronous and asynchronous. For simplicity, we're
> going to assume that records are sent to the external system in batches of
> 100.
>
> Let's consider the synchronous `report(...)` case first. The task basically
> goes through the following sequence:
> a) processes records 1-9
> b) report record 10, blocking until record 10 is written to the DLQ topic
> c) report record 11, blocking until record 11 is written to the DLQ topic
> d) process records 12-102
> e) send batch of 100 records (offsets 1-102, minus 10 and 11) to external
> system
> f) process records 103-202
> g) send batch of 100 records (offsets 203-202) to external system
> ...
> s) send batch of 100 records (offsets 3007-4000) to external system
> t) respond to preCommit(4000) by returning the same offset
>
> Note that steps b, c, and similar steps for the other four problematic
> records must wait until the one record is written to the DLQ before
> continuing. In this case, this means that the first batch of 100 records is
> sent to the external system only after the two records have been
> successfully written to the DLQ. The *synchronous* `report(...)` method
> results in an increased lag in the records appearing in the external system
> relative to when they written to the topic consumed by the sink task. In
> fact, every record passed to `report(...)` will cause some amount of
> additional lag in delivering records to the external system. The lag is
> even higher as the `report(...)` method takes longer, such as if/when the
> DLQ producer is slower (e.g., due to network issues, retries, etc.).
>
> Now let's look at the case when `report(...)` is asynchronous, and the task
> does not call `get()` on the future. Now, the task basically goes through
> the following sequence:
> a) processes records 1-9
> b) report record 10 and do NOT block until record 10 is written to the DLQ
> topic
> c) report record 11 and do NOT block until record 10 is written to the DLQ
> topic
> d) process records 12-102
> e) send batch of 100 records (offsets 1-102, minus 10 and 11) to external
> system
> f) process records 103-202
> g) send batch of 100 records (offsets 203-202) to external system
> ...
> s) send batch of 100 records (offsets 3007-4000) to external system
> t) respond to preCommit(4000) by returning the same offset
>
> Note that steps b, c, and similar steps for the other four problematic
> records do NOT wait until records are written to the DLQ before continuing.
> This means that the first batch of 100 records is sent to the external
> system whether or not the two records have been successfully written to the
> DLQ. The *asynchronous* `report(...)` method allows the sink task to
> continue processing subsequent records without delay, and the net lag is
> lower than the synchronous case. Plus, minor to moderate delays in
> reporting do not necessarily impact the sink task operation.
>
> This is an example of a sink task choosing to not block on the future
> returned from `report(...)`. Of course, sink tasks *can* use the future if
> they desire -- for example, maybe the sink task does block on the future
> before returning from each `put(...)` simply because of the guarantees it
> wants to provide. Maybe the sink task is managing offsets, and it wants to
> write offsets in the external system only after all error records in some
> sequence are fully processed.
>
> The bottom line is that an asynchronous `report(...)` method has
> significant advantages, is still easy to use, and when necessary allows
> sink task implementations to track when the errors have been "fully
> reported", all while not constraining/limiting sink task implementations
> that don't need those guarantees.
>
> However, here's the additional concern I mentioned at the outset of my
> email. Connect should not commit offsets for a topic partition only after
> the error reporter has "fully processed" all submitted records with that or
> earlier offsets. For the sink task developer, this means the framework must
> guarantee this happens before `preCommit(...)` is called. I think we fully
> describe this guarantee by adding the following to the KIP (perhaps in a
> new "Guarantees" section):
>
> "The Connect framework also guarantees that by the time `preCommit(...)` is
> called on the task, the error reporter will have successfully and fully
> recorded all reported records with offsets at or before those passed to the
> preCommit method. Sink task implementations that need more strict
> guarantees can use the futures returned by `report(...)` to wait for
> confirmation that reported records have been successfully recorded."
>
>
> IMO this gives sink task developers pretty clear guidance: sink tasks need
> only worry about the futures returned from `report(...)` if they require
> more strict guarantees that the errors have been fully reported. This still
> allows sink tasks to return different offsets from `preCommit(...)`. And
> any sink connectors that rely upon the framework committing consumer
> offsets based upon when records were fully-processed by the task will
> likely not have to use futures at all.
>
> To put in terms I used at the outset of this (way too long) email: the
> asynchronous API
>
>    - is valuable because it allows sink tasks to continue doing work
>    without having to wait for the reporter, and that only during the
> "commit"
>    phase do we need to potentially wait for the reporter
>    - is simple because for many sink tasks they only need to call
>    `report(...)` and will not need to even worry about the future;
>    - is flexible because any task that needs stricter guarantees can use
>    the future to block on the reporter, including at some later point in
> time
>    after the `report(...)` method is called; and
>    - is not onerous because using the future is a common pattern and simple
>    blocking, if needed, is trivial.
>
> It is true that when the last record on the last put before a commit is
> reported as an error, the framework may have to wait. But this is no worse
> and actually likely better than making the `report(...)` method
> synchronous.
>
> I hope this helps.
>
> Best regards,
>
> Randall
>
> On Mon, May 18, 2020 at 4:44 PM Aakash Shah <as...@confluent.io> wrote:
>
> > Hi Chris,
> >
> > I agree with your point.
> >
> > Randall, Konstantine, do you guys mind weighing in on any benefit of
> adding
> > asynchronous functionality using a Future in the KIP right now? It seems
> to
> > me that it only provides user control on when the thread will be blocked,
> > and if we are going to process all the futures at once in a batch at the
> > end, why not support batch processing in a future KIP, since it is not
> too
> > difficult now that we are adding an interface. I'm not sure I see any
> gain
> > beyond some user control that could increase throughput - but at the same
> > time, as I mentioned before, I don't think throughput is a factor we need
> > to consider much with error reporting. We don't really need or
> necessarily
> > want a higher throughput on error reporting, as ideally, there should not
> > be a high volume of errant records.
> >
> > Thanks,
> > Aakash
> >
> > On Mon, May 18, 2020 at 1:22 PM Chris Egerton <chr...@confluent.io>
> wrote:
> >
> > > Hi Aakash,
> > >
> > > Yep, that's pretty much it. I'd also like to emphasize that we should
> be
> > > identifying practical use cases for whatever API we provide. Giving
> > > developers a future that can be made synchronous with little effort
> seems
> > > flexible, but if that's all that developers are going to do with it
> > anyway,
> > > why make it a future at all? We should have some idea of how people
> would
> > > use a future that doesn't just hinge on them blocking on it
> immediately,
> > > and isn't more easily-addressed by a different API (such as one with
> > batch
> > > reporting).
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, May 18, 2020 at 1:17 PM Aakash Shah <as...@confluent.io>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Chris, I see your points about whether Futures provide much benefit
> at
> > > all
> > > > as they are not truly fully asynchronous.
> > > >
> > > > Correct me if I am wrong, but I think what you are trying to point
> out
> > is
> > > > that if we have the option to add additional functionality later (in
> a
> > > > simpler way too since we are introducing a new interface), we should
> > > > provide functionality that we know will provide value immediately and
> > not
> > > > cause any developer/user burden.
> > > >
> > > > In that case, I think the main area we have to come to a consensus on
> > is
> > > -
> > > > how much control do we want to provide to the developer/user in this
> > KIP
> > > > considering that we can add the functionality relatively easily
> later?
> > > >
> > > > Randall, Konstantine, what do you think about adding it later vs now?
> > > >
> > > > Thanks,
> > > > Aakash
> > > >
> > > > On Mon, May 18, 2020 at 12:45 PM Chris Egerton <chr...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Aakash,
> > > > >
> > > > > I asked this earlier about whether futures were the right way to
> go,
> > if
> > > > we
> > > > > wanted to enable asynchronous behavior at all:
> > > > >
> > > > > > I'm still unclear on how futures are going to provide any benefit
> > to
> > > > > developers, though. Blocking on the return of such a future
> slightly
> > > > later
> > > > > on in the process of handling records is still blocking, and to be
> > done
> > > > > truly asynchronously without blocking processing of non-errant
> > records,
> > > > > would have to be done on a separate thread. It's technically
> possible
> > > for
> > > > > users to cache all of these futures and instead of invoking "get"
> on
> > > > them,
> > > > > simply check whether they're complete or not via "isDone", but this
> > > seems
> > > > > like an anti-pattern.
> > > > >
> > > > > > What is the benefit of wrapping this in a future?
> > > > >
> > > > > As far as I can tell, there hasn't been a practical example yet
> where
> > > the
> > > > > flexibility provided by a future would actually be beneficial in
> > > writing
> > > > a
> > > > > connector. It'd be great if we could find one. One possible use
> case
> > > > might
> > > > > be processing records received in "SinkTask::put" without having to
> > > block
> > > > > for each errant record report before sending non-errant records to
> > the
> > > > > sink. However, this could also be addressed by allowing for batch
> > > > reporting
> > > > > of errant records instead of accepting a single record at a time;
> the
> > > > task
> > > > > would track errant records as it processes them in "put" and report
> > > them
> > > > > all en-masse after all non-errant records have been processed.
> > > > >
> > > > > With regards to the precedent of using futures for asynchronous
> > APIs, I
> > > > > think we should make sure that whatever API we decide on is
> actually
> > > > useful
> > > > > for the cases it serves. There's plenty of precedent for
> > callback-based
> > > > > asynchronous APIs in Kafka with both "Producer::send" and
> > > > > "Consumer::commitAsync"; the question here shouldn't be about
> what's
> > > done
> > > > > in different APIs, but what would work for this one in particular.
> > > > >
> > > > > Finally, it's also been brought up that if we're going to
> introduce a
> > > new
> > > > > error reporter interface, we can always modify that interface later
> > on
> > > to
> > > > > go from asynchronous to synchronous behavior, or vice-versa, or
> even
> > to
> > > > add
> > > > > a callback- or future-based variant that didn't exist before. We
> have
> > > > > plenty of room to maneuver in the future here, so the pressure to
> get
> > > > > everything right the first time and provide maximum flexibility
> > doesn't
> > > > > seem as pressing, and the goal of minimizing the kind of API that
> we
> > > have
> > > > > to support for future versions without making unnecessary additions
> > is
> > > > > easier to achieve.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > >
> > > > >
> > > > > On Mon, May 18, 2020 at 12:20 PM Aakash Shah <as...@confluent.io>
> > > wrote:
> > > > >
> > > > > > Hi Arjun,
> > > > > >
> > > > > > Thanks for your feedback.
> > > > > >
> > > > > > I agree with moving to Future<Void>, those are good points.
> > > > > >
> > > > > > I believe an earlier point made for asynchronous functionality
> were
> > > > that
> > > > > > modern APIs tend to be asynchronous as they result in more
> > expressive
> > > > and
> > > > > > better defined APIs.
> > > > > > Additionally, because a lot of Kafka Connect functionality is
> > already
> > > > > > asynchronous, I am inclined to believe that customers will want
> an
> > > > > > asynchronous solution for this as well. And if is relatively
> simple
> > > to
> > > > > > block with future.get() to make it synchronous, would you not say
> > > that
> > > > > > having an opt-in synchronous functionality rather than
> synchronous
> > > only
> > > > > > functionality allows for customer control while maintaining that
> > not
> > > > too
> > > > > > much burden of implementation is placed on the customer?
> > > > > > WDYT?
> > > > > >
> > > > > > Thanks,
> > > > > > Aakash
> > > > > >
> > > > > > On Sun, May 17, 2020 at 2:51 PM Arjun Satish <
> > arjun.sat...@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks for all the feedback, folks.
> > > > > > >
> > > > > > > re: having a callback as a parameter, I agree that at this
> point,
> > > it
> > > > > > might
> > > > > > > not add much value to the proposal.
> > > > > > >
> > > > > > > re: synchronous vs asynchronous, is the motivation
> > > performance/higher
> > > > > > > throughput? Taking a step back, calling report(..) in the new
> > > > interface
> > > > > > > does a couple of things:
> > > > > > >
> > > > > > > 1. at a fundamental level, it is a signal to the framework
> that a
> > > > > failure
> > > > > > > occurred when processing records, specifically due to the given
> > > > record.
> > > > > > > 2. depending on whether errors.log and errors.deadletterqueue
> has
> > > > been
> > > > > > set,
> > > > > > > some messages are written to zero or more destinations.
> > > > > > > 3. depending on the value of errors.tolerance (none or all),
> the
> > > task
> > > > > is
> > > > > > > failed after reporters have completed.
> > > > > > >
> > > > > > > for kip-610, the asynchronous method has the advantage of
> working
> > > > with
> > > > > > the
> > > > > > > internal dead letter queue (which has been transparent to the
> > > > developer
> > > > > > so
> > > > > > > far). but, how does async method help if the DLQ is not
> enabled?
> > in
> > > > > this
> > > > > > > case RecordMetadata is not very useful, AFAICT? also, if we add
> > > more
> > > > > > error
> > > > > > > reporters in the future (say, for example, a new reporter in a
> > > future
> > > > > > that
> > > > > > > writes to a RDBMS), would the async version return success on
> all
> > > or
> > > > > > > nothing, and what about partial successes?
> > > > > > >
> > > > > > > overall, if we really need async behavior, I'd prefer to just
> use
> > > > > > > Future<Void>. but if we can keep it simple, then let's go with
> a
> > > > > > > synchronous function with the parameters Randall proposed above
> > > (with
> > > > > > > return type as void, and if any of the reporters fail, the task
> > is
> > > > > failed
> > > > > > > if error.tolerance is none, and kept alive if tolerance is
> all),
> > > and
> > > > > > maybe
> > > > > > > add asynchronous methods in a future KIP?
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to