If that's the case, I think the framework should not commit if there are any
outstanding records in the reporter. That would prevent the scenario where
we could potentially lose records from being sent either to the sink or the
reporter. WDYT about the KIP including that as part of the design?
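To make the idea concrete, here is a minimal, self-contained sketch of such a commit gate. The class and method names (`CommitGateSketch`, `track`, `safeToCommit`) are hypothetical illustrations, not part of the KIP; a real implementation would live inside the framework's offset-commit path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch: gate offset commits on the reporter having no
// outstanding (incomplete) reported records, so a worker crash cannot
// lose an errant record that was never fully written.
public class CommitGateSketch {

    private final List<Future<Void>> outstanding = new ArrayList<>();

    // Track each future returned by the reporter.
    void track(Future<Void> reportFuture) {
        outstanding.add(reportFuture);
    }

    // The framework would consult this before committing offsets.
    boolean safeToCommit() {
        outstanding.removeIf(Future::isDone); // drop reports that are already acked
        return outstanding.isEmpty();         // commit only when nothing is pending
    }

    public static void main(String[] args) {
        CommitGateSketch gate = new CommitGateSketch();
        CompletableFuture<Void> inFlight = new CompletableFuture<>();
        gate.track(inFlight);
        System.out.println("commit allowed: " + gate.safeToCommit()); // false
        inFlight.complete(null);
        System.out.println("commit allowed: " + gate.safeToCommit()); // true
    }
}
```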

On Sun, May 17, 2020 at 11:13 AM Randall Hauch <[email protected]> wrote:

> On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> [email protected]> wrote:
>
> > Randall
> >
> > Thanks a lot for your thoughts. I was wondering: if we ever had to make
> > the API asynchronous, we could expose it as a new method, right? If
> > that's a possibility, would it be better for the API to explicitly have
> > the semantics of a synchronous API, if the implementation is indeed
> > going to be synchronous?
> >
>
> Thanks, Magesh.
>
> I think it's likely that the implementation may need to be synchronous to
> some degree. For example, just to keep the implementation simple, we might
> block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns and
> latch until the reporter has received all acks, especially if that
> simplifies the offset management and commit logic.
>
> Even if that's the case, having each `report(...)` call be asynchronous
> means that the sink task doesn't *have* to wait until each failed record
> has been recorded to continue sending valid records to the external system.
> Consider an example with 1000 records in a batch, where only the first
> record has an error. If `report(...)` were synchronous, the `put(...)`
> method would block while reporting the first record and would only send
> the other 999 after that has happened. With an asynchronous `report(...)`
> method, the `put(...)` method could report the first record, send the 999
> valid records, and then wait for the futures returned by the report method.
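Inline: a self-contained sketch of that batching pattern. The reporter here is simulated with an already-completed future (a real reporter would write the errant record to Kafka and complete the future on ack), and the record type and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Sketch of an asynchronous report(...) used from put(...): report the errant
// record, keep sending valid records, and block only once at the end of the batch.
public class AsyncReportSketch {

    // Stand-in for the proposed reporter; a real one would produce the errant
    // record to a Kafka topic and complete the future once it is acked.
    static Future<Void> report(String record, Throwable error) {
        return CompletableFuture.completedFuture(null);
    }

    // Returns {validRecordsSent, errantRecordsReported} for the batch.
    static int[] processBatch(List<String> batch) throws Exception {
        List<Future<Void>> pending = new ArrayList<>();
        int sent = 0;
        for (String record : batch) {
            if (record.equals("record-0")) { // pretend only the first record is errant
                pending.add(report(record, new RuntimeException("conversion failed")));
            } else {
                sent++; // deliver the valid record to the external system
            }
        }
        // Wait for all reporter acks only after the whole batch has been sent.
        for (Future<Void> f : pending) {
            f.get();
        }
        return new int[] { sent, pending.size() };
    }

    public static void main(String[] args) throws Exception {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            batch.add("record-" + i);
        }
        int[] result = processBatch(batch);
        System.out.println(result[0] + " records sent, " + result[1] + " reported");
    }
}
```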
>
>
> >
> > On Sun, May 17, 2020, 9:27 AM Randall Hauch <[email protected]> wrote:
> >
> > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > [email protected]> wrote:
> > >
> > > > Thanks, Randall. The suggestion I made also has a problem when the
> > > > reporter isn't enabled: it could potentially write records that come
> > > > after the error records to the sink before failing.
> > > >
> > > > The other concern I had is with the reporter being asynchronous. If
> > > > the reporter is taking longer, say because of a specific broker
> > > > issue, the connector might still move forward and commit if it's not
> > > > waiting for the reporter. If the worker crashes during this window,
> > > > we will lose the bad record. I don't think that is desirable
> > > > behavior. I think a synchronous reporter provides better guarantees
> > > > for all connectors.
> > > >
> > > >
> > > Thanks, Magesh.
> > >
> > > That's a valid concern, and maybe that will affect how the feature is
> > > actually implemented. I expect it to be a bit tricky to ensure that
> > > errant records are fully written to Kafka before the offsets are
> > > committed, so it might be simplest to start out with a synchronous
> > > implementation. But the API can still have an asynchronous design
> > > whether or not the implementation is synchronous. That gives us the
> > > ability to change the implementation in the future if we determine a
> > > way to handle all concerns. For example, the WorkerSinkTask may need
> > > to back off before committing if there are too many incomplete or
> > > unacknowledged reporter requests. OTOH, if we make the `report`
> > > method(s) synchronous from the beginning, it will be very challenging
> > > to change them to be asynchronous in the future.
> > >
> > > I guess it boils down to this question: do we know today that we will
> > > *never* want the reporter to write asynchronously?
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> >
>
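On Randall's point about keeping the API asynchronous even if the first implementation is synchronous, one way to picture it (names hypothetical, not the KIP's actual classes) is a `report(...)` that blocks internally but still returns a `Future`, so callers code against the asynchronous contract from day one:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Sketch: a synchronous implementation behind an asynchronous API. Today the
// method blocks until the errant record is written, then returns an
// already-completed future; tomorrow the implementation could become truly
// asynchronous without changing the method signature or its callers.
public class SyncBehindAsyncApi {

    // Pretend this synchronously produces the errant record and waits for the ack.
    static void writeAndWaitForAck(String record) {
        // ... synchronous produce + ack (omitted in this sketch) ...
    }

    static Future<Void> report(String record, Throwable error) {
        writeAndWaitForAck(record);                     // blocks today
        return CompletableFuture.completedFuture(null); // still satisfies the async contract
    }

    public static void main(String[] args) {
        Future<Void> f = report("bad-record", new RuntimeException("schema mismatch"));
        System.out.println("already done: " + f.isDone());
    }
}
```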


-- 
Thanks
Magesh

*Magesh Nandakumar*
Software Engineer
[email protected]
