Hi all,

I have more comments I'd like to make on this KIP when I have time (sorry
for the delay, Yash, and thanks for your patience!), but I did want to
chime in and say that I'm also not sure about overloading SinkTask::put. I
share the concerns about creating an intuitive, simple API that Yash has
raised. In addition, this approach doesn't seem very sustainable--what do
we do if we encounter another case in the future that would warrant a
similar solution? We probably don't want to create three, four, etc.
overloaded variants of the method, each of which would have to be
implemented by connector developers who want to both leverage the latest
and greatest connector APIs and maintain compatibility with connect
Clusters running older versions.

I haven't been able to flesh this out into a design worth publishing in its
own KIP yet, but one alternative I've pitched to a few people with
generally positive interest has been to develop an official compatibility
library for Connect developers. This library would be released as its own
Maven artifact (separate from connect-api, connect-runtime, etc.) and would
provide a simple, clean interface for developers to determine which
features are supported by the version of the Connect runtime that their
plugin has been deployed onto. Under the hood, this library might use
reflection to determine whether classes, methods, etc. are available, but
the developer wouldn't have to do anything more than check (for example)
`Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point
in the lifetime of their connector/task whether that feature is provided by
the runtime.

One other high-level comment: this doesn't address every case, but we might
consider adding an API to "ack" sink records. This could use the
SubmittedRecords class [1] (with some slight tweaks) under the hood to
track the latest-acked offset for each topic partition. This way, connector
developers won't be responsible for tracking offsets at all in their sink
tasks (eliminating issues with the accuracy of post-transformation T/P/O
sink record information), and they'll only have to notify the Connect
framework when a record has been successfully dispatched to the external
system. This provides a cleaner, friendlier API, and also enables more
fine-grained metrics like the ones proposed in KIP-767 [2].

[1] -
https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
[2] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics

Cheers,

Chris

On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Randall,
>
> It's been a while for this one but the more I think about it, the more I
> feel like the current approach with a new overloaded `SinkTask::put` method
> might not be optimal. We're trying to fix a pretty corner case bug here
> (usage of topic mutating SMTs with sink connectors that do their own offset
> tracking) and I'm not sure that warrants a change to such a central
> interface method. The new `SinkTask::put` method just seems somewhat odd
> and it may not be very understandable for a new reader - I don't think this
> should be the case for a public interface method. Furthermore, even with
> elaborate documentation in place, I'm not sure if it'll be very obvious to
> most people what the purpose of having these two `put` methods is and how
> they should be used by sink task implementations. What do you think?
>
> Thanks,
> Yash
>
> On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Randall,
> >
> > Thanks a lot for your valuable feedback so far! I've updated the KIP
> based
> > on our discussion above. Could you please take another look?
> >
> > Thanks,
> > Yash
> >
> > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com>
> wrote:
> >>
> >> > Hi Randall,
> >> >
> >> > Thanks for elaborating. I think these are all very good points and I
> see
> >> > why the overloaded `SinkTask::put` method is a cleaner solution
> overall.
> >> >
> >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
> >> > TopicPartition> updatedTopicPartitions)
> >> >
> >> > I think this should be
> >> >
> >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord,
> >> > TopicPartition> originalTopicPartitions)`
> >> >
> >> > instead because the sink records themselves have the updated topic
> >> > partitions (i.e. after all transformations have been applied) and the
> >> KIP
> >> > is proposing a way for the tasks to be able to access the original
> topic
> >> > partition (i.e. before transformations have been applied).
> >> >
> >>
> >> Sounds good.
> >>
> >>
> >> >
> >> > > Of course, if the developer does not need separate methods, they can
> >> > easily have the older `put` method simply delegate to the newer
> method.
> >> >
> >> > If the developer does not need separate methods (i.e. they don't need
> to
> >> > use this new addition), they can simply continue implementing just the
> >> > older `put` method right?
> >> >
> >>
> >> Correct. We should update the JavaDoc of both methods to make this
> clear,
> >> and in general how the two methods should are used and should be
> >> implemented. That can be part of the PR, and the KIP doesn't need this
> >> wording.
> >>
> >> >
> >> > > Finally, this gives us a roadmap for *eventually* deprecating the
> >> older
> >> > method, once the Connect runtime versions without this change are old
> >> > enough.
> >> >
> >> > I'm not sure we'd ever want to deprecate the older method. Most common
> >> sink
> >> > connector implementations do not do their own offset tracking with
> >> > asynchronous processing and will probably never have a need for the
> >> > additional parameter `Map<SinkRecord, TopicPartition>
> >> > originalTopicPartitions` in the proposed new `put` method. These
> >> connectors
> >> > can continue implementing only the existing `SinkTask::put` method
> which
> >> > will be called by the default implementation of the newer overloaded
> >> `put`
> >> > method.
> >> >
> >>
> >> +1
> >>
> >>
> >> >
> >> > > the pre-commit methods use the same `Map<TopicPartition,
> >> > OffsetAndMetadata> currentOffsets` data structure I'm suggesting be
> >> used.
> >> >
> >> > The data structure you're suggesting be used is a `Map<SinkRecord,
> >> > TopicPartition>` which will map `SinkRecord` objects to the original
> >> topic
> >> > partition of the corresponding `ConsumerRecord` right? To clarify,
> this
> >> is
> >> > a new data structure that will need to be managed in the
> >> `WorkerSinkTask`.
> >> >
> >>
> >> Ah, you're right. Thanks for the correction.
> >>
> >> Best regards,
> >> Randall
> >>
> >>
> >> > Thanks,
> >> > Yash
> >>
> >>
> >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com>
> wrote:
> >> >
> >> > > Hi, Yash.
> >> > >
> >> > > I'm not sure I quite understand why it would be "easier" for
> connector
> >> > > > developers to account for implementing two different overloaded
> >> `put`
> >> > > > methods (assuming that they want to use this new feature) versus
> >> using
> >> > a
> >> > > > try-catch block around `SinkRecord` access methods?
> >> > >
> >> > >
> >> > > Using a try-catch to try around an API method that *might* be there
> >> is a
> >> > > very unusual thing for most developers. Unfortunately, we've had to
> >> > resort
> >> > > to this atypical approach with Connect in places when there was no
> >> good
> >> > > alternative. We seem to relying upon pattern because it's easier for
> >> us,
> >> > > not because it offers a better experience for Connector developers.
> >> IMO,
> >> > if
> >> > > there's a practical alternative that uses normal development
> practices
> >> > and
> >> > > techniques, then we should use that alternative. IIUC, there is at
> >> least
> >> > > one practical alternative for this KIP that would not require
> >> developers
> >> > to
> >> > > use the unusual try-catch to handle the case where methods are not
> >> found.
> >> > >
> >> > > I also think having two `put` methods is easier when the Connector
> >> has to
> >> > > do different things for different Connect runtimes, too. One of
> those
> >> > > methods is called by newer Connect runtimes with the new behavior,
> and
> >> > the
> >> > > other method is called by an older Connect runtime. Of course, if
> the
> >> > > developer does not need separate methods, they can easily have the
> >> older
> >> > > `put` method simply delegate to the newer method.
> >> > >
> >> > > Finally, this gives us a roadmap for *eventually* deprecating the
> >> older
> >> > > method, once the Connect runtime versions without this change are
> old
> >> > > enough.
> >> > >
> >> > > I think the advantage of going with the
> >> > > > proposed approach in the KIP is that it wouldn't require extra
> >> > > book-keeping
> >> > > > (the Map<SinkRecord,
> >> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach)
> >> > > >
> >> > >
> >> > > The connector does have to do some of this bookkeeping in how they
> >> track
> >> > > the topic partition offsets used in the `preCommit`, and the
> >> pre-commit
> >> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
> >> > > currentOffsets`
> >> > > data structure I'm suggesting be used.
> >> > >
> >> > > I hope that helps.
> >> > >
> >> > > Best regards,
> >> > >
> >> > > Randall
> >> > >
> >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hi Randall,
> >> > > >
> >> > > > Thanks for reviewing the KIP!
> >> > > >
> >> > > > > That latter logic can get quite ugly.
> >> > > >
> >> > > > I'm not sure I quite understand why it would be "easier" for
> >> connector
> >> > > > developers to account for implementing two different overloaded
> >> `put`
> >> > > > methods (assuming that they want to use this new feature) versus
> >> using
> >> > a
> >> > > > try-catch block around `SinkRecord` access methods? In both
> cases, a
> >> > > > connector developer would need to write additional code in order
> to
> >> > > ensure
> >> > > > that their connector continues working with older Connect
> runtimes.
> >> > > > Furthermore, we would probably need to carefully document how the
> >> > > > implementation for the older `put` method should look like for
> >> > connectors
> >> > > > that want to use this new feature. I think the advantage of going
> >> with
> >> > > the
> >> > > > proposed approach in the KIP is that it wouldn't require extra
> >> > > book-keeping
> >> > > > (the Map<SinkRecord,
> >> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach) and
> >> also
> >> > > the
> >> > > > fact that the try-catch based logic is an already established
> >> pattern
> >> > > > through
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> >> > > > and other KIPs which added methods to source/sink connector/task
> >> > > contexts.
> >> > > >
> >> > > > Let me know if you still feel that having a new overloaded put
> >> method
> >> > is
> >> > > a
> >> > > > cleaner solution and I'd be happy to reconsider!
> >> > > >
> >> > > > Thanks,
> >> > > > Yash
> >> > > >
> >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com>
> >> > wrote:
> >> > > >
> >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> >> > > > >
> >> > > > > The KIP includes this rejected alternative:
> >> > > > >
> >> > > > > > 4. Update SinkTask.put in any way to pass the new information
> >> > outside
> >> > > > > > SinkRecord (e.g. a Map or a derived class)
> >> > > > > >
> >> > > > > >    -
> >> > > > > >
> >> > > > > >    Much more disruptive change without considerable pros
> >> > > > > >
> >> > > > > >
> >> > > > > One advantage about doing this is that sink connector
> >> implementations
> >> > > can
> >> > > > > more easily implement two different "put(...)" methods to handle
> >> > > running
> >> > > > in
> >> > > > > a variety of runtimes, without having to use try-catch logic
> >> around
> >> > the
> >> > > > > newer SinkRecord access methods. That latter logic can get quite
> >> > ugly.
> >> > > > >
> >> > > > > For example, the existing `put` method has this signature:
> >> > > > >
> >> > > > > public abstract void put(Collection<SinkRecord> records);
> >> > > > >
> >> > > > > If we added an overloaded method that passed in a map of the old
> >> > > > > topic+partition for each record (and defined the absence of an
> >> entry
> >> > as
> >> > > > > having an unchanged topic and partition):
> >> > > > >
> >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
> >> > > > > TopicPartition> updatedTopicPartitions) {
> >> > > > > put(records);
> >> > > > > }
> >> > > > >
> >> > > > > then a `SinkTask` implementation that wants to use this new
> >> feature
> >> > > could
> >> > > > > simply implement both methods:
> >> > > > >
> >> > > > > public void put(Collection<SinkRecord> records) {
> >> > > > > // Running in an older runtime, so no tracking of SMT-modified
> >> topic
> >> > > > names
> >> > > > > or partitions
> >> > > > > put(records, Map.of());
> >> > > > > }
> >> > > > >
> >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
> >> > > > > TopicPartition> updatedTopicPartitions) {
> >> > > > > // real logic here
> >> > > > > }
> >> > > > >
> >> > > > > This seems a lot easier than having to use try-catch logic, yet
> >> still
> >> > > > > allows sink connectors to utilize the new functionality and
> still
> >> > work
> >> > > > with
> >> > > > > older Connect runtimes.
> >> > > > >
> >> > > > > WDYT?
> >> > > > >
> >> > > > > Randall
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com
> >
> >> > > wrote:
> >> > > > >
> >> > > > > > Hi all,
> >> > > > > >
> >> > > > > > I would like to (re)start a new discussion thread on KIP-793
> >> (Kafka
> >> > > > > > Connect) which proposes some additions to the public
> SinkRecord
> >> > > > interface
> >> > > > > > in order to support topic mutating SMTs for sink connectors
> >> that do
> >> > > > their
> >> > > > > > own offset tracking.
> >> > > > > >
> >> > > > > > Links:
> >> > > > > >
> >> > > > > > KIP:
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> >> > > > > >
> >> > > > > > Older discussion thread:
> >> > > > > >
> >> https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> >> > > > > >
> >> https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> >> > > > > >
> >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> >> > > > > >
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Yash
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>

Reply via email to