Hi Yash,

I think the use case for pre-transform TPO coordinates (and topic partition
writers created/destroyed in close/open) tends to boil down to exactly-once
semantics, where it's desirable to preserve the guarantees that Kafka
provides (every record has a unique TPO trio, and records are ordered by
offset within a topic partition).

It's my understanding that this approach is utilized in several connectors
out there today, and it might break these connectors to start using the
post-transform topic partitions automatically in their open/close methods.

If we want to get really fancy with this and try to obviate or at least
reduce the need for per-connector code changes, we might try to introduce a
framework-level configuration property to dictate which of the
pre-transform and post-transform topic partitions are used for the fallback
call to the single-arg variant if a task class has not overridden the
multi-arg variant. But I think this is going a bit too far and would prefer
to keep things simple(r) for now.
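
For reference, here's a minimal sketch of the fallback being discussed,
assuming the two-arg `open` signature floated earlier in this thread. The
class names (`OpenOverloadSketch`, `SketchSinkTask`, `LegacyTask`) and the
stand-in `TopicPartition` type are hypothetical and exist only so that the
example compiles on its own; none of this is actual Connect API. A task
that only overrides the single-arg variant keeps receiving pre-transform
topic partitions:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class OpenOverloadSketch {

    /** Stand-in for org.apache.kafka.common.TopicPartition (hypothetical, for self-containment). */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Hypothetical slice of a sink task base class; only the open methods are shown. */
    abstract static class SketchSinkTask {
        // Existing single-arg variant: a no-op by default.
        public void open(Collection<TopicPartition> partitions) {
        }

        // Multi-arg variant as sketched in this thread. If a task has not
        // overridden it, fall back to the single-arg variant with the
        // pre-transform (original) topic partitions.
        public void open(Collection<TopicPartition> originalPartitions,
                         Collection<TopicPartition> transformedPartitions) {
            open(originalPartitions);
        }
    }

    /** A task written against the existing API: overrides only the single-arg variant. */
    static class LegacyTask extends SketchSinkTask {
        final List<TopicPartition> opened = new ArrayList<>();

        @Override
        public void open(Collection<TopicPartition> partitions) {
            opened.addAll(partitions);
        }
    }

    public static void main(String[] args) {
        TopicPartition original = new TopicPartition("orders", 0);
        TopicPartition transformed = new TopicPartition("orders-eu", 0);

        LegacyTask task = new LegacyTask();
        // The runtime would call the multi-arg variant; the legacy task
        // still only sees the pre-transform partition.
        task.open(Collections.singletonList(original),
                  Collections.singletonList(transformed));
        System.out.println(task.opened); // prints [orders-0]
    }
}
```

The framework-level property mentioned above would amount to choosing which
of the two collections the default implementation forwards; the sketch
hard-codes the pre-transform one.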

Cheers,

Chris


On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> > I was actually envisioning something like `void
> > open(Collection<TopicPartition> originalPartitions,
> > Collection<TopicPartition> transformedPartitions)`
>
> Ah okay, this does make a lot more sense. Sorry, I think I misunderstood
> you earlier. I do agree with you that this seems better than splitting it
> off into two new sets of open / close methods from a complexity standpoint.
>
> > Plus, if a connector is intentionally designed to use
> > pre-transformation topic partitions in its open/close
> > methods, wouldn't we just be trading one form of the
> >  problem for another by making this switch?
>
> On thinking about this a bit more, I'm not so convinced that we need to
> expose the pre-transform / original topic partitions in the new open /
> close methods. The purpose of the open / close methods is to allow sink
> tasks to allocate and deallocate resources for each topic partition
> assigned to the task and the purpose of topic-mutating SMTs is to
> essentially modify the source topic name from the point of view of the sink
> connector. Why would a sink connector ever need to or want to allocate
> resources for pre-transform topic partitions? Is the argument here that
> since we'll be exposing both the pre-transform and post-transform topic
> partitions per record, we should also expose the same info via open / close
> and allow sink connector implementations to disregard topic-mutating SMTs
> completely if they wanted to?
>
> Either way, I've gone ahead and updated the KIP to reflect all of
> our previous discussion here since it had become quite outdated. I've also
> updated the KIP title from "Sink Connectors: Support topic-mutating SMTs
> for async connectors (preCommit users)" to "Allow sink connectors to be
> used with topic-mutating SMTs" since the improvements to the open / close
> mechanism don't pertain only to asynchronous sink connectors. The new KIP
> URL is:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
>
>
> Thanks,
> Yash
>
> On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > I was actually envisioning something like `void
> > open(Collection<TopicPartition>
> > originalPartitions, Collection<TopicPartition> transformedPartitions)`,
> > since we already convert and transform each batch of records that we poll
> > from the sink task's consumer en masse, meaning we could discover several
> > new transformed partitions in between consecutive calls to SinkTask::put.
> >
> > It's also worth noting that we'll probably want to deprecate the existing
> > open/close methods, at which point keeping one non-deprecated variant of
> > each seems more appealing and less complex than keeping two.
> >
> > Honestly though, I think we're both on the same page enough that I
> > wouldn't object to either approach. We've probably reached the saturation
> > point for ROI here and as long as we provide developers a way to get the
> > information they need from the runtime and take care to add Javadocs and
> > update our docs page (possibly including the connector development
> > quickstart), it should be fine.
> >
> > At this point, it might be worth updating the KIP based on recent
> > discussion so that others can see the latest proposal, and we can both
> > take a look and make sure everything looks good enough before opening a
> > vote thread.
> >
> > Finally, I think you make a convincing case for a time-based eviction
> > policy. I wasn't thinking about the fairly common SMT pattern of
> > deriving a topic name from, e.g., a record field or header.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com>
> wrote:
> >
> > > Hi Chris,
> > >
> > > > Plus, if a connector is intentionally designed to
> > > > use pre-transformation topic partitions in its
> > > > open/close methods, wouldn't we just be trading
> > > > one form of the problem for another by making this
> > > > switch?
> > >
> > > Thanks, this makes sense, and given that the KIP already proposes a way
> > > for sink connector implementations to distinguish between pre-transform
> > > and post-transform topics per record, I think I'm convinced that going
> > > with new `open()` / `close()` methods is the right approach. However, I
> > > still feel like having overloaded methods will make it a lot less
> > > intuitive given that the two sets of methods would be different in terms
> > > of when they're called and what arguments they are passed (also I'm
> > > presuming that the overloaded methods you're proposing will only have a
> > > single `TopicPartition` rather than a `Collection<TopicPartition>` as
> > > their parameters). I guess my concern is largely around the fact that it
> > > won't be possible to distinguish between the overloaded methods' use
> > > cases just from the method signatures. I agree that naming is going to be
> > > difficult here, but I think that having two sets of `SinkTask::openXyz` /
> > > `SinkTask::closeXyz` methods will be less complicated to understand from
> > > a connector developer perspective (as compared to overloaded methods with
> > > only differing documentation). Of your suggested options, I think
> > > `openPreTransform` / `openPostTransform` are the most comprehensible
> > > ones.
> > >
> > > > BTW, I wouldn't say that we can't make assumptions
> > > > about the relationships between pre- and post-transformation
> > > >  topic partitions.
> > >
> > > I meant that the framework wouldn't be able to deterministically know
> > when
> > > to close a post-transform topic partition given that SMTs could use
> > > per-record data / metadata to manipulate the topic names as and how
> > > required (which supports the suggestion to use an eviction policy based
> > > mechanism to call SinkTask::close for post-transform topic partitions).
> > >
> > > > We might utilize a policy that assumes a deterministic
> > > > mapping from the former to the latter, for example.
> > >
> > > Wouldn't this be making the assumption that SMTs only use the topic
> name
> > > itself and no other data / metadata while computing the new topic name?
> > Are
> > > you suggesting that since this assumption could work for a majority of
> > > SMTs, it might be more efficient overall in terms of reducing the
> number
> > of
> > > "false-positive" calls to `SinkTask::closePostTransform` (and we'll
> also
> > be
> > > able to call `SinkTask::closePostTransform` immediately after topic
> > > partitions are revoked from the consumer)? I was thinking something
> more
> > > generic along the lines of a simple time based eviction policy that
> > > wouldn't be making any assumptions regarding the SMT implementations.
> > > Either way, I do like your earlier suggestion of keeping this logic
> > > internal and not painting ourselves into a corner by promising any
> > > particular behavior in the KIP.
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid
> >
> > > wrote:
> > >
> > > > Hi Yash,
> > > >
> > > > I think the key difference between adding methods/overloads related
> to
> > > > SinkTask::open/SinkTask::close and SinkTask::put is that this isn't
> > > > auxiliary information that may or may not be useful to connector
> > > > developers. It's actually critical for them to understand the
> > difference
> > > > between the two concepts here, even if they look very similar. And
> > yes, I
> > > > do believe that switching from pre-transform to post-transform topic
> > > > partitions is too big a change in behavior here. Plus, if a connector
> > is
> > > > intentionally designed to use pre-transformation topic partitions in
> > its
> > > > open/close methods, wouldn't we just be trading one form of the
> problem
> > > for
> > > > another by making this switch?
> > > >
> > > > One possible alternative to overloading the existing methods is to
> > split
> > > > SinkTask::open into openOriginal (or possibly openPhysical or
> > > > openPreTransform) and openTransformed (or openLogical or
> > > > openPostTransform), with a similar change for SinkTask::close. The
> > > default
> > > > implementation for SinkTask::openOriginal can be to call
> > SinkTask::open,
> > > > and the same can go for SinkTask::close. However, I prefer
> overloading
> > > the
> > > > existing methods since this alternative increases complexity and none
> > of
> > > > the names are very informative.
> > > >
> > > > BTW, I wouldn't say that we can't make assumptions about the
> > > relationships
> > > > between pre- and post-transformation topic partitions. We might
> > utilize a
> > > > policy that assumes a deterministic mapping from the former to the
> > > latter,
> > > > for example. The distinction I'd draw is that the assumptions we make
> > can
> > > > and probably should favor some cases in terms of performance (i.e.,
> > > > reducing the number of unnecessary calls to close/open over a given
> > sink
> > > > task's lifetime), but should not lead to guaranteed resource leaks or
> > > > failure to obey API contract in any cases.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > > especially if connectors are intentionally designed around
> > > > > > original topic partitions instead of transformed ones.
> > > > >
> > > > > Ha, that's a good point and reminds me of Hyrum's Law [1] :)
> > > > >
> > > > > > I think we have to provide connector developers with some
> > > > > > way to differentiate between the two, but maybe there's a way
> > > > > >  to do this that I haven't thought of yet
> > > > >
> > > > > I can't think of a better way to do this either; would invoking the
> > > > > existing `SinkTask::open` and `SinkTask::close` methods with
> > > > post-transform
> > > > > topic partitions instead of pre-transform topic partitions not be
> > > > > acceptable even in a minor / major AK release? I feel like the
> > proposed
> > > > > approach of adding overloaded `SinkTask::open` / `SinkTask::close`
> > > > methods
> > > > > to differentiate between pre-transform and post-transform topic
> > > > partitions
> > > > > has similar pitfalls to the idea of the overloaded `SinkTask::put`
> > > method
> > > > > we discarded earlier.
> > > > >
> > > > > > Either way, I'm glad that the general idea of a cache and
> > > > > > > eviction policy for SinkTask::close seems reasonable; if
> > > > > > we decide to go this route, it might make sense for the KIP
> > > > > > to include an outline of one or more high-level strategies
> > > > > > we might take, but without promising any particular behavior
> > > > > > beyond occasionally calling SinkTask::close for post-transform
> > > > > > topic partitions. I'm hoping that this logic can stay internal,
> > > > > > > and by not painting ourselves into a corner with the KIP, we
> > > > > > give ourselves leeway to tweak it in the future if necessary
> > > > > > without filing another KIP or introducing a pluggable interface.
> > > > >
> > > > > Thanks, that's a good idea. Given the flexibility of SMTs, the
> > > framework
> > > > > can't really make any assumptions around topic partitions post
> > > > > transformation nor does it have any way to definitively get any
> such
> > > > > information from transformations which is why the idea of a cache
> > with
> > > an
> > > > > eviction policy makes perfect sense!
> > > > >
> > > > > [1] - https://www.hyrumslaw.com/
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton
> <chr...@aiven.io.invalid
> > >
> > > > > wrote:
> > > > >
> > > > > > Hi Yash,
> > > > > >
> > > > > > > So it looks like with the current state of affairs, sink tasks
> > that
> > > > > only
> > > > > > instantiate writers in the SinkTask::open method (and don't do
> the
> > > lazy
> > > > > > instantiation in SinkTask::put that you mentioned) might fail
> when
> > > used
> > > > > > with topic/partition mutating SMTs even if they don't do any
> > > > asynchronous
> > > > > > processing?
> > > > > >
> > > > > > Yep, exactly 👍
> > > > > >
> > > > > > > What do you think about retaining just the existing methods
> > > > > > but changing when they're called in the Connect runtime? For
> > > instance,
> > > > > > instead of calling SinkTask::open after partition assignment
> post a
> > > > > > consumer group rebalance, we could cache the currently "seen"
> topic
> > > > > > partitions (post transformation) and before each call to
> > > SinkTask::put
> > > > > > check whether there's any new "unseen" topic partitions, and if
> so
> > > call
> > > > > > SinkTask::open (and also update the cache of course).
> > > > > >
> > > > > > IMO the issue here is that it's a drastic change in behavior to
> > start
> > > > > > invoking SinkTask::open and SinkTask::close with post-transform
> > topic
> > > > > > partitions instead of pre-transform, especially if connectors are
> > > > > > intentionally designed around original topic partitions instead
> of
> > > > > > transformed ones. I think we have to provide connector developers
> > > with
> > > > > some
> > > > > > way to differentiate between the two, but maybe there's a way to
> do
> > > > this
> > > > > > that I haven't thought of yet. Interested to hear your thoughts.
> > > > > >
> > > > > > Either way, I'm glad that the general idea of a cache and
> eviction
> > > > policy
> > > > > > > for SinkTask::close seems reasonable; if we decide to go this
> route,
> > > it
> > > > > > might make sense for the KIP to include an outline of one or more
> > > > > > high-level strategies we might take, but without promising any
> > > > particular
> > > > > > behavior beyond occasionally calling SinkTask::close for
> > > post-transform
> > > > > > topic partitions. I'm hoping that this logic can stay internal,
> and
> > > by
> > > > > not
> > > > > > painting ourselves into a corner with the KIP, we give ourselves
> > > leeway
> > > > > to
> > > > > > tweak it in the future if necessary without filing another KIP or
> > > > > > introducing a pluggable interface.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > 1) That's a fair point; while I did scan everything publicly
> > > > available
> > > > > on
> > > > > > > GitHub, you're right in that it won't cover all possible SMTs
> > that
> > > > are
> > > > > > out
> > > > > > > there. Thanks for the example use-case as well, I've updated
> the
> > > KIP
> > > > to
> > > > > > add
> > > > > > > the two new proposed methods.
> > > > > > >
> > > > > > > 2) So it looks like with the current state of affairs, sink
> tasks
> > > > that
> > > > > > only
> > > > > > > instantiate writers in the SinkTask::open method (and don't do
> > the
> > > > lazy
> > > > > > > instantiation in SinkTask::put that you mentioned) might fail
> > when
> > > > used
> > > > > > > with topic/partition mutating SMTs even if they don't do any
> > > > > asynchronous
> > > > > > > processing? Since they could encounter records in SinkTask::put
> > > with
> > > > > > > topics/partitions that they might not have created writers for.
> > > > Thanks
> > > > > > for
> > > > > > > pointing this out, it's definitely another incompatibility that
> > > needs
> > > > > to
> > > > > > be
> > > > > > > called out and fixed. The overloaded method approach is
> > > interesting,
> > > > > but
> > > > > > > comes with the caveat of yet more new methods that will need to
> > be
> > > > > > > implemented by existing connectors if they want to make use of
> > this
> > > > new
> > > > > > > functionality. What do you think about retaining just the
> > existing
> > > > > > methods
> > > > > > > but changing when they're called in the Connect runtime? For
> > > > instance,
> > > > > > > instead of calling SinkTask::open after partition assignment
> > post a
> > > > > > > consumer group rebalance, we could cache the currently "seen"
> > topic
> > > > > > > partitions (post transformation) and before each call to
> > > > SinkTask::put
> > > > > > > check whether there's any new "unseen" topic partitions, and if
> > so
> > > > call
> > > > > > > SinkTask::open (and also update the cache of course). I don't
> > think
> > > > > this
> > > > > > > would break the existing contract with sink tasks where
> > > > SinkTask::open
> > > > > is
> > > > > > > expected to be called for a topic partition before any records
> > from
> > > > the
> > > > > > > topic partition are sent via SinkTask::put? The SinkTask::close
> > > case
> > > > > is a
> > > > > > > lot trickier however, and would require some sort of cache
> > eviction
> > > > > > policy
> > > > > > > that would be deemed appropriate as you pointed out too.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yash
> > > > > > >
> > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton
> > > > <chr...@aiven.io.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > I've had some time to think on this KIP and I think I'm in
> > > > agreement
> > > > > > > about
> > > > > > > > not blocking it on an official compatibility library or
> adding
> > > the
> > > > > > "ack"
> > > > > > > > API for sink records.
> > > > > > > >
> > > > > > > > I only have two more thoughts:
> > > > > > > >
> > > > > > > > 1. Because it is possible to manipulate sink record
> partitions
> > > and
> > > > > > > offsets
> > > > > > > > with the current API we provide for transformations, I still
> > > > believe
> > > > > > > > methods should be added to the SinkRecord class to expose the
> > > > > original
> > > > > > > > partition and offset, not just the original topic. The
> > additional
> > > > > > > cognitive
> > > > > > > > burden from these two methods is going to be minimal anyways;
> > > once
> > > > > > users
> > > > > > > > understand the difference between the transformed topic name
> > and
> > > > the
> > > > > > > > original one, it's going to be trivial for them to understand
> > how
> > > > > that
> > > > > > > same
> > > > > > > > difference applies for partitions and offsets. It's not
> enough
> > to
> > > > > scan
> > > > > > > the
> > > > > > > > set of SMTs provided out of the box with Connect, ones
> > developed
> > > by
> > > > > > > > Confluent, or even everything available on GitHub, since
> there
> > > may
> > > > be
> > > > > > > > closed-source projects out there that rely on this ability.
> One
> > > > > > potential
> > > > > > > > use case could be re-routing partitions between Kafka and
> some
> > > > other
> > > > > > > > sharded system.
> > > > > > > >
> > > > > > > > 2. We still have to address the SinkTask::open [1] and
> > > > > SinkTask::close
> > > > > > > [2]
> > > > > > > > methods. If a connector writes to the external system using
> the
> > > > > > > transformed
> > > > > > > > topic partitions it reads from Kafka, then it's possible for
> > the
> > > > > > > connector
> > > > > > > > to lazily instantiate writers for topic partitions as it
> > > encounters
> > > > > > them
> > > > > > > > from records provided in SinkTask::put. However, connectors
> > also
> > > > > need a
> > > > > > > way
> > > > > > > > to de-allocate those writers (and the resources used by them)
> > > over
> > > > > > time,
> > > > > > > > which they can't do as easily. One possible approach here is
> to
> > > > > > overload
> > > > > > > > SinkTask::open and SinkTask::close with variants that
> > distinguish
> > > > > > between
> > > > > > > > transformed and original topic partitions, and default to
> > > invoking
> > > > > the
> > > > > > > > existing methods with just the original topic partitions. We
> > > would
> > > > > then
> > > > > > > > have several options for how the Connect runtime can invoke
> > these
> > > > > > > methods,
> > > > > > > > but in general, an approach that guarantees that tasks are
> > > notified
> > > > > of
> > > > > > > > transformed topic partitions in SinkTask::open before any
> > records
> > > > for
> > > > > > > that
> > > > > > > > partition are given to it in SinkTask::put, and makes a
> > > best-effort
> > > > > > > attempt
> > > > > > > > to close transformed topic partitions that appear to no
> longer
> > be
> > > > in
> > > > > > use
> > > > > > > > based on some eviction policy, would probably be sufficient.
> > > > > > > >
> > > > > > > > [1] -
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > > > > > > > [2] -
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <
> > yash.ma...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > Thanks a lot for your inputs!
> > > > > > > > >
> > > > > > > > > > would provide a simple, clean interface for developers to
> > > > > determine
> > > > > > > > > > which features are supported by the version of the
> Connect
> > > > > runtime
> > > > > > > > > > that their plugin has been deployed onto
> > > > > > > > >
> > > > > > > > > I do like the idea of having such a public compatibility
> > > library
> > > > -
> > > > > I
> > > > > > > > think
> > > > > > > > > it would remove a lot of restrictions from framework
> > > development
> > > > if
> > > > > > it
> > > > > > > > were
> > > > > > > > > to be widely adopted.
> > > > > > > > >
> > > > > > > > > > we might consider adding an API to "ack" sink records
> > > > > > > > >
> > > > > > > > > I agree that this does seem like a more intuitive and clean
> > > API,
> > > > > but
> > > > > > > I'm
> > > > > > > > > concerned about the backward compatibility headache we'd be
> > > > > imposing
> > > > > > on
> > > > > > > > all
> > > > > > > > > existing sink connectors. Connector developers will have to
> > > > > maintain
> > > > > > > two
> > > > > > > > > separate ways of doing offset management if they want to
> use
> > > the
> > > > > new
> > > > > > > API
> > > > > > > > > but continue supporting older versions of Kafka Connect.
> > > > > > > > >
> > > > > > > > > For now, I've reverted the KIP to the previous iteration
> > which
> > > > > > proposed
> > > > > > > > the
> > > > > > > > > addition of a new `SinkRecord` method to obtain the
> original
> > > > Kafka
> > > > > > > topic
> > > > > > > > > pre-transformation. One thing to note is that I've removed
> > the
> > > > > method
> > > > > > > for
> > > > > > > > > obtaining the original Kafka partition after a cursory
> search
> > > > > showed
> > > > > > > that
> > > > > > > > > use cases for partition modifying SMTs are primarily on the
> > > > source
> > > > > > > > > connector side.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yash
> > > > > > > > >
> > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton
> > > > > <chr...@aiven.io.invalid
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > I have more comments I'd like to make on this KIP when I
> > have
> > > > > time
> > > > > > > > (sorry
> > > > > > > > > > for the delay, Yash, and thanks for your patience!), but
> I
> > > did
> > > > > want
> > > > > > > to
> > > > > > > > > > chime in and say that I'm also not sure about overloading
> > > > > > > > SinkTask::put.
> > > > > > > > > I
> > > > > > > > > > share the concerns about creating an intuitive, simple
> API
> > > that
> > > > > > Yash
> > > > > > > > has
> > > > > > > > > > raised. In addition, this approach doesn't seem very
> > > > > > > sustainable--what
> > > > > > > > do
> > > > > > > > > > we do if we encounter another case in the future that
> would
> > > > > > warrant a
> > > > > > > > > > similar solution? We probably don't want to create three,
> > > four,
> > > > > > etc.
> > > > > > > > > > overloaded variants of the method, each of which would
> have
> > > to
> > > > be
> > > > > > > > > > implemented by connector developers who want to both
> > leverage
> > > > the
> > > > > > > > latest
> > > > > > > > > > and greatest connector APIs and maintain compatibility
> with
> > > > > > > > > > Connect
> > > > > > > > > > > clusters running older versions.
> > > > > > > > > >
> > > > > > > > > > I haven't been able to flesh this out into a design worth
> > > > > > publishing
> > > > > > > in
> > > > > > > > > its
> > > > > > > > > > own KIP yet, but one alternative I've pitched to a few
> > people
> > > > > with
> > > > > > > > > > generally positive interest has been to develop an
> official
> > > > > > > > compatibility
> > > > > > > > > > library for Connect developers. This library would be
> > > released
> > > > as
> > > > > > its
> > > > > > > > own
> > > > > > > > > > Maven artifact (separate from connect-api,
> connect-runtime,
> > > > etc.)
> > > > > > and
> > > > > > > > > would
> > > > > > > > > > provide a simple, clean interface for developers to
> > determine
> > > > > which
> > > > > > > > > > features are supported by the version of the Connect
> > runtime
> > > > that
> > > > > > > their
> > > > > > > > > > plugin has been deployed onto. Under the hood, this
> library
> > > > might
> > > > > > use
> > > > > > > > > > reflection to determine whether classes, methods, etc.
> are
> > > > > > available,
> > > > > > > > but
> > > > > > > > > > the developer wouldn't have to do anything more than
> check
> > > (for
> > > > > > > > example)
> > > > > > > > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to
> > know
> > > > at
> > > > > > any
> > > > > > > > > point
> > > > > > > > > > in the lifetime of their connector/task whether that
> > feature
> > > is
> > > > > > > > provided
> > > > > > > > > by
> > > > > > > > > > the runtime.
> > > > > > > > > >
> > > > > > > > > > One other high-level comment: this doesn't address every
> > > case,
> > > > > but
> > > > > > we
> > > > > > > > > might
> > > > > > > > > > consider adding an API to "ack" sink records. This could
> > use
> > > > the
> > > > > > > > > > SubmittedRecords class [1] (with some slight tweaks)
> under
> > > the
> > > > > hood
> > > > > > > to
> > > > > > > > > > track the latest-acked offset for each topic partition.
> > This
> > > > way,
> > > > > > > > > connector
> > > > > > > > > > developers won't be responsible for tracking offsets at
> all
> > > in
> > > > > > their
> > > > > > > > sink
> > > > > > > > > > tasks (eliminating issues with the accuracy of
> > > > > post-transformation
> > > > > > > > T/P/O
> > > > > > > > > > sink record information), and they'll only have to notify
> > the
> > > > > > Connect
> > > > > > > > > > framework when a record has been successfully dispatched
> to
> > > the
> > > > > > > > external
> > > > > > > > > > system. This provides a cleaner, friendlier API, and also
> > > > enables
> > > > > > > more
> > > > > > > > > > fine-grained metrics like the ones proposed in KIP-767
> [2].
> > > > > > > > > >
> > > > > > > > > > [1] -
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > > > > > > > [2] -
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <
> > > > yash.ma...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Randall,
> > > > > > > > > > >
> > > > > > > > > > > It's been a while for this one but the more I think
> about
> > > it,
> > > > > the
> > > > > > > > more
> > > > > > > > > I
> > > > > > > > > > > feel like the current approach with a new overloaded
> > > > > > > `SinkTask::put`
> > > > > > > > > > method
> > > > > > > > > > > might not be optimal. We're trying to fix a pretty
> corner
> > > > case
> > > > > > bug
> > > > > > > > here
> > > > > > > > > > > (usage of topic mutating SMTs with sink connectors that
> > do
> > > > > their
> > > > > > > own
> > > > > > > > > > offset
> > > > > > > > > > > tracking) and I'm not sure that warrants a change to
> > such a
> > > > > > central
> > > > > > > > > > > interface method. The new `SinkTask::put` method just
> > seems
> > > > > > > somewhat
> > > > > > > > > odd
> > > > > > > > > > > and it may not be very understandable for a new reader
> -
> > I
> > > > > don't
> > > > > > > > think
> > > > > > > > > > this
> > > > > > > > > > > should be the case for a public interface method.
> > > > Furthermore,
> > > > > > even
> > > > > > > > > with
> > > > > > > > > > > elaborate documentation in place, I'm not sure if it'll
> > be
> > > > very
> > > > > > > > obvious
> > > > > > > > > > to
> > > > > > > > > > > most people what the purpose of having these two `put`
> > > > methods
> > > > > is
> > > > > > > and
> > > > > > > > > how
> > > > > > > > > > > they should be used by sink task implementations. What
> do
> > > you
> > > > > > > think?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <
> > > > > yash.ma...@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks a lot for your valuable feedback so far! I've
> > > > updated
> > > > > > the
> > > > > > > > KIP
> > > > > > > > > > > based
> > > > > > > > > > > > on our discussion above. Could you please take
> another
> > > > look?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yash
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <
> > > > > > rha...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <
> > > > > > > yash.ma...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hi Randall,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for elaborating. I think these are all very
> > > good
> > > > > > points
> > > > > > > > > and I
> > > > > > > > > > > see
> > > > > > > > > > > >> > why the overloaded `SinkTask::put` method is a
> > cleaner
> > > > > > > solution
> > > > > > > > > > > overall.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I think this should be
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > instead because the sink records themselves have
> the
> > > > > updated
> > > > > > > > topic
> > > > > > > > > > > >> > partitions (i.e. after all transformations have
> been
> > > > > > applied)
> > > > > > > > and
> > > > > > > > > > the
> > > > > > > > > > > >> KIP
> > > > > > > > > > > >> > is proposing a way for the tasks to be able to
> > access
> > > > the
> > > > > > > > original
> > > > > > > > > > > topic
> > > > > > > > > > > >> > partition (i.e. before transformations have been
> > > > applied).
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >> Sounds good.
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > Of course, if the developer does not need
> separate
> > > > > > methods,
> > > > > > > > they
> > > > > > > > > > can
> > > > > > > > > > > >> > easily have the older `put` method simply delegate
> > to
> > > > the
> > > > > > > newer
> > > > > > > > > > > method.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > If the developer does not need separate methods
> > (i.e.
> > > > they
> > > > > > > don't
> > > > > > > > > > need
> > > > > > > > > > > to
> > > > > > > > > > > >> > use this new addition), they can simply continue
> > > > > > implementing
> > > > > > > > just
> > > > > > > > > > the
> > > > > > > > > > > >> > older `put` method right?
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >> Correct. We should update the JavaDoc of both
> methods
> > to
> > > > > make
> > > > > > > this
> > > > > > > > > > > clear,
> > > > > > > > > > > > >> and in general how the two methods are used
> and
> > > > > should
> > > > > > be
> > > > > > > > > > > >> implemented. That can be part of the PR, and the KIP
> > > > doesn't
> > > > > > > need
> > > > > > > > > this
> > > > > > > > > > > >> wording.
> > > > > > > > > > > >>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > Finally, this gives us a roadmap for
> *eventually*
> > > > > > > deprecating
> > > > > > > > > the
> > > > > > > > > > > >> older
> > > > > > > > > > > >> > method, once the Connect runtime versions without
> > this
> > > > > > change
> > > > > > > > are
> > > > > > > > > > old
> > > > > > > > > > > >> > enough.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I'm not sure we'd ever want to deprecate the older
> > > > method.
> > > > > > > Most
> > > > > > > > > > common
> > > > > > > > > > > >> sink
> > > > > > > > > > > >> > connector implementations do not do their own
> offset
> > > > > > tracking
> > > > > > > > with
> > > > > > > > > > > >> > asynchronous processing and will probably never
> > have a
> > > > > need
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > >> > additional parameter `Map<SinkRecord,
> > TopicPartition>
> > > > > > > > > > > >> > originalTopicPartitions` in the proposed new `put`
> > > > method.
> > > > > > > These
> > > > > > > > > > > >> connectors
> > > > > > > > > > > >> > can continue implementing only the existing
> > > > > `SinkTask::put`
> > > > > > > > method
> > > > > > > > > > > which
> > > > > > > > > > > >> > will be called by the default implementation of
> the
> > > > newer
> > > > > > > > > overloaded
> > > > > > > > > > > >> `put`
> > > > > > > > > > > >> > method.
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >> +1
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > the pre-commit methods use the same
> > > > `Map<TopicPartition,
> > > > > > > > > > > >> > OffsetAndMetadata> currentOffsets` data structure
> > I'm
> > > > > > > suggesting
> > > > > > > > > be
> > > > > > > > > > > >> used.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The data structure you're suggesting be used is a
> > > > > > > > `Map<SinkRecord,
> > > > > > > > > > > >> > TopicPartition>` which will map `SinkRecord`
> objects
> > > to
> > > > > the
> > > > > > > > > original
> > > > > > > > > > > >> topic
> > > > > > > > > > > >> > partition of the corresponding `ConsumerRecord`
> > right?
> > > > To
> > > > > > > > clarify,
> > > > > > > > > > > this
> > > > > > > > > > > >> is
> > > > > > > > > > > >> > a new data structure that will need to be managed
> in
> > > the
> > > > > > > > > > > >> `WorkerSinkTask`.
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >> Ah, you're right. Thanks for the correction.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best regards,
> > > > > > > > > > > >> Randall
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > >> > Yash
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <
> > > > > > > rha...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > Hi, Yash.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > I'm not sure I quite understand why it would be
> > > > "easier"
> > > > > > for
> > > > > > > > > > > connector
> > > > > > > > > > > >> > > > developers to account for implementing two
> > > different
> > > > > > > > > overloaded
> > > > > > > > > > > >> `put`
> > > > > > > > > > > >> > > > methods (assuming that they want to use this
> new
> > > > > > feature)
> > > > > > > > > versus
> > > > > > > > > > > >> using
> > > > > > > > > > > >> > a
> > > > > > > > > > > >> > > > try-catch block around `SinkRecord` access
> > > methods?
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Using a try-catch around an API method
> that
> > > > > *might*
> > > > > > > be
> > > > > > > > > > there
> > > > > > > > > > > >> is a
> > > > > > > > > > > >> > > very unusual thing for most developers.
> > > Unfortunately,
> > > > > > we've
> > > > > > > > had
> > > > > > > > > > to
> > > > > > > > > > > >> > resort
> > > > > > > > > > > >> > > to this atypical approach with Connect in places
> > > when
> > > > > > there
> > > > > > > > was
> > > > > > > > > no
> > > > > > > > > > > >> good
> > > > > > > > > > > > >> > > alternative. We seem to be relying upon this pattern
> > because
> > > > > it's
> > > > > > > > easier
> > > > > > > > > > for
> > > > > > > > > > > >> us,
> > > > > > > > > > > >> > > not because it offers a better experience for
> > > > Connector
> > > > > > > > > > developers.
> > > > > > > > > > > >> IMO,
> > > > > > > > > > > >> > if
> > > > > > > > > > > >> > > there's a practical alternative that uses normal
> > > > > > development
> > > > > > > > > > > practices
> > > > > > > > > > > >> > and
> > > > > > > > > > > >> > > techniques, then we should use that alternative.
> > > IIUC,
> > > > > > there
> > > > > > > > is
> > > > > > > > > at
> > > > > > > > > > > >> least
> > > > > > > > > > > >> > > one practical alternative for this KIP that
> would
> > > not
> > > > > > > require
> > > > > > > > > > > >> developers
> > > > > > > > > > > >> > to
> > > > > > > > > > > >> > > use the unusual try-catch to handle the case
> where
> > > > > methods
> > > > > > > are
> > > > > > > > > not
> > > > > > > > > > > >> found.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > I also think having two `put` methods is easier
> > when
> > > > the
> > > > > > > > > Connector
> > > > > > > > > > > >> has to
> > > > > > > > > > > >> > > do different things for different Connect
> > runtimes,
> > > > too.
> > > > > > One
> > > > > > > > of
> > > > > > > > > > > those
> > > > > > > > > > > >> > > methods is called by newer Connect runtimes with
> > the
> > > > new
> > > > > > > > > behavior,
> > > > > > > > > > > and
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> > > other method is called by an older Connect
> > runtime.
> > > Of
> > > > > > > course,
> > > > > > > > > if
> > > > > > > > > > > the
> > > > > > > > > > > >> > > developer does not need separate methods, they
> can
> > > > > easily
> > > > > > > have
> > > > > > > > > the
> > > > > > > > > > > >> older
> > > > > > > > > > > >> > > `put` method simply delegate to the newer
> method.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Finally, this gives us a roadmap for
> *eventually*
> > > > > > > deprecating
> > > > > > > > > the
> > > > > > > > > > > >> older
> > > > > > > > > > > >> > > method, once the Connect runtime versions
> without
> > > this
> > > > > > > change
> > > > > > > > > are
> > > > > > > > > > > old
> > > > > > > > > > > >> > > enough.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > I think the advantage of going with the
> > > > > > > > > > > >> > > > proposed approach in the KIP is that it
> wouldn't
> > > > > require
> > > > > > > > extra
> > > > > > > > > > > >> > > book-keeping
> > > > > > > > > > > >> > > > (the Map<SinkRecord,
> > > > > > > > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your
> > > proposed
> > > > > > > > approach)
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > The connector does have to do some of this
> > > bookkeeping
> > > > > in
> > > > > > > how
> > > > > > > > > they
> > > > > > > > > > > >> track
> > > > > > > > > > > >> > > the topic partition offsets used in the
> > `preCommit`,
> > > > and
> > > > > > the
> > > > > > > > > > > >> pre-commit
> > > > > > > > > > > >> > > methods use the same `Map<TopicPartition,
> > > > > > OffsetAndMetadata>
> > > > > > > > > > > >> > > currentOffsets`
> > > > > > > > > > > >> > > data structure I'm suggesting be used.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > I hope that helps.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Best regards,
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Randall
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <
> > > > > > > > > yash.ma...@gmail.com>
> > > > > > > > > > > >> wrote:
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > > Hi Randall,
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Thanks for reviewing the KIP!
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > That latter logic can get quite ugly.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > I'm not sure I quite understand why it would
> be
> > > > > "easier"
> > > > > > > for
> > > > > > > > > > > >> connector
> > > > > > > > > > > >> > > > developers to account for implementing two
> > > different
> > > > > > > > > overloaded
> > > > > > > > > > > >> `put`
> > > > > > > > > > > >> > > > methods (assuming that they want to use this
> new
> > > > > > feature)
> > > > > > > > > versus
> > > > > > > > > > > >> using
> > > > > > > > > > > >> > a
> > > > > > > > > > > >> > > > try-catch block around `SinkRecord` access
> > > methods?
> > > > In
> > > > > > > both
> > > > > > > > > > > cases, a
> > > > > > > > > > > >> > > > connector developer would need to write
> > additional
> > > > > code
> > > > > > in
> > > > > > > > > order
> > > > > > > > > > > to
> > > > > > > > > > > >> > > ensure
> > > > > > > > > > > >> > > > that their connector continues working with
> > older
> > > > > > Connect
> > > > > > > > > > > runtimes.
> > > > > > > > > > > >> > > > Furthermore, we would probably need to
> carefully
> > > > > > document
> > > > > > > > how
> > > > > > > > > > the
> > > > > > > > > > > >> > > > implementation for the older `put` method
> should
> > > > look
> > > > > > like
> > > > > > > > for
> > > > > > > > > > > >> > connectors
> > > > > > > > > > > >> > > > that want to use this new feature. I think the
> > > > > advantage
> > > > > > > of
> > > > > > > > > > going
> > > > > > > > > > > >> with
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > proposed approach in the KIP is that it
> wouldn't
> > > > > require
> > > > > > > > extra
> > > > > > > > > > > >> > > book-keeping
> > > > > > > > > > > >> > > > (the Map<SinkRecord,
> > > > > > > > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your
> > > proposed
> > > > > > > > approach)
> > > > > > > > > > and
> > > > > > > > > > > >> also
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > fact that the try-catch based logic is an
> > already
> > > > > > > > established
> > > > > > > > > > > >> pattern
> > > > > > > > > > > >> > > > through
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > > > > > > > > >> > > > and other KIPs which added methods to
> > source/sink
> > > > > > > > > connector/task
> > > > > > > > > > > >> > > contexts.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Let me know if you still feel that having a
> new
> > > > > > overloaded
> > > > > > > > put
> > > > > > > > > > > >> method
> > > > > > > > > > > >> > is
> > > > > > > > > > > >> > > a
> > > > > > > > > > > >> > > > cleaner solution and I'd be happy to
> reconsider!
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > >> > > > Yash
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall
> Hauch <
> > > > > > > > > > rha...@gmail.com>
> > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and
> > > > > > discussion.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > The KIP includes this rejected alternative:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > > 4. Update SinkTask.put in any way to pass
> > the
> > > > new
> > > > > > > > > > information
> > > > > > > > > > > >> > outside
> > > > > > > > > > > >> > > > > > SinkRecord (e.g. a Map or a derived class)
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >    -
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >    Much more disruptive change without
> > > > > considerable
> > > > > > > pros
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > One advantage about doing this is that sink
> > > > > connector
> > > > > > > > > > > >> implementations
> > > > > > > > > > > >> > > can
> > > > > > > > > > > >> > > > > more easily implement two different
> "put(...)"
> > > > > methods
> > > > > > > to
> > > > > > > > > > handle
> > > > > > > > > > > >> > > running
> > > > > > > > > > > >> > > > in
> > > > > > > > > > > >> > > > > a variety of runtimes, without having to use
> > > > > try-catch
> > > > > > > > logic
> > > > > > > > > > > >> around
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> > > > > newer SinkRecord access methods. That latter
> > > logic
> > > > > can
> > > > > > > get
> > > > > > > > > > quite
> > > > > > > > > > > >> > ugly.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > For example, the existing `put` method has
> > this
> > > > > > > signature:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > public abstract void put(Collection<SinkRecord> records);
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > If we added an overloaded method that passed
> > in
> > > a
> > > > > map
> > > > > > of
> > > > > > > > the
> > > > > > > > > > old
> > > > > > > > > > > >> > > > > topic+partition for each record (and defined
> > the
> > > > > > absence
> > > > > > > > of
> > > > > > > > > an
> > > > > > > > > > > >> entry
> > > > > > > > > > > >> > as
> > > > > > > > > > > >> > > > > having an unchanged topic and partition):
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > >> > > > >     put(records);
> > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > then a `SinkTask` implementation that wants
> to
> > > use
> > > > > > this
> > > > > > > > new
> > > > > > > > > > > >> feature
> > > > > > > > > > > >> > > could
> > > > > > > > > > > >> > > > > simply implement both methods:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records) {
> > > > > > > > > > > > >> > > > >     // Running in an older runtime, so no tracking of SMT-modified topic names or partitions
> > > > > > > > > > > > >> > > > >     put(records, Map.of());
> > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > >> > > > >     // real logic here
> > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > This seems a lot easier than having to use
> > > > try-catch
> > > > > > > > logic,
> > > > > > > > > > yet
> > > > > > > > > > > >> still
> > > > > > > > > > > >> > > > > allows sink connectors to utilize the new
> > > > > > functionality
> > > > > > > > and
> > > > > > > > > > > still
> > > > > > > > > > > >> > work
> > > > > > > > > > > >> > > > with
> > > > > > > > > > > >> > > > > older Connect runtimes.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > WDYT?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > Randall
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <
> > > > > > > > > > yash.ma...@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > > Hi all,
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > I would like to (re)start a new discussion
> > > > thread
> > > > > on
> > > > > > > > > KIP-793
> > > > > > > > > > > >> (Kafka
> > > > > > > > > > > >> > > > > > Connect) which proposes some additions to
> > the
> > > > > public
> > > > > > > > > > > SinkRecord
> > > > > > > > > > > >> > > > interface
> > > > > > > > > > > >> > > > > > in order to support topic mutating SMTs
> for
> > > sink
> > > > > > > > > connectors
> > > > > > > > > > > >> that do
> > > > > > > > > > > >> > > > their
> > > > > > > > > > > >> > > > > > own offset tracking.
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > Links:
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > KIP:
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > Older discussion thread:
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >>
> > > > > > >
> https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h
> > ,
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >>
> > > > > > >
> https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > >> > > > > > Yash
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
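
For reference, the overloaded `put` idea from the quoted thread can be
sketched roughly as below. This is only an illustration, not the Connect
API: `SinkRecord`, `TopicPartition` and `SinkTask` here are simplified,
hypothetical stand-ins for the real classes, and `LegacyTask`,
`TrackingTask` and the `seen` list are made-up names. The map parameter
uses the `originalTopicPartitions` name suggested later in the thread, and
a missing entry is treated as "topic and partition unchanged".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class OverloadedPutSketch {
    // Hypothetical stand-ins for the Kafka Connect types (not the real classes).
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static final class SinkRecord {
        final String topic;
        final int partition;
        final long offset;
        SinkRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Simplified stand-in for SinkTask with the overload discussed in the thread.
    abstract static class SinkTask {
        public abstract void put(Collection<SinkRecord> records);

        // Default: ignore the extra map and fall back to the older method,
        // so tasks that only implement put(records) keep working.
        public void put(Collection<SinkRecord> records,
                        Map<SinkRecord, TopicPartition> originalTopicPartitions) {
            put(records);
        }
    }

    // A task that implements only the older method.
    static class LegacyTask extends SinkTask {
        final List<String> seen = new ArrayList<>();

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord r : records) {
                seen.add(r.topic + "-" + r.partition);
            }
        }
    }

    // A task that implements both methods and tracks pre-transform coordinates.
    static class TrackingTask extends SinkTask {
        final List<String> seen = new ArrayList<>();

        @Override
        public void put(Collection<SinkRecord> records) {
            // Called by a runtime without the overload: no original partitions known.
            put(records, Map.of());
        }

        @Override
        public void put(Collection<SinkRecord> records,
                        Map<SinkRecord, TopicPartition> originalTopicPartitions) {
            for (SinkRecord r : records) {
                // A missing entry means the topic and partition were not changed.
                TopicPartition tp = originalTopicPartitions.get(r);
                String topic = (tp != null) ? tp.topic : r.topic;
                int partition = (tp != null) ? tp.partition : r.partition;
                seen.add(topic + "-" + partition);
            }
        }
    }

    public static void main(String[] args) {
        SinkRecord renamed = new SinkRecord("renamed", 0, 42L);
        Map<SinkRecord, TopicPartition> originals =
                Map.of(renamed, new TopicPartition("orders", 3));

        LegacyTask legacy = new LegacyTask();
        legacy.put(List.of(renamed), originals);

        TrackingTask tracking = new TrackingTask();
        tracking.put(List.of(renamed), originals);

        System.out.println(legacy.seen + " " + tracking.seen);
    }
}
```

In this sketch the legacy task only ever sees the post-transform
coordinates, while the tracking task can recover the original topic
partition when the runtime supplies it.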
