Hi Yash,

I think the use case for pre-transform TPO coordinates (and topic partition writers created/destroyed in close/open) tends to boil down to exactly-once semantics, where it's desirable to preserve the guarantees that Kafka provides (every record has a unique TPO trio, and records are ordered by offset within a topic partition).
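To make the pattern concrete, here is a minimal sketch of a task that allocates one writer per pre-transform topic partition in open, releases it in close, and relies on offset ordering within a partition to skip duplicates. The `TopicPartition` record is a stand-in for `org.apache.kafka.common.TopicPartition` so the sketch compiles without Kafka on the classpath; `PreTransformWriters` and `PartitionWriter` are hypothetical names, not taken from any real connector.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class PreTransformWriters {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
    record TopicPartition(String topic, int partition) {}

    // Hypothetical writer that remembers the last original offset it wrote.
    static final class PartitionWriter {
        // A real connector would recover this from the external system on open.
        private long lastWrittenOffset = -1L;

        // Returns true if the record was written, false if it was skipped as a duplicate.
        boolean write(long originalOffset) {
            if (originalOffset <= lastWrittenOffset) {
                return false; // offsets are ordered within a topic partition
            }
            lastWrittenOffset = originalOffset;
            return true;
        }
    }

    private final Map<TopicPartition, PartitionWriter> writers = new HashMap<>();

    // Mirrors SinkTask::open: allocate one writer per assigned pre-transform partition.
    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.putIfAbsent(tp, new PartitionWriter());
        }
    }

    // Mirrors SinkTask::close: release writers for revoked partitions.
    void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.remove(tp);
        }
    }

    // Routes a record to its writer using the original (pre-transform) coordinates.
    boolean put(TopicPartition original, long originalOffset) {
        PartitionWriter writer = writers.get(original);
        if (writer == null) {
            throw new IllegalStateException("No writer opened for " + original);
        }
        return writer.write(originalOffset);
    }

    int openWriters() {
        return writers.size();
    }
}
```

If open/close were suddenly invoked with post-transform partitions, the lookup in `put` above would fail for a task written this way, which is the breakage I'm worried about.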
It's my understanding that this approach is utilized in several connectors out there today, and it might break these connectors to start using the post-transform topic partitions automatically in their open/close methods.

If we want to get really fancy with this and try to obviate or at least reduce the need for per-connector code changes, we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant. But I think this is going a bit too far and would prefer to keep things simple(r) for now.

Cheers,

Chris

On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> > I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`
>
> Ah okay, this does make a lot more sense. Sorry, I think I misunderstood you earlier. I do agree with you that this seems better than splitting it off into two new sets of open / close methods from a complexity standpoint.
>
> > Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?
>
> On thinking about this a bit more, I'm not so convinced that we need to expose the pre-transform / original topic partitions in the new open / close methods. The purpose of the open / close methods is to allow sink tasks to allocate and deallocate resources for each topic partition assigned to the task and the purpose of topic-mutating SMTs is to essentially modify the source topic name from the point of view of the sink connector. Why would a sink connector ever need to or want to allocate resources for pre-transform topic partitions?
> Is the argument here that since we'll be exposing both the pre-transform and post-transform topic partitions per record, we should also expose the same info via open / close and allow sink connector implementations to disregard topic-mutating SMTs completely if they wanted to?
>
> Either way, I've gone ahead and updated the KIP to reflect all of our previous discussion here since it had become quite outdated. I've also updated the KIP title from "Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)" to "Allow sink connectors to be used with topic-mutating SMTs" since the improvements to the open / close mechanism don't pertain only to asynchronous sink connectors. The new KIP URL is:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
>
> Thanks,
> Yash
>
> On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> > Hi Yash,
> >
> > I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`, since we already convert and transform each batch of records that we poll from the sink task's consumer en masse, meaning we could discover several new transformed partitions in between consecutive calls to SinkTask::put.
> >
> > It's also worth noting that we'll probably want to deprecate the existing open/close methods, at which point keeping one non-deprecated variant of each seems more appealing and less complex than keeping two.
> >
> > Honestly though, I think we're both on the same page enough that I wouldn't object to either approach.
> > We've probably reached the saturation point for ROI here and as long as we provide developers a way to get the information they need from the runtime and take care to add Javadocs and update our docs page (possibly including the connector development quickstart), it should be fine.
> >
> > At this point, it might be worth updating the KIP based on recent discussion so that others can see the latest proposal, and we can both take a look and make sure everything looks good enough before opening a vote thread.
> >
> > Finally, I think you make a convincing case for a time-based eviction policy. I wasn't thinking about the fairly common SMT pattern of deriving a topic name from, e.g., a record field or header.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > > Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?
> > >
> > > Thanks, this makes sense, and given that the KIP already proposes a way for sink connector implementations to distinguish between pre-transform and post-transform topics per record, I think I'm convinced that going with new `open()` / `close()` methods is the right approach. However, I still feel like having overloaded methods will make it a lot less intuitive given that the two sets of methods would be different in terms of when they're called and what arguments they are passed (also I'm presuming that the overloaded methods you're prescribing will only have a single `TopicPartition` rather than a `Collection<TopicPartition>` as their parameters).
> > > I guess my concern is largely around the fact that it won't be possible to distinguish between the overloaded methods' use cases just from the method signatures. I agree that naming is going to be difficult here, but I think that having two sets of `SinkTask::openXyz` / `SinkTask::closeXyz` methods will be less complicated to understand from a connector developer perspective (as compared to overloaded methods with only differing documentation). Of your suggested options, I think `openPreTransform` / `openPostTransform` are the most comprehensible ones.
> > >
> > > > BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions.
> > >
> > > I meant that the framework wouldn't be able to deterministically know when to close a post-transform topic partition given that SMTs could use per-record data / metadata to manipulate the topic names as and how required (which supports the suggestion to use an eviction policy based mechanism to call SinkTask::close for post-transform topic partitions).
> > >
> > > > We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example.
> > >
> > > Wouldn't this be making the assumption that SMTs only use the topic name itself and no other data / metadata while computing the new topic name? Are you suggesting that since this assumption could work for a majority of SMTs, it might be more efficient overall in terms of reducing the number of "false-positive" calls to `SinkTask::closePostTransform` (and we'll also be able to call `SinkTask::closePostTransform` immediately after topic partitions are revoked from the consumer)?
> > > I was thinking something more generic along the lines of a simple time based eviction policy that wouldn't be making any assumptions regarding the SMT implementations. Either way, I do like your earlier suggestion of keeping this logic internal and not painting ourselves into a corner by promising any particular behavior in the KIP.
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > >
> > > > Hi Yash,
> > > >
> > > > I think the key difference between adding methods/overloads related to SinkTask::open/SinkTask::close and SinkTask::put is that this isn't auxiliary information that may or may not be useful to connector developers. It's actually critical for them to understand the difference between the two concepts here, even if they look very similar. And yes, I do believe that switching from pre-transform to post-transform topic partitions is too big a change in behavior here. Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?
> > > >
> > > > One possible alternative to overloading the existing methods is to split SinkTask::open into openOriginal (or possibly openPhysical or openPreTransform) and openTransformed (or openLogical or openPostTransform), with a similar change for SinkTask::close. The default implementation for SinkTask::openOriginal can be to call SinkTask::open, and the same can go for SinkTask::close. However, I prefer overloading the existing methods since this alternative increases complexity and none of the names are very informative.
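For reference, the overloading option preferred in the quoted message above could look roughly like the sketch below. It is only an illustration under stated assumptions: `SinkTask` here is a simplified stand-in, not the real `org.apache.kafka.connect.sink.SinkTask`, `TopicPartition` is a local record, and `LegacyTask` / `TransformAwareTask` are hypothetical task classes. The two-arg signature follows the `open(originalPartitions, transformedPartitions)` shape discussed in this thread, with the default falling back to the single-arg variant using only the original partitions.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class OverloadedOpenSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
    record TopicPartition(String topic, int partition) {}

    // Simplified stand-in for the sink task base class.
    abstract static class SinkTask {
        // Existing single-arg variants: no-ops by default.
        void open(Collection<TopicPartition> partitions) {}

        void close(Collection<TopicPartition> partitions) {}

        // Proposed overloads: by default, fall back to the existing methods
        // with just the original (pre-transform) topic partitions.
        void open(Collection<TopicPartition> originalPartitions,
                  Collection<TopicPartition> transformedPartitions) {
            open(originalPartitions);
        }

        void close(Collection<TopicPartition> originalPartitions,
                   Collection<TopicPartition> transformedPartitions) {
            close(originalPartitions);
        }
    }

    // An existing task that only knows about the single-arg methods keeps its behavior.
    static final class LegacyTask extends SinkTask {
        final Set<TopicPartition> opened = new HashSet<>();

        @Override
        void open(Collection<TopicPartition> partitions) {
            opened.addAll(partitions);
        }

        @Override
        void close(Collection<TopicPartition> partitions) {
            opened.removeAll(partitions);
        }
    }

    // A task that opts in to the new overloads and allocates per transformed partition.
    static final class TransformAwareTask extends SinkTask {
        final Set<TopicPartition> opened = new HashSet<>();

        @Override
        void open(Collection<TopicPartition> originalPartitions,
                  Collection<TopicPartition> transformedPartitions) {
            opened.addAll(transformedPartitions);
        }

        @Override
        void close(Collection<TopicPartition> originalPartitions,
                   Collection<TopicPartition> transformedPartitions) {
            opened.removeAll(transformedPartitions);
        }
    }
}
```

The runtime would only ever call the two-arg variants, so tasks that have not been updated still see exactly the pre-transform partitions they see today.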
> > > >
> > > > BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions. We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example. The distinction I'd draw is that the assumptions we make can and probably should favor some cases in terms of performance (i.e., reducing the number of unnecessary calls to close/open over a given sink task's lifetime), but should not lead to guaranteed resource leaks or failure to obey API contract in any cases.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > > especially if connectors are intentionally designed around original topic partitions instead of transformed ones.
> > > > >
> > > > > Ha, that's a good point and reminds me of Hyrum's Law [1] :)
> > > > >
> > > > > > I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet
> > > > >
> > > > > I can't think of a better way to do this either; would invoking the existing `SinkTask::open` and `SinkTask::close` methods with post-transform topic partitions instead of pre-transform topic partitions not be acceptable even in a minor / major AK release? I feel like the proposed approach of adding overloaded `SinkTask::open` / `SinkTask::close` methods to differentiate between pre-transform and post-transform topic partitions has similar pitfalls to the idea of the overloaded `SinkTask::put` method we discarded earlier.
> > > > >
> > > > > > Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.
> > > > >
> > > > > Thanks, that's a good idea. Given the flexibility of SMTs, the framework can't really make any assumptions around topic partitions post transformation nor does it have any way to definitively get any such information from transformations which is why the idea of a cache with an eviction policy makes perfect sense!
> > > > >
> > > > > [1] - https://www.hyrumslaw.com/
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > >
> > > > > > Hi Yash,
> > > > > >
> > > > > > > So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition mutating SMTs even if they don't do any asynchronous processing?
> > > > > >
> > > > > > Yep, exactly 👍
> > > > > >
> > > > > > > What do you think about retaining just the existing methods but changing when they're called in the Connect runtime?
> > > > > > > For instance, instead of calling SinkTask::open after partition assignment post a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course).
> > > > > >
> > > > > > IMO the issue here is that it's a drastic change in behavior to start invoking SinkTask::open and SinkTask::close with post-transform topic partitions instead of pre-transform, especially if connectors are intentionally designed around original topic partitions instead of transformed ones. I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet. Interested to hear your thoughts.
> > > > > >
> > > > > > Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.
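As a rough illustration of the cache-and-eviction idea quoted above, the sketch below tracks when each post-transform topic partition was last seen, reports previously unseen partitions (which the runtime would pass to SinkTask::open before SinkTask::put), and evicts partitions that have been idle longer than a timeout (which the runtime would pass to SinkTask::close). The time-based policy is just one possible strategy; the class name `TransformedPartitionCache`, its methods, and the local `TopicPartition` record are all hypothetical and not part of the Connect runtime.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class TransformedPartitionCache {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
    record TopicPartition(String topic, int partition) {}

    // Last time (in milliseconds) each post-transform partition appeared in a batch.
    private final Map<TopicPartition, Long> lastSeenMs = new HashMap<>();
    private final long idleTimeoutMs;

    TransformedPartitionCache(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Call before each put: refreshes timestamps and returns the partitions that
    // were not in the cache, i.e. the ones that should be opened first.
    List<TopicPartition> markSeen(Collection<TopicPartition> batchPartitions, long nowMs) {
        List<TopicPartition> unseen = new ArrayList<>();
        for (TopicPartition tp : batchPartitions) {
            if (lastSeenMs.put(tp, nowMs) == null) {
                unseen.add(tp);
            }
        }
        return unseen;
    }

    // Removes and returns the partitions idle for longer than the timeout,
    // i.e. the ones that should be closed on a best-effort basis.
    List<TopicPartition> evictIdle(long nowMs) {
        List<TopicPartition> evicted = new ArrayList<>();
        Iterator<Map.Entry<TopicPartition, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}
```

An evicted partition that shows up again later is simply reported as unseen and reopened, so a too-eager eviction costs an extra close/open pair rather than a leak.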
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > 1) That's a fair point; while I did scan everything publicly available on GitHub, you're right in that it won't cover all possible SMTs that are out there. Thanks for the example use-case as well, I've updated the KIP to add the two new proposed methods.
> > > > > > >
> > > > > > > 2) So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition mutating SMTs even if they don't do any asynchronous processing? Since they could encounter records in SinkTask::put with topics/partitions that they might not have created writers for. Thanks for pointing this out, it's definitely another incompatibility that needs to be called out and fixed. The overloaded method approach is interesting, but comes with the caveat of yet more new methods that will need to be implemented by existing connectors if they want to make use of this new functionality. What do you think about retaining just the existing methods but changing when they're called in the Connect runtime?
> > > > > > > For instance, instead of calling SinkTask::open after partition assignment post a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course). I don't think this would break the existing contract with sink tasks where SinkTask::open is expected to be called for a topic partition before any records from the topic partition are sent via SinkTask::put? The SinkTask::close case is a lot trickier however, and would require some sort of cache eviction policy that would be deemed appropriate as you pointed out too.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yash
> > > > > > >
> > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > I've had some time to think on this KIP and I think I'm in agreement about not blocking it on an official compatibility library or adding the "ack" API for sink records.
> > > > > > > >
> > > > > > > > I only have two more thoughts:
> > > > > > > >
> > > > > > > > 1. Because it is possible to manipulate sink record partitions and offsets with the current API we provide for transformations, I still believe methods should be added to the SinkRecord class to expose the original partition and offset, not just the original topic.
> > > > > > > > The additional cognitive burden from these two methods is going to be minimal anyways; once users understand the difference between the transformed topic name and the original one, it's going to be trivial for them to understand how that same difference applies for partitions and offsets. It's not enough to scan the set of SMTs provided out of the box with Connect, ones developed by Confluent, or even everything available on GitHub, since there may be closed-source projects out there that rely on this ability. One potential use case could be re-routing partitions between Kafka and some other sharded system.
> > > > > > > >
> > > > > > > > 2. We still have to address the SinkTask::open [1] and SinkTask::close [2] methods. If a connector writes to the external system using the transformed topic partitions it reads from Kafka, then it's possible for the connector to lazily instantiate writers for topic partitions as it encounters them from records provided in SinkTask::put. However, connectors also need a way to de-allocate those writers (and the resources used by them) over time, which they can't do as easily. One possible approach here is to overload SinkTask::open and SinkTask::close with variants that distinguish between transformed and original topic partitions, and default to invoking the existing methods with just the original topic partitions.
> > > > > > > > We would then have several options for how the Connect runtime can invoke these methods, but in general, an approach that guarantees that tasks are notified of transformed topic partitions in SinkTask::open before any records for that partition are given to it in SinkTask::put, and makes a best-effort attempt to close transformed topic partitions that appear to no longer be in use based on some eviction policy, would probably be sufficient.
> > > > > > > >
> > > > > > > > [1] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > > > > > > > [2] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > Thanks a lot for your inputs!
> > > > > > > > >
> > > > > > > > > > would provide a simple, clean interface for developers to determine which features are supported by the version of the Connect runtime that their plugin has been deployed onto
> > > > > > > > >
> > > > > > > > > I do like the idea of having such a public compatibility library - I think it would remove a lot of restrictions from framework development if it were to be widely adopted.
> > > > > > > > >
> > > > > > > > > > we might consider adding an API to "ack" sink records
> > > > > > > > >
> > > > > > > > > I agree that this does seem like a more intuitive and clean API, but I'm concerned about the backward compatibility headache we'd be imposing on all existing sink connectors. Connector developers will have to maintain two separate ways of doing offset management if they want to use the new API but continue supporting older versions of Kafka Connect.
> > > > > > > > >
> > > > > > > > > For now, I've reverted the KIP to the previous iteration which proposed the addition of a new `SinkRecord` method to obtain the original Kafka topic pre-transformation. One thing to note is that I've removed the method for obtaining the original Kafka partition after a cursory search showed that use cases for partition modifying SMTs are primarily on the source connector side.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yash
> > > > > > > > >
> > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > I have more comments I'd like to make on this KIP when I have time (sorry for the delay, Yash, and thanks for your patience!), but I did want to chime in and say that I'm also not sure about overloading SinkTask::put. I share the concerns about creating an intuitive, simple API that Yash has raised. In addition, this approach doesn't seem very sustainable--what do we do if we encounter another case in the future that would warrant a similar solution? We probably don't want to create three, four, etc. overloaded variants of the method, each of which would have to be implemented by connector developers who want to both leverage the latest and greatest connector APIs and maintain compatibility with Connect clusters running older versions.
> > > > > > > > > >
> > > > > > > > > > I haven't been able to flesh this out into a design worth publishing in its own KIP yet, but one alternative I've pitched to a few people with generally positive interest has been to develop an official compatibility library for Connect developers.
> > > > > > > > > > This library would be released as its own Maven artifact (separate from connect-api, connect-runtime, etc.) and would provide a simple, clean interface for developers to determine which features are supported by the version of the Connect runtime that their plugin has been deployed onto. Under the hood, this library might use reflection to determine whether classes, methods, etc. are available, but the developer wouldn't have to do anything more than check (for example) `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point in the lifetime of their connector/task whether that feature is provided by the runtime.
> > > > > > > > > >
> > > > > > > > > > One other high-level comment: this doesn't address every case, but we might consider adding an API to "ack" sink records. This could use the SubmittedRecords class [1] (with some slight tweaks) under the hood to track the latest-acked offset for each topic partition.
> > > > > > > > > > This way, connector developers won't be responsible for tracking offsets at all in their sink tasks (eliminating issues with the accuracy of post-transformation T/P/O sink record information), and they'll only have to notify the Connect framework when a record has been successfully dispatched to the external system. This provides a cleaner, friendlier API, and also enables more fine-grained metrics like the ones proposed in KIP-767 [2].
> > > > > > > > > >
> > > > > > > > > > [1] - https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > > > > > > > [2] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Randall,
> > > > > > > > > > >
> > > > > > > > > > > It's been a while for this one but the more I think about it, the more I feel like the current approach with a new overloaded `SinkTask::put` method might not be optimal.
> > > > > > > > > > > We're trying to fix a pretty corner case bug here (usage of topic mutating SMTs with sink connectors that do their own offset tracking) and I'm not sure that warrants a change to such a central interface method. The new `SinkTask::put` method just seems somewhat odd and it may not be very understandable for a new reader - I don't think this should be the case for a public interface method. Furthermore, even with elaborate documentation in place, I'm not sure if it'll be very obvious to most people what the purpose of having these two `put` methods is and how they should be used by sink task implementations. What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks a lot for your valuable feedback so far! I've updated the KIP based on our discussion above. Could you please take another look?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yash
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for elaborating. I think these are all very good points and I see why the overloaded `SinkTask::put` method is a cleaner solution overall.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think this should be
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > instead because the sink records themselves have the updated topic partitions (i.e. after all transformations have been applied) and the KIP is proposing a way for the tasks to be able to access the original topic partition (i.e. before transformations have been applied).
> > > > > > > > > > > > >
> > > > > > > > > > > > > Sounds good.

> > Of course, if the developer does not need separate methods, they can easily have the older `put` method simply delegate to the newer method.
>
> If the developer does not need separate methods (i.e. they don't need to use this new addition), they can simply continue implementing just the older `put` method, right?

Correct. We should update the JavaDoc of both methods to make this clear, and in general to explain how the two methods are used and should be implemented. That can be part of the PR, and the KIP doesn't need this wording.

> > Finally, this gives us a roadmap for *eventually* deprecating the older method, once the Connect runtime versions without this change are old enough.
>
> I'm not sure we'd ever want to deprecate the older method.
> Most common sink connector implementations do not do their own offset tracking with asynchronous processing and will probably never have a need for the additional parameter `Map<SinkRecord, TopicPartition> originalTopicPartitions` in the proposed new `put` method. These connectors can continue implementing only the existing `SinkTask::put` method which will be called by the default implementation of the newer overloaded `put` method.

+1

> > the pre-commit methods use the same `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.
>
> The data structure you're suggesting be used is a `Map<SinkRecord, TopicPartition>` which will map `SinkRecord` objects to the original topic partition of the corresponding `ConsumerRecord` right? To clarify, this is a new data structure that will need to be managed in the `WorkerSinkTask`.
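
To make the delegation discussed in this exchange concrete, here is a minimal, self-contained Java sketch. It is not the Connect API: `TopicPartition`, `SinkRecord`, `SketchSinkTask` and `OffsetTrackingTask` are simplified stand-ins defined in the block itself so that it compiles without Kafka on the classpath, and the lookup with a fallback is only one assumed way a task might treat a missing map entry as "topic and partition unchanged".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Kafka Connect types, so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long offset) {}

abstract class SketchSinkTask {
    // Existing method: the only one an older runtime knows how to call.
    abstract void put(Collection<SinkRecord> records);

    // Proposed overload: by default it ignores the map and delegates to the
    // existing method, so tasks that never override it behave as before.
    void put(Collection<SinkRecord> records,
             Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        put(records);
    }
}

// A task that does its own offset tracking and therefore overrides both methods.
class OffsetTrackingTask extends SketchSinkTask {
    // Pre-transform topic partitions observed so far, in arrival order.
    final List<TopicPartition> seen = new ArrayList<>();

    @Override
    void put(Collection<SinkRecord> records) {
        // Called by an older runtime: no pre-transform information is available.
        put(records, Map.of());
    }

    @Override
    void put(Collection<SinkRecord> records,
             Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        for (SinkRecord r : records) {
            // A missing entry is treated as "topic and partition unchanged".
            TopicPartition fallback = new TopicPartition(r.topic(), r.partition());
            seen.add(originalTopicPartitions.getOrDefault(r, fallback));
        }
    }
}
```

A task that overrides only the one-argument method keeps working under this shape, because the default two-argument body simply calls it.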

Ah, you're right. Thanks for the correction.

Best regards,
Randall

> Thanks,
> Yash

On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:

Hi, Yash.

> I'm not sure I quite understand why it would be "easier" for connector developers to account for implementing two different overloaded `put` methods (assuming that they want to use this new feature) versus using a try-catch block around `SinkRecord` access methods?

Using a try-catch around an API method that *might* be there is a very unusual thing for most developers. Unfortunately, we've had to resort to this atypical approach with Connect in places when there was no good alternative.
We seem to be relying upon this pattern because it's easier for us, not because it offers a better experience for Connector developers. IMO, if there's a practical alternative that uses normal development practices and techniques, then we should use that alternative. IIUC, there is at least one practical alternative for this KIP that would not require developers to use the unusual try-catch to handle the case where methods are not found.

I also think having two `put` methods is easier when the Connector has to do different things for different Connect runtimes, too. One of those methods is called by newer Connect runtimes with the new behavior, and the other method is called by an older Connect runtime. Of course, if the developer does not need separate methods, they can easily have the older `put` method simply delegate to the newer method.
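
For comparison, the try-catch approach being contrasted here might look roughly like the following sketch. `RuntimeCompat` and `callOrFallback` are hypothetical names introduced only for illustration. The one piece of real behavior assumed is that calling a method absent from an older runtime's classes fails at the call site with `NoSuchMethodError` (or `NoClassDefFoundError` when a whole type is missing).

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Connect.
class RuntimeCompat {
    // Invokes an accessor that may not exist on an older runtime. If the JVM
    // reports the method or class as missing, the supplied fallback is returned.
    static <T> T callOrFallback(Supplier<T> newerAccessor, T fallback) {
        try {
            return newerAccessor.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: the newer accessor is not available.
            return fallback;
        }
    }
}
```

Every call site that touches a newer accessor needs this kind of guard, which is the per-call overhead that the two-method approach avoids.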

Finally, this gives us a roadmap for *eventually* deprecating the older method, once the Connect runtime versions without this change are old enough.

> I think the advantage of going with the proposed approach in the KIP is that it wouldn't require extra book-keeping (the Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in your proposed approach)

The connector does have to do some of this bookkeeping in how they track the topic partition offsets used in the `preCommit`, and the pre-commit methods use the same `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.

I hope that helps.

Best regards,

Randall

On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Randall,

Thanks for reviewing the KIP!

> That latter logic can get quite ugly.

I'm not sure I quite understand why it would be "easier" for connector developers to account for implementing two different overloaded `put` methods (assuming that they want to use this new feature) versus using a try-catch block around `SinkRecord` access methods? In both cases, a connector developer would need to write additional code in order to ensure that their connector continues working with older Connect runtimes. Furthermore, we would probably need to carefully document what the implementation of the older `put` method should look like for connectors that want to use this new feature.
I think the > > > > > advantage > > > > > > > of > > > > > > > > > > going > > > > > > > > > > > >> with > > > > > > > > > > > >> > > the > > > > > > > > > > > >> > > > proposed approach in the KIP is that it > wouldn't > > > > > require > > > > > > > > extra > > > > > > > > > > > >> > > book-keeping > > > > > > > > > > > >> > > > (the Map<SinkRecord, > > > > > > > > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your > > > proposed > > > > > > > > approach) > > > > > > > > > > and > > > > > > > > > > > >> also > > > > > > > > > > > >> > > the > > > > > > > > > > > >> > > > fact that the try-catch based logic is an > > already > > > > > > > > established > > > > > > > > > > > >> pattern > > > > > > > > > > > >> > > > through > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors > > > > > > > > > > > >> > > > and other KIPs which added methods to > > source/sink > > > > > > > > > connector/task > > > > > > > > > > > >> > > contexts. > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > Let me know if you still feel that having a > new > > > > > > overloaded > > > > > > > > put > > > > > > > > > > > >> method > > > > > > > > > > > >> > is > > > > > > > > > > > >> > > a > > > > > > > > > > > >> > > > cleaner solution and I'd be happy to > reconsider! > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > Thanks, > > > > > > > > > > > >> > > > Yash > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall > Hauch < > > > > > > > > > > rha...@gmail.com> > > > > > > > > > > > >> > wrote: > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > Hi, Yash. 
Thanks for picking up this KIP and discussion.

The KIP includes this rejected alternative:

> 4. Update SinkTask.put in any way to pass the new information outside SinkRecord (e.g. a Map or a derived class)
>    - Much more disruptive change without considerable pros

One advantage about doing this is that sink connector implementations can more easily implement two different "put(...)" methods to handle running in a variety of runtimes, without having to use try-catch logic around the newer SinkRecord access methods. That latter logic can get quite ugly.

For example, the existing `put` method has this signature:

    public abstract void put(Collection<SinkRecord> records);

If we added an overloaded method that passed in a map of the old topic+partition for each record (and defined the absence of an entry as having an unchanged topic and partition):

    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
        put(records);
    }

then a `SinkTask` implementation that wants to use this new feature could simply implement both methods:

    public void put(Collection<SinkRecord> records) {
        // Running in an older runtime, so no tracking of SMT-modified topic names or partitions
        put(records, Map.of());
    }

    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
        // real logic here
    }

This seems a lot easier than having to use try-catch logic, yet still allows sink connectors to utilize the new functionality and still work with older Connect runtimes.

WDYT?

Randall

On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi all,

I would like to (re)start a new discussion thread on KIP-793 (Kafka Connect) which proposes some additions to the public SinkRecord interface in order to support topic mutating SMTs for sink connectors that do their own offset tracking.

Links:

KIP:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830

Older discussion thread:
https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj

Jira: https://issues.apache.org/jira/browse/KAFKA-13431

Thanks,
Yash