Hi all,

I'm on board with adding an interface in the Connect API as Arjun
suggested. Slightly higher commitment and maintenance but it also gives us
an easier path to future extensions in this scope (error handling). The
usage is equivalent to adding just a new method with known types to
`SinkTaskContext` (the `NoClassDefFoundError` can be added for completeness
in the connector code, but in both suggestions this would fail with
`NoSuchMethodError` on older workers).

With respect to the method signature, I also agree with Randall's latest
suggestion, of a two argument method such as:

Future<Void> report(SinkTask, Throwable)

Returning `Future<RecordMetadata>` can also be ok, but since this refers to
the DLQ I'd slightly prefer to avoid exposing information that might
confuse the users regarding what topic, partitions and offset this return
value corresponds to. But both return types should be fine and will give
plenty of flexibility to connector developers, making the sync use case
straightforward. In any case, given the interface we can extend this in a
compatible way in the future if we think we need to.

Minor comments:
Version will be 2.6 and not 2.9 (the latter was added by accident in a few
places).

Best,
Konstantine


On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar <
mage...@confluent.io> wrote:

> If that's the case, I think framework should not commit if there are any
> outstanding records in teh reporter. That would prevent the scenario where
> we could potentially lose records frm being sent either to Sink/the
> reporter. WDYT about the KIP including that as part of the design?
>
> On Sun, May 17, 2020 at 11:13 AM Randall Hauch <rha...@gmail.com> wrote:
>
> > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> > mage...@confluent.io> wrote:
> >
> > > Randall
> > >
> > > Thanks a lot for your thoughts. I was wondering if we would ever have
> to
> > > make the API asynchronous, we could expose it as a new method right? If
> > > that's a possibility would it be better if the API explicitly has
> > semantics
> > > of a synchronous API if the implementation is indeed going to be
> > > synchronous.
> > >
> >
> > Thanks, Magesh.
> >
> > I think it's likely that the implementation may need to be synchronous to
> > some degree. For example, just to keep the implementation simple we might
> > block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns we
> > might latch until the reporter has received all acks, especially if it
> > simplifies the offset management and commit logic.
> >
> > Even if that's the case, having each `report(...)` call be asynchronous
> > means that the sink task doesn't *have* to wait until each failed record
> > has been recorded to continue sending valid records to the external
> system.
> > Consider an example with 1000 records in a batch, where only the first
> > record has an error. If `record(...)` were synchronous, the `put(...)`
> > method would block reporting the first record and would then only send
> the
> > 999 after that's happened. With an asynchronous `record(...)` method, the
> > `put(...)` method could report the first record, send the 999 records,
> and
> > then wait for the futures returned by the report method.
> >
> >
> > >
> > > On Sun, May 17, 2020, 9:27 AM Randall Hauch <rha...@gmail.com> wrote:
> > >
> > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > > mage...@confluent.io> wrote:
> > > >
> > > > > Thanks Randall. The suggestion i made also has a problem when
> > reporter
> > > > > isn't enabled where it could potentially write records after error
> > > > records
> > > > > to sink before failing.
> > > > >
> > > > > The other concern i had with reporter being asynchronous. For some
> > > reason
> > > > > if the reporter is taking longer because of say a specific broker
> > > issue,
> > > > > the connector might still move forward and commit if it's not
> waiting
> > > for
> > > > > the reporter.  During  this if the worker crashes we will now lose
> > the
> > > > bad
> > > > > record
> > > > >  I don't think this is desirable behavior. I think the synchronous
> > > > reporter
> > > > > provides better guarantees for all connectors.
> > > > >
> > > > >
> > > > Thanks, Magesh.
> > > >
> > > > That's a valid concern, and maybe that will affect how the feature is
> > > > actually implemented. I expect it to be a bit tricky to ensure that
> > > errant
> > > > records are fully written to Kafka before the offsets are committed,
> so
> > > it
> > > > might be simplest to start out with a synchronous implementation. But
> > the
> > > > API can still be an asynchronous design whether or not the
> > implementation
> > > > is synchronous. That gives us the ability in the future to change the
> > > > implementation if we determine a way to handle all concerns. For
> > example,
> > > > the WorkerSinkTask may need to backoff if waiting to commit due to
> too
> > > many
> > > > incomplete/unacknowledged reporter requests. OTOH, if we make the
> > > `report`
> > > > method(s) synchronous from the beginning, it will be very challenging
> > to
> > > > change them in the future to be asynchronous.
> > > >
> > > > I guess it boils down to this question: do we know today that we will
> > > > *never* want the reporter to write asynchronously?
> > > >
> > > > Best regards,
> > > >
> > > > Randall
> > > >
> > >
> >
>
>
> --
> Thanks
> Magesh
>
> *Magesh Nandakumar*
> Software Engineer
> mage...@confluent.io
>

Reply via email to