Hi Yash,

> So it looks like with the current state of affairs, sink tasks that only
instantiate writers in the SinkTask::open method (and don't do the lazy
instantiation in SinkTask::put that you mentioned) might fail when used
with topic/partition mutating SMTs even if they don't do any asynchronous
processing?

Yep, exactly 👍

> What do you think about retaining just the existing methods
but changing when they're called in the Connect runtime? For instance,
instead of calling SinkTask::open after partition assignment post a
consumer group rebalance, we could cache the currently "seen" topic
partitions (post transformation) and before each call to SinkTask::put
check whether there's any new "unseen" topic partitions, and if so call
SinkTask::open (and also update the cache of course).

IMO the issue here is that it's a drastic change in behavior to start
invoking SinkTask::open and SinkTask::close with post-transform topic
partitions instead of pre-transform, especially if connectors are
intentionally designed around original topic partitions instead of
transformed ones. I think we have to provide connector developers with some
way to differentiate between the two, but maybe there's a way to do this
that I haven't thought of yet. Interested to hear your thoughts.
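
For illustration only, here is a rough sketch of the overloaded-variant idea
from earlier in the thread. None of these two-argument methods exist in
Connect today; SketchSinkTask, LegacyTask, TransformAwareTask, and the local
TopicPartition record are hypothetical stand-ins so the example compiles on
its own. The point is just that existing tasks keep seeing original topic
partitions, while tasks that opt in can key their writers off transformed
ones.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, to stay self-contained).
record TopicPartition(String topic, int partition) {}

// Hypothetical stand-in for SinkTask; only the lifecycle callbacks are modeled.
abstract class SketchSinkTask {
    // Existing-style callbacks: invoked with original (pre-transform) topic partitions.
    public void open(Collection<TopicPartition> originalPartitions) {}
    public void close(Collection<TopicPartition> originalPartitions) {}

    // Hypothetical overloads: by default they ignore the transformed partitions
    // and delegate to the existing methods, so current connectors are unaffected.
    public void open(Collection<TopicPartition> originalPartitions,
                     Collection<TopicPartition> transformedPartitions) {
        open(originalPartitions);
    }

    public void close(Collection<TopicPartition> originalPartitions,
                      Collection<TopicPartition> transformedPartitions) {
        close(originalPartitions);
    }
}

// A connector written against the existing API: it only ever sees original partitions.
class LegacyTask extends SketchSinkTask {
    final List<TopicPartition> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartition> originalPartitions) {
        opened.addAll(originalPartitions);
    }
}

// A connector that opts in and allocates writers per transformed partition.
class TransformAwareTask extends SketchSinkTask {
    final List<TopicPartition> writers = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartition> originalPartitions,
                     Collection<TopicPartition> transformedPartitions) {
        writers.addAll(transformedPartitions);
    }

    @Override
    public void close(Collection<TopicPartition> originalPartitions,
                      Collection<TopicPartition> transformedPartitions) {
        writers.removeAll(transformedPartitions);
    }
}
```

If the runtime only ever called the two-argument variants, a LegacyTask would
behave exactly as it does now, which is the compatibility property I care
about here.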

Either way, I'm glad that the general idea of a cache and eviction policy
for SinkTask::close seems reasonable; if we decide to go this route, it
might make sense for the KIP to include an outline of one or more
high-level strategies we might take, but without promising any particular
behavior beyond occasionally calling SinkTask::close for post-transform
topic partitions. I'm hoping that this logic can stay internal, and by not
painting ourselves into a corner with the KIP, we give ourselves leeway to
tweak it in the future if necessary without filing another KIP or
introducing a pluggable interface.
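
To make the cache idea a bit more concrete, below is one possible strategy,
sketched under several assumptions: a least-recently-used policy with a fixed
cap, which is only one of the eviction policies we could pick.
TransformedPartitionCache, PartitionLifecycle, beforePut, and maxOpen are all
made-up names, and the TopicPartition record is a local stand-in so the
example is self-contained. This is not a description of how the runtime works
today.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, to stay self-contained).
record TopicPartition(String topic, int partition) {}

// Hypothetical callback surface; in Connect these would map to SinkTask::open and SinkTask::close.
interface PartitionLifecycle {
    void open(Collection<TopicPartition> partitions);
    void close(Collection<TopicPartition> partitions);
}

// Hypothetical runtime-internal helper that tracks post-transform topic partitions.
class TransformedPartitionCache {
    private final int maxOpen;
    private final PartitionLifecycle task;
    // accessOrder=true: iteration runs from least to most recently used.
    private final LinkedHashMap<TopicPartition, Boolean> seen =
            new LinkedHashMap<>(16, 0.75f, true);

    TransformedPartitionCache(int maxOpen, PartitionLifecycle task) {
        this.maxOpen = maxOpen;
        this.task = task;
    }

    // Call with the post-transform partitions of a batch before passing it to put.
    void beforePut(Collection<TopicPartition> batchPartitions) {
        List<TopicPartition> unseen = new ArrayList<>();
        for (TopicPartition tp : batchPartitions) {
            // put() also refreshes the recency of partitions we already know about.
            if (seen.put(tp, Boolean.TRUE) == null) {
                unseen.add(tp);
            }
        }

        // Best-effort eviction: drop the least recently used partitions,
        // but never one that appears in the batch about to be delivered.
        Set<TopicPartition> inBatch = new HashSet<>(batchPartitions);
        List<TopicPartition> evicted = new ArrayList<>();
        Iterator<TopicPartition> it = seen.keySet().iterator();
        while (seen.size() > maxOpen && it.hasNext()) {
            TopicPartition candidate = it.next();
            if (!inBatch.contains(candidate)) {
                it.remove();
                evicted.add(candidate);
            }
        }

        if (!evicted.isEmpty()) {
            task.close(evicted);
        }
        // Open is always called before the records reach put.
        if (!unseen.isEmpty()) {
            task.open(unseen);
        }
    }
}
```

The only guarantee this sketch tries to keep is that a partition is opened
before its records are delivered; when and whether a partition is closed is
deliberately left loose, which matches the leeway I'd like the KIP to
preserve.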

Cheers,

Chris

On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> Thanks for the feedback.
>
> 1) That's a fair point; while I did scan everything publicly available on
> GitHub, you're right in that it won't cover all possible SMTs that are out
> there. Thanks for the example use-case as well, I've updated the KIP to add
> the two new proposed methods.
>
> 2) So it looks like with the current state of affairs, sink tasks that only
> instantiate writers in the SinkTask::open method (and don't do the lazy
> instantiation in SinkTask::put that you mentioned) might fail when used
> with topic/partition mutating SMTs even if they don't do any asynchronous
> processing? Since they could encounter records in SinkTask::put with
> topics/partitions that they might not have created writers for. Thanks for
> pointing this out, it's definitely another incompatibility that needs to be
> called out and fixed. The overloaded method approach is interesting, but
> comes with the caveat of yet more new methods that will need to be
> implemented by existing connectors if they want to make use of this new
> functionality. What do you think about retaining just the existing methods
> but changing when they're called in the Connect runtime? For instance,
> instead of calling SinkTask::open after partition assignment post a
> consumer group rebalance, we could cache the currently "seen" topic
> partitions (post transformation) and before each call to SinkTask::put
> check whether there's any new "unseen" topic partitions, and if so call
> SinkTask::open (and also update the cache of course). I don't think this
> would break the existing contract with sink tasks where SinkTask::open is
> expected to be called for a topic partition before any records from the
> topic partition are sent via SinkTask::put? The SinkTask::close case is a
> lot trickier however, and would require some sort of cache eviction policy
> that would be deemed appropriate as you pointed out too.
>
> Thanks,
> Yash
>
> On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > I've had some time to think on this KIP and I think I'm in agreement
> about
> > not blocking it on an official compatibility library or adding the "ack"
> > API for sink records.
> >
> > I only have two more thoughts:
> >
> > 1. Because it is possible to manipulate sink record partitions and
> offsets
> > with the current API we provide for transformations, I still believe
> > methods should be added to the SinkRecord class to expose the original
> > partition and offset, not just the original topic. The additional
> cognitive
> > burden from these two methods is going to be minimal anyways; once users
> > understand the difference between the transformed topic name and the
> > original one, it's going to be trivial for them to understand how that
> same
> > difference applies for partitions and offsets. It's not enough to scan
> the
> > set of SMTs provided out of the box with Connect, ones developed by
> > Confluent, or even everything available on GitHub, since there may be
> > closed-source projects out there that rely on this ability. One potential
> > use case could be re-routing partitions between Kafka and some other
> > sharded system.
> >
> > 2. We still have to address the SinkTask::open [1] and SinkTask::close
> [2]
> > methods. If a connector writes to the external system using the
> transformed
> > topic partitions it reads from Kafka, then it's possible for the
> connector
> > to lazily instantiate writers for topic partitions as it encounters them
> > from records provided in SinkTask::put. However, connectors also need a
> way
> > to de-allocate those writers (and the resources used by them) over time,
> > which they can't do as easily. One possible approach here is to overload
> > SinkTask::open and SinkTask::close with variants that distinguish between
> > transformed and original topic partitions, and default to invoking the
> > existing methods with just the original topic partitions. We would then
> > have several options for how the Connect runtime can invoke these
> methods,
> > but in general, an approach that guarantees that tasks are notified of
> > transformed topic partitions in SinkTask::open before any records for
> that
> > partition are given to it in SinkTask::put, and makes a best-effort
> attempt
> > to close transformed topic partitions that appear to no longer be in use
> > based on some eviction policy, would probably be sufficient.
> >
> > [1] -
> >
> >
> https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > [2] -
> >
> >
> https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> >
> > Cheers,
> >
> > Chris
> >
> > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > Thanks a lot for your inputs!
> > >
> > > > would provide a simple, clean interface for developers to determine
> > > > which features are supported by the version of the Connect runtime
> > > > that their plugin has been deployed onto
> > >
> > > I do like the idea of having such a public compatibility library - I
> > think
> > > it would remove a lot of restrictions from framework development if it
> > were
> > > to be widely adopted.
> > >
> > > > we might consider adding an API to "ack" sink records
> > >
> > > I agree that this does seem like a more intuitive and clean API, but
> I'm
> > > concerned about the backward compatibility headache we'd be imposing on
> > all
> > > existing sink connectors. Connector developers will have to maintain
> two
> > > separate ways of doing offset management if they want to use the new
> API
> > > but continue supporting older versions of Kafka Connect.
> > >
> > > For now, I've reverted the KIP to the previous iteration which proposed
> > the
> > > addition of a new `SinkRecord` method to obtain the original Kafka
> topic
> > > pre-transformation. One thing to note is that I've removed the method
> for
> > > obtaining the original Kafka partition after a cursory search showed
> that
> > > use cases for partition modifying SMTs are primarily on the source
> > > connector side.
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have more comments I'd like to make on this KIP when I have time
> > (sorry
> > > > for the delay, Yash, and thanks for your patience!), but I did want
> to
> > > > chime in and say that I'm also not sure about overloading
> > SinkTask::put.
> > > I
> > > > share the concerns about creating an intuitive, simple API that Yash
> > has
> > > > raised. In addition, this approach doesn't seem very
> sustainable--what
> > do
> > > > we do if we encounter another case in the future that would warrant a
> > > > similar solution? We probably don't want to create three, four, etc.
> > > > overloaded variants of the method, each of which would have to be
> > > > implemented by connector developers who want to both leverage the
> > latest
> > > > and greatest connector APIs and maintain compatibility with Connect
> > > > clusters running older versions.
> > > >
> > > > I haven't been able to flesh this out into a design worth publishing
> in
> > > its
> > > > own KIP yet, but one alternative I've pitched to a few people with
> > > > generally positive interest has been to develop an official
> > compatibility
> > > > library for Connect developers. This library would be released as its
> > own
> > > > Maven artifact (separate from connect-api, connect-runtime, etc.) and
> > > would
> > > > provide a simple, clean interface for developers to determine which
> > > > features are supported by the version of the Connect runtime that
> their
> > > > plugin has been deployed onto. Under the hood, this library might use
> > > > reflection to determine whether classes, methods, etc. are available,
> > but
> > > > the developer wouldn't have to do anything more than check (for
> > example)
> > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any
> > > point
> > > > in the lifetime of their connector/task whether that feature is
> > provided
> > > by
> > > > the runtime.
> > > >
> > > > One other high-level comment: this doesn't address every case, but we
> > > might
> > > > consider adding an API to "ack" sink records. This could use the
> > > > SubmittedRecords class [1] (with some slight tweaks) under the hood
> to
> > > > track the latest-acked offset for each topic partition. This way,
> > > connector
> > > > developers won't be responsible for tracking offsets at all in their
> > sink
> > > > tasks (eliminating issues with the accuracy of post-transformation
> > T/P/O
> > > > sink record information), and they'll only have to notify the Connect
> > > > framework when a record has been successfully dispatched to the
> > external
> > > > system. This provides a cleaner, friendlier API, and also enables
> more
> > > > fine-grained metrics like the ones proposed in KIP-767 [2].
> > > >
> > > > [1] -
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > [2] -
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Randall,
> > > > >
> > > > > It's been a while for this one but the more I think about it, the
> > more
> > > I
> > > > > feel like the current approach with a new overloaded
> `SinkTask::put`
> > > > method
> > > > > might not be optimal. We're trying to fix a pretty corner case bug
> > here
> > > > > (usage of topic mutating SMTs with sink connectors that do their
> own
> > > > offset
> > > > > tracking) and I'm not sure that warrants a change to such a central
> > > > > interface method. The new `SinkTask::put` method just seems
> somewhat
> > > odd
> > > > > and it may not be very understandable for a new reader - I don't
> > think
> > > > this
> > > > > should be the case for a public interface method. Furthermore, even
> > > with
> > > > > elaborate documentation in place, I'm not sure if it'll be very
> > obvious
> > > > to
> > > > > most people what the purpose of having these two `put` methods is
> and
> > > how
> > > > > they should be used by sink task implementations. What do you
> think?
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Randall,
> > > > > >
> > > > > > Thanks a lot for your valuable feedback so far! I've updated the
> > KIP
> > > > > based
> > > > > > on our discussion above. Could you please take another look?
> > > > > >
> > > > > > Thanks,
> > > > > > Yash
> > > > > >
> > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <
> yash.ma...@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >> > Hi Randall,
> > > > > >> >
> > > > > >> > Thanks for elaborating. I think these are all very good points
> > > and I
> > > > > see
> > > > > >> > why the overloaded `SinkTask::put` method is a cleaner
> solution
> > > > > overall.
> > > > > >> >
> > > > > >> > > public void put(Collection<SinkRecord> records,
> > Map<SinkRecord,
> > > > > >> > TopicPartition> updatedTopicPartitions)
> > > > > >> >
> > > > > >> > I think this should be
> > > > > >> >
> > > > > >> > `public void put(Collection<SinkRecord> records,
> Map<SinkRecord,
> > > > > >> > TopicPartition> originalTopicPartitions)`
> > > > > >> >
> > > > > >> > instead because the sink records themselves have the updated
> > topic
> > > > > >> > partitions (i.e. after all transformations have been applied)
> > and
> > > > the
> > > > > >> KIP
> > > > > >> > is proposing a way for the tasks to be able to access the
> > original
> > > > > topic
> > > > > >> > partition (i.e. before transformations have been applied).
> > > > > >> >
> > > > > >>
> > > > > >> Sounds good.
> > > > > >>
> > > > > >>
> > > > > >> >
> > > > > >> > > Of course, if the developer does not need separate methods,
> > they
> > > > can
> > > > > >> > easily have the older `put` method simply delegate to the
> newer
> > > > > method.
> > > > > >> >
> > > > > >> > If the developer does not need separate methods (i.e. they
> don't
> > > > need
> > > > > to
> > > > > >> > use this new addition), they can simply continue implementing
> > just
> > > > the
> > > > > >> > older `put` method right?
> > > > > >> >
> > > > > >>
> > > > > >> Correct. We should update the JavaDoc of both methods to make
> this
> > > > > clear,
> > > > > > >> and in general how the two methods are used and should be
> > > > > >> implemented. That can be part of the PR, and the KIP doesn't
> need
> > > this
> > > > > >> wording.
> > > > > >>
> > > > > >> >
> > > > > >> > > Finally, this gives us a roadmap for *eventually*
> deprecating
> > > the
> > > > > >> older
> > > > > >> > method, once the Connect runtime versions without this change
> > are
> > > > old
> > > > > >> > enough.
> > > > > >> >
> > > > > >> > I'm not sure we'd ever want to deprecate the older method.
> Most
> > > > common
> > > > > >> sink
> > > > > >> > connector implementations do not do their own offset tracking
> > with
> > > > > >> > asynchronous processing and will probably never have a need
> for
> > > the
> > > > > >> > additional parameter `Map<SinkRecord, TopicPartition>
> > > > > >> > originalTopicPartitions` in the proposed new `put` method.
> These
> > > > > >> connectors
> > > > > >> > can continue implementing only the existing `SinkTask::put`
> > method
> > > > > which
> > > > > >> > will be called by the default implementation of the newer
> > > overloaded
> > > > > >> `put`
> > > > > >> > method.
> > > > > >> >
> > > > > >>
> > > > > >> +1
> > > > > >>
> > > > > >>
> > > > > >> >
> > > > > >> > > the pre-commit methods use the same `Map<TopicPartition,
> > > > > >> > OffsetAndMetadata> currentOffsets` data structure I'm
> suggesting
> > > be
> > > > > >> used.
> > > > > >> >
> > > > > >> > The data structure you're suggesting be used is a
> > `Map<SinkRecord,
> > > > > >> > TopicPartition>` which will map `SinkRecord` objects to the
> > > original
> > > > > >> topic
> > > > > >> > partition of the corresponding `ConsumerRecord` right? To
> > clarify,
> > > > > this
> > > > > >> is
> > > > > >> > a new data structure that will need to be managed in the
> > > > > >> `WorkerSinkTask`.
> > > > > >> >
> > > > > >>
> > > > > >> Ah, you're right. Thanks for the correction.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Randall
> > > > > >>
> > > > > >>
> > > > > >> > Thanks,
> > > > > >> > Yash
> > > > > >>
> > > > > >>
> > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <
> rha...@gmail.com>
> > > > > wrote:
> > > > > >> >
> > > > > >> > > Hi, Yash.
> > > > > >> > >
> > > > > >> > > I'm not sure I quite understand why it would be "easier" for
> > > > > connector
> > > > > >> > > > developers to account for implementing two different
> > > overloaded
> > > > > >> `put`
> > > > > >> > > > methods (assuming that they want to use this new feature)
> > > versus
> > > > > >> using
> > > > > >> > a
> > > > > >> > > > try-catch block around `SinkRecord` access methods?
> > > > > >> > >
> > > > > >> > >
> > > > > > >> > > Using a try-catch around an API method that *might*
> be
> > > > there
> > > > > >> is a
> > > > > >> > > very unusual thing for most developers. Unfortunately, we've
> > had
> > > > to
> > > > > >> > resort
> > > > > >> > > to this atypical approach with Connect in places when there
> > was
> > > no
> > > > > >> good
> > > > > > >> > > alternative. We seem to be relying upon this pattern because it's
> > easier
> > > > for
> > > > > >> us,
> > > > > >> > > not because it offers a better experience for Connector
> > > > developers.
> > > > > >> IMO,
> > > > > >> > if
> > > > > >> > > there's a practical alternative that uses normal development
> > > > > practices
> > > > > >> > and
> > > > > >> > > techniques, then we should use that alternative. IIUC, there
> > is
> > > at
> > > > > >> least
> > > > > >> > > one practical alternative for this KIP that would not
> require
> > > > > >> developers
> > > > > >> > to
> > > > > >> > > use the unusual try-catch to handle the case where methods
> are
> > > not
> > > > > >> found.
> > > > > >> > >
> > > > > >> > > I also think having two `put` methods is easier when the
> > > Connector
> > > > > >> has to
> > > > > >> > > do different things for different Connect runtimes, too. One
> > of
> > > > > those
> > > > > >> > > methods is called by newer Connect runtimes with the new
> > > behavior,
> > > > > and
> > > > > >> > the
> > > > > >> > > other method is called by an older Connect runtime. Of
> course,
> > > if
> > > > > the
> > > > > >> > > developer does not need separate methods, they can easily
> have
> > > the
> > > > > >> older
> > > > > >> > > `put` method simply delegate to the newer method.
> > > > > >> > >
> > > > > >> > > Finally, this gives us a roadmap for *eventually*
> deprecating
> > > the
> > > > > >> older
> > > > > >> > > method, once the Connect runtime versions without this
> change
> > > are
> > > > > old
> > > > > >> > > enough.
> > > > > >> > >
> > > > > >> > > I think the advantage of going with the
> > > > > >> > > > proposed approach in the KIP is that it wouldn't require
> > extra
> > > > > >> > > book-keeping
> > > > > >> > > > (the Map<SinkRecord,
> > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your proposed
> > approach)
> > > > > >> > > >
> > > > > >> > >
> > > > > >> > > The connector does have to do some of this bookkeeping in
> how
> > > they
> > > > > >> track
> > > > > >> > > the topic partition offsets used in the `preCommit`, and the
> > > > > >> pre-commit
> > > > > >> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
> > > > > >> > > currentOffsets`
> > > > > >> > > data structure I'm suggesting be used.
> > > > > >> > >
> > > > > >> > > I hope that helps.
> > > > > >> > >
> > > > > >> > > Best regards,
> > > > > >> > >
> > > > > >> > > Randall
> > > > > >> > >
> > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <
> > > yash.ma...@gmail.com>
> > > > > >> wrote:
> > > > > >> > >
> > > > > >> > > > Hi Randall,
> > > > > >> > > >
> > > > > >> > > > Thanks for reviewing the KIP!
> > > > > >> > > >
> > > > > >> > > > > That latter logic can get quite ugly.
> > > > > >> > > >
> > > > > >> > > > I'm not sure I quite understand why it would be "easier"
> for
> > > > > >> connector
> > > > > >> > > > developers to account for implementing two different
> > > overloaded
> > > > > >> `put`
> > > > > >> > > > methods (assuming that they want to use this new feature)
> > > versus
> > > > > >> using
> > > > > >> > a
> > > > > >> > > > try-catch block around `SinkRecord` access methods? In
> both
> > > > > cases, a
> > > > > >> > > > connector developer would need to write additional code in
> > > order
> > > > > to
> > > > > >> > > ensure
> > > > > >> > > > that their connector continues working with older Connect
> > > > > runtimes.
> > > > > >> > > > Furthermore, we would probably need to carefully document
> > > > > > >> > > what
> > > > the
> > > > > >> > > > implementation for the older `put` method should look like
> > for
> > > > > >> > connectors
> > > > > >> > > > that want to use this new feature. I think the advantage
> of
> > > > going
> > > > > >> with
> > > > > >> > > the
> > > > > >> > > > proposed approach in the KIP is that it wouldn't require
> > extra
> > > > > >> > > book-keeping
> > > > > >> > > > (the Map<SinkRecord,
> > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your proposed
> > approach)
> > > > and
> > > > > >> also
> > > > > >> > > the
> > > > > >> > > > fact that the try-catch based logic is an already
> > established
> > > > > >> pattern
> > > > > >> > > > through
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > > >> > > > and other KIPs which added methods to source/sink
> > > connector/task
> > > > > >> > > contexts.
> > > > > >> > > >
> > > > > >> > > > Let me know if you still feel that having a new overloaded
> > put
> > > > > >> method
> > > > > >> > is
> > > > > >> > > a
> > > > > >> > > > cleaner solution and I'd be happy to reconsider!
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Yash
> > > > > >> > > >
> > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <
> > > > rha...@gmail.com>
> > > > > >> > wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > > >> > > > >
> > > > > >> > > > > The KIP includes this rejected alternative:
> > > > > >> > > > >
> > > > > >> > > > > > 4. Update SinkTask.put in any way to pass the new
> > > > information
> > > > > >> > outside
> > > > > >> > > > > > SinkRecord (e.g. a Map or a derived class)
> > > > > >> > > > > >
> > > > > >> > > > > >    -
> > > > > >> > > > > >
> > > > > >> > > > > >    Much more disruptive change without considerable
> pros
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > > >> > > > > One advantage of doing this is that sink connector
> > > > > >> implementations
> > > > > >> > > can
> > > > > >> > > > > more easily implement two different "put(...)" methods
> to
> > > > handle
> > > > > >> > > running
> > > > > >> > > > in
> > > > > >> > > > > a variety of runtimes, without having to use try-catch
> > logic
> > > > > >> around
> > > > > >> > the
> > > > > >> > > > > newer SinkRecord access methods. That latter logic can
> get
> > > > quite
> > > > > >> > ugly.
> > > > > >> > > > >
> > > > > >> > > > > For example, the existing `put` method has this
> signature:
> > > > > >> > > > >
> > > > > >> > > > > public abstract void put(Collection<SinkRecord>
> records);
> > > > > >> > > > >
> > > > > >> > > > > If we added an overloaded method that passed in a map of
> > the
> > > > old
> > > > > >> > > > > topic+partition for each record (and defined the absence
> > of
> > > an
> > > > > >> entry
> > > > > >> > as
> > > > > >> > > > > having an unchanged topic and partition):
> > > > > >> > > > >
> > > > > >> > > > > public void put(Collection<SinkRecord> records,
> > > > Map<SinkRecord,
> > > > > >> > > > > TopicPartition> updatedTopicPartitions) {
> > > > > >> > > > > put(records);
> > > > > >> > > > > }
> > > > > >> > > > >
> > > > > >> > > > > then a `SinkTask` implementation that wants to use this
> > new
> > > > > >> feature
> > > > > >> > > could
> > > > > >> > > > > simply implement both methods:
> > > > > >> > > > >
> > > > > >> > > > > public void put(Collection<SinkRecord> records) {
> > > > > >> > > > > // Running in an older runtime, so no tracking of
> > > SMT-modified
> > > > > >> topic
> > > > > >> > > > names
> > > > > >> > > > > or partitions
> > > > > >> > > > > put(records, Map.of());
> > > > > >> > > > > }
> > > > > >> > > > >
> > > > > >> > > > > public void put(Collection<SinkRecord> records,
> > > > Map<SinkRecord,
> > > > > >> > > > > TopicPartition> updatedTopicPartitions) {
> > > > > >> > > > > // real logic here
> > > > > >> > > > > }
> > > > > >> > > > >
> > > > > >> > > > > This seems a lot easier than having to use try-catch
> > logic,
> > > > yet
> > > > > >> still
> > > > > >> > > > > allows sink connectors to utilize the new functionality
> > and
> > > > > still
> > > > > >> > work
> > > > > >> > > > with
> > > > > >> > > > > older Connect runtimes.
> > > > > >> > > > >
> > > > > >> > > > > WDYT?
> > > > > >> > > > >
> > > > > >> > > > > Randall
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <
> > > > yash.ma...@gmail.com
> > > > > >
> > > > > >> > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi all,
> > > > > >> > > > > >
> > > > > >> > > > > > I would like to (re)start a new discussion thread on
> > > KIP-793
> > > > > >> (Kafka
> > > > > >> > > > > > Connect) which proposes some additions to the public
> > > > > SinkRecord
> > > > > >> > > > interface
> > > > > >> > > > > > in order to support topic mutating SMTs for sink
> > > connectors
> > > > > >> that do
> > > > > >> > > > their
> > > > > >> > > > > > own offset tracking.
> > > > > >> > > > > >
> > > > > >> > > > > > Links:
> > > > > >> > > > > >
> > > > > >> > > > > > KIP:
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > >> > > > > >
> > > > > >> > > > > > Older discussion thread:
> > > > > >> > > > > >
> > > > > >>
> https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > > >> > > > > >
> > > > > >>
> https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > >> > > > > >
> > > > > >> > > > > > Jira:
> https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Yash
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
