Hi Yash,

We'll probably want to make a few tweaks to the Javadocs for the new methods (I'm imagining that notes on compatibility with older versions will be required), but I believe what's proposed in the KIP is good enough to approve with the understanding that it may not exactly match what gets implemented/merged.
LGTM, thanks again for the KIP!

Cheers,
Chris

On Tue, Feb 21, 2023 at 12:18 PM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant

Thanks for the explanation and I agree that this will be a tad bit too convoluted. :)

Please do let me know if you'd like any further amendments to the KIP!

Thanks,
Yash

On Tue, Feb 21, 2023 at 8:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I think the use case for pre-transform TPO coordinates (and topic partition writers created/destroyed in close/open) tends to boil down to exactly-once semantics, where it's desirable to preserve the guarantees that Kafka provides (every record has a unique TPO trio, and records are ordered by offset within a topic partition).

It's my understanding that this approach is utilized in several connectors out there today, and it might break these connectors to start using the post-transform topic partitions automatically in their open/close methods.

If we want to get really fancy with this and try to obviate or at least reduce the need for per-connector code changes, we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant. But I think this is going a bit too far and would prefer to keep things simple(r) for now.
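The following minimal Java sketch (not from the KIP) makes the exactly-once argument above concrete. It shows a sink writer that deduplicates redelivered records using their original (pre-transform) coordinates. `OriginalTp`, `Rec`, and `IdempotentWriter` are hypothetical stand-ins, not Connect classes. The in-memory map stands in for offsets that a real connector would persist in the external system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a pre-transform topic partition (not Kafka's TopicPartition).
record OriginalTp(String topic, int partition) { }

// Hypothetical stand-in for a sink record carrying both its transformed topic
// and its original (pre-transform) Kafka coordinates.
record Rec(String transformedTopic, String originalTopic, int originalPartition,
           long originalOffset, String value) { }

// Skips redelivered records by relying on the guarantees Kafka gives for the
// ORIGINAL coordinates: each record has a unique (topic, partition, offset) trio,
// and offsets only increase within a partition. Transformed coordinates lack these
// guarantees, since an SMT may merge several source partitions into one.
class IdempotentWriter {
    // In-memory only; a real connector would store these offsets externally.
    private final Map<OriginalTp, Long> lastWritten = new HashMap<>();
    private int writes = 0;

    /** Returns true if the record was written, false if it was a redelivered duplicate. */
    boolean write(Rec rec) {
        OriginalTp tp = new OriginalTp(rec.originalTopic(), rec.originalPartition());
        Long last = lastWritten.get(tp);
        if (last != null && rec.originalOffset() <= last) {
            return false;
        }
        // A real connector would write rec.value() to the external system here,
        // ideally storing the offset atomically alongside the data.
        lastWritten.put(tp, rec.originalOffset());
        writes++;
        return true;
    }

    int writes() {
        return writes;
    }
}
```

Suppose an SMT routed both `orders-eu` and `orders-us` to the same transformed topic. If the writer keyed on the transformed topic instead, it would wrongly drop the `orders-us` record at offset 5 once the `orders-eu` record at offset 5 had been written.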
Cheers,
Chris

On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`

Ah okay, this does make a lot more sense. Sorry, I think I misunderstood you earlier. I do agree with you that this seems better than splitting it off into two new sets of open / close methods from a complexity standpoint.

> Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?

On thinking about this a bit more, I'm not so convinced that we need to expose the pre-transform / original topic partitions in the new open / close methods. The purpose of the open / close methods is to allow sink tasks to allocate and deallocate resources for each topic partition assigned to the task, and the purpose of topic-mutating SMTs is to essentially modify the source topic name from the point of view of the sink connector. Why would a sink connector ever need to or want to allocate resources for pre-transform topic partitions? Is the argument here that since we'll be exposing both the pre-transform and post-transform topic partitions per record, we should also expose the same info via open / close and allow sink connector implementations to disregard topic-mutating SMTs completely if they wanted to?

Either way, I've gone ahead and updated the KIP to reflect all of our previous discussion here since it had become quite outdated.
I've also updated the KIP title from "Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)" to "Allow sink connectors to be used with topic-mutating SMTs" since the improvements to the open / close mechanism don't pertain only to asynchronous sink connectors. The new KIP URL is:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs

Thanks,
Yash

On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`, since we already convert and transform each batch of records that we poll from the sink task's consumer en masse, meaning we could discover several new transformed partitions in between consecutive calls to SinkTask::put.

It's also worth noting that we'll probably want to deprecate the existing open/close methods, at which point keeping one non-deprecated variant of each seems more appealing and less complex than keeping two.

Honestly though, I think we're both on the same page enough that I wouldn't object to either approach. We've probably reached the saturation point for ROI here, and as long as we provide developers a way to get the information they need from the runtime and take care to add Javadocs and update our docs page (possibly including the connector development quickstart), it should be fine.
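Here is a minimal Java sketch of the two-arg variant described above. It assumes the new method defaults to the existing single-arg method, passing the original partitions, so that unmodified tasks keep their current behavior. `Tp` and `SinkTaskSketch` are hypothetical stand-ins for `TopicPartition` and `SinkTask`, and the signatures that eventually get merged may differ.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record Tp(String topic, int partition) { }

// Hypothetical stand-in for SinkTask, showing only open/close. The two-arg
// variants fall back to the single-arg methods with the ORIGINAL partitions.
abstract class SinkTaskSketch {
    public void open(Collection<Tp> partitions) { }

    public void close(Collection<Tp> partitions) { }

    public void open(Collection<Tp> originalPartitions, Collection<Tp> transformedPartitions) {
        open(originalPartitions);
    }

    public void close(Collection<Tp> originalPartitions, Collection<Tp> transformedPartitions) {
        close(originalPartitions);
    }
}

// An existing task that only overrides the single-arg method.
class LegacyTask extends SinkTaskSketch {
    final List<Tp> opened = new ArrayList<>();

    @Override
    public void open(Collection<Tp> partitions) {
        opened.addAll(partitions);
    }
}

// A newer task that allocates one writer per transformed partition instead.
class TransformAwareTask extends SinkTaskSketch {
    final List<Tp> writers = new ArrayList<>();

    @Override
    public void open(Collection<Tp> originalPartitions, Collection<Tp> transformedPartitions) {
        writers.addAll(transformedPartitions);
    }
}
```

Because of this fallback, existing connectors keep opening resources per original partition. Tasks that care about transformed partitions opt in by overriding the two-arg variant.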
At this point, it might be worth updating the KIP based on recent discussion so that others can see the latest proposal, and we can both take a look and make sure everything looks good enough before opening a vote thread.

Finally, I think you make a convincing case for a time-based eviction policy. I wasn't thinking about the fairly common SMT pattern of deriving a topic name from, e.g., a record field or header.

Cheers,
Chris

On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?

Thanks, this makes sense. Given that the KIP already proposes a way for sink connector implementations to distinguish between pre-transform and post-transform topics per record, I think I'm convinced that going with new `open()` / `close()` methods is the right approach. However, I still feel like having overloaded methods will make it a lot less intuitive, given that the two sets of methods would be different in terms of when they're called and what arguments they are passed (also I'm presuming that the overloaded methods you're prescribing will only have a single `TopicPartition` rather than a `Collection<TopicPartition>` as their parameters). I guess my concern is largely around the fact that it won't be possible to distinguish between the overloaded methods' use cases just from the method signatures.
I agree that naming is going to be difficult here, but I think that having two sets of `SinkTask::openXyz` / `SinkTask::closeXyz` methods will be less complicated to understand from a connector developer perspective (as compared to overloaded methods with only differing documentation). Of your suggested options, I think `openPreTransform` / `openPostTransform` are the most comprehensible ones.

> BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions.

I meant that the framework wouldn't be able to deterministically know when to close a post-transform topic partition, given that SMTs could use per-record data / metadata to manipulate the topic names as and how required (which supports the suggestion to use an eviction-policy-based mechanism to call SinkTask::close for post-transform topic partitions).

> We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example.

Wouldn't this be making the assumption that SMTs only use the topic name itself and no other data / metadata while computing the new topic name? Are you suggesting that since this assumption could work for a majority of SMTs, it might be more efficient overall in terms of reducing the number of "false-positive" calls to `SinkTask::closePostTransform` (and we'll also be able to call `SinkTask::closePostTransform` immediately after topic partitions are revoked from the consumer)?
I was thinking of something more generic along the lines of a simple time-based eviction policy that wouldn't be making any assumptions regarding the SMT implementations. Either way, I do like your earlier suggestion of keeping this logic internal and not painting ourselves into a corner by promising any particular behavior in the KIP.

Thanks,
Yash

On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I think the key difference between adding methods/overloads related to SinkTask::open/SinkTask::close and SinkTask::put is that this isn't auxiliary information that may or may not be useful to connector developers. It's actually critical for them to understand the difference between the two concepts here, even if they look very similar. And yes, I do believe that switching from pre-transform to post-transform topic partitions is too big a change in behavior here. Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?

One possible alternative to overloading the existing methods is to split SinkTask::open into openOriginal (or possibly openPhysical or openPreTransform) and openTransformed (or openLogical or openPostTransform), with a similar change for SinkTask::close. The default implementation for SinkTask::openOriginal can be to call SinkTask::open, and the same can go for SinkTask::close.
However, I prefer overloading the existing methods since this alternative increases complexity and none of the names are very informative.

BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions. We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example. The distinction I'd draw is that the assumptions we make can and probably should favor some cases in terms of performance (i.e., reducing the number of unnecessary calls to close/open over a given sink task's lifetime), but should not lead to guaranteed resource leaks or failure to obey the API contract in any case.

Cheers,
Chris

On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> especially if connectors are intentionally designed around original topic partitions instead of transformed ones.

Ha, that's a good point and reminds me of Hyrum's Law [1] :)

> I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet

I can't think of a better way to do this either; would invoking the existing `SinkTask::open` and `SinkTask::close` methods with post-transform topic partitions instead of pre-transform topic partitions not be acceptable even in a minor / major AK release?
I feel like the proposed approach of adding overloaded `SinkTask::open` / `SinkTask::close` methods to differentiate between pre-transform and post-transform topic partitions has similar pitfalls to the idea of the overloaded `SinkTask::put` method we discarded earlier.

> Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.

Thanks, that's a good idea. Given the flexibility of SMTs, the framework can't really make any assumptions around topic partitions post-transformation, nor does it have any way to definitively get any such information from transformations, which is why the idea of a cache with an eviction policy makes perfect sense!
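Here is one way to read the cache-with-eviction idea, sketched in Java under the assumption of the simple time-based policy discussed in this thread. The runtime notes when each post-transform partition was last seen in a batch, opens it on first sight, and closes it once it has been idle past a timeout. `IdleEvictionCache` is hypothetical, not Connect's internal logic. Partitions are plain strings such as `orders-0`, and time is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical runtime-side cache of post-transform topic partitions.
class IdleEvictionCache {
    private final long idleTimeoutMs;
    // Post-transform partition -> time (ms) it was last seen in a batch for put().
    private final Map<String, Long> lastSeen = new HashMap<>();

    IdleEvictionCache(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records a sighting; returns true if the partition is new and must be open()ed first. */
    boolean touch(String transformedTp, long nowMs) {
        return lastSeen.put(transformedTp, nowMs) == null;
    }

    /** Removes and returns partitions idle for longer than the timeout, to be close()d. */
    List<String> evictIdle(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}
```

If a partition is evicted too early, `touch` simply reports it as new the next time it appears. A premature eviction therefore costs an extra close/open pair. It does not cause a resource leak or break the open-before-put guarantee.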
[1] - https://www.hyrumslaw.com/

Thanks,
Yash

On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

> So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition mutating SMTs even if they don't do any asynchronous processing?

Yep, exactly 👍

> What do you think about retaining just the existing methods but changing when they're called in the Connect runtime? For instance, instead of calling SinkTask::open after partition assignment post a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course).

IMO the issue here is that it's a drastic change in behavior to start invoking SinkTask::open and SinkTask::close with post-transform topic partitions instead of pre-transform, especially if connectors are intentionally designed around original topic partitions instead of transformed ones.
I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet. Interested to hear your thoughts.

Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.

Cheers,
Chris

On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

Thanks for the feedback.

1) That's a fair point; while I did scan everything publicly available on GitHub, you're right in that it won't cover all possible SMTs that are out there. Thanks for the example use case as well; I've updated the KIP to add the two new proposed methods.
2) So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition mutating SMTs even if they don't do any asynchronous processing, since they could encounter records in SinkTask::put with topics/partitions that they might not have created writers for? Thanks for pointing this out; it's definitely another incompatibility that needs to be called out and fixed. The overloaded method approach is interesting, but comes with the caveat of yet more new methods that will need to be implemented by existing connectors if they want to make use of this new functionality. What do you think about retaining just the existing methods but changing when they're called in the Connect runtime? For instance, instead of calling SinkTask::open after partition assignment post a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course).
I > don't > > > > think > > > > > > > this > > > > > > > > > would break the existing contract with sink tasks where > > > > > > SinkTask::open > > > > > > > is > > > > > > > > > expected to be called for a topic partition before any > > records > > > > from > > > > > > the > > > > > > > > > topic partition are sent via SinkTask::put? The > > SinkTask::close > > > > > case > > > > > > > is a > > > > > > > > > lot trickier however, and would require some sort of cache > > > > eviction > > > > > > > > policy > > > > > > > > > that would be deemed appropriate as you pointed out too. > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > > > > > > > > > I've had some time to think on this KIP and I think I'm > in > > > > > > agreement > > > > > > > > > about > > > > > > > > > > not blocking it on an official compatibility library or > > > adding > > > > > the > > > > > > > > "ack" > > > > > > > > > > API for sink records. > > > > > > > > > > > > > > > > > > > > I only have two more thoughts: > > > > > > > > > > > > > > > > > > > > 1. Because it is possible to manipulate sink record > > > partitions > > > > > and > > > > > > > > > offsets > > > > > > > > > > with the current API we provide for transformations, I > > still > > > > > > believe > > > > > > > > > > methods should be added to the SinkRecord class to expose > > the > > > > > > > original > > > > > > > > > > partition and offset, not just the original topic. 
The additional cognitive burden from these two methods is going to be minimal anyways; once users understand the difference between the transformed topic name and the original one, it's going to be trivial for them to understand how that same difference applies for partitions and offsets. It's not enough to scan the set of SMTs provided out of the box with Connect, ones developed by Confluent, or even everything available on GitHub, since there may be closed-source projects out there that rely on this ability. One potential use case could be re-routing partitions between Kafka and some other sharded system.

2. We still have to address the SinkTask::open [1] and SinkTask::close [2] methods. If a connector writes to the external system using the transformed topic partitions it reads from Kafka, then it's possible for the connector to lazily instantiate writers for topic partitions as it encounters them from records provided in SinkTask::put. However, connectors also need a way to de-allocate those writers (and the resources used by them) over time, which they can't do as easily.
One possible approach here is to overload SinkTask::open and SinkTask::close with variants that distinguish between transformed and original topic partitions, and default to invoking the existing methods with just the original topic partitions. We would then have several options for how the Connect runtime can invoke these methods. In general, though, an approach that guarantees that tasks are notified of transformed topic partitions in SinkTask::open before any records for that partition are given to it in SinkTask::put, and makes a best-effort attempt to close transformed topic partitions that appear to no longer be in use based on some eviction policy, would probably be sufficient.
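Here is a hypothetical Java sketch of the runtime bookkeeping described in point 2. Every transformed partition is reported for `open` before the first batch containing it reaches `put`. After a rebalance, transformed partitions that are no longer fed by any still-assigned original partition are reported for `close`. This is roughly the "deterministic mapping" style of policy mentioned elsewhere in the thread. `Routed` and `PartitionLifecycle` are illustrative names, not Connect internals, and partitions are plain `topic-partition` strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pairing of a record's original and transformed topic partition.
record Routed(String originalTp, String transformedTp) { }

// Hypothetical runtime-side bookkeeping for open()/close() of transformed partitions.
class PartitionLifecycle {
    // Original partition -> transformed partitions its records have been routed to.
    private final Map<String, Set<String>> routes = new HashMap<>();
    // Transformed partitions the task has already been told about via open().
    private final Set<String> opened = new HashSet<>();

    /** Returns transformed partitions to pass to open() before this batch goes to put(). */
    List<String> beforePut(List<Routed> batch) {
        List<String> toOpen = new ArrayList<>();
        for (Routed r : batch) {
            routes.computeIfAbsent(r.originalTp(), k -> new HashSet<>()).add(r.transformedTp());
            if (opened.add(r.transformedTp())) {
                toOpen.add(r.transformedTp());
            }
        }
        return toOpen;
    }

    /** Returns transformed partitions to pass to close() once originals are revoked. */
    List<String> onRevoked(Collection<String> revokedOriginals) {
        for (String tp : revokedOriginals) {
            routes.remove(tp);
        }
        Set<String> stillFed = new HashSet<>();
        for (Set<String> targets : routes.values()) {
            stillFed.addAll(targets);
        }
        List<String> toClose = new ArrayList<>();
        for (String tp : opened) {
            if (!stillFed.contains(tp)) {
                toClose.add(tp);
            }
        }
        opened.removeAll(toClose);
        return toClose;
    }
}
```

On its own, this policy never closes a transformed partition while its original partitions stay assigned. With field- or header-based routing, it therefore would not bound resource usage. That gap is what a time-based policy, such as the one discussed above, is meant to cover.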
[1] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
[2] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)

Cheers,
Chris

On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

Thanks a lot for your inputs!

> would provide a simple, clean interface for developers to determine which features are supported by the version of the Connect runtime that their plugin has been deployed onto

I do like the idea of having such a public compatibility library. I think it would remove a lot of restrictions from framework development if it were to be widely adopted.
> we might consider adding an API to "ack" sink records

I agree that this does seem like a more intuitive and clean API, but I'm concerned about the backward compatibility headache we'd be imposing on all existing sink connectors. Connector developers will have to maintain two separate ways of doing offset management if they want to use the new API but continue supporting older versions of Kafka Connect.

For now, I've reverted the KIP to the previous iteration, which proposed the addition of a new `SinkRecord` method to obtain the original Kafka topic pre-transformation. One thing to note is that I've removed the method for obtaining the original Kafka partition after a cursory search showed that use cases for partition-modifying SMTs are primarily on the source connector side.
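The following hypothetical Java sketch illustrates why original coordinates matter, and why the thread later argued for exposing the original partition and offset as well as the topic. In it, an SMT-like router rewrites both topic and partition while the original values are preserved. `SinkRec` and `ShardRouter` are stand-ins. The `original*` accessor names are assumptions and may not match the methods actually added to `SinkRecord`.

```java
// Hypothetical, simplified stand-in for a sink record. topic/kafkaPartition/kafkaOffset
// may be rewritten by SMTs; the original* fields are captured by the runtime before
// any transformation runs and are carried over unchanged.
record SinkRec(String topic, int kafkaPartition, long kafkaOffset,
               String originalTopic, int originalKafkaPartition, long originalKafkaOffset) {

    static SinkRec fromConsumer(String topic, int partition, long offset) {
        return new SinkRec(topic, partition, offset, topic, partition, offset);
    }

    // Like a transformation building a new record: only the mutable coordinates change.
    SinkRec withRouting(String newTopic, int newPartition) {
        return new SinkRec(newTopic, newPartition, kafkaOffset,
                originalTopic, originalKafkaPartition, originalKafkaOffset);
    }
}

// Hypothetical SMT that re-routes records to one of N shards of an external
// sharded system by rewriting both the topic and the partition.
class ShardRouter {
    private final int shards;

    ShardRouter(int shards) {
        this.shards = shards;
    }

    SinkRec apply(SinkRec rec, String key) {
        int shard = Math.floorMod(key.hashCode(), shards);
        return rec.withRouting("shard-" + shard, shard);
    }
}
```

A connector that tracks its own offsets (for example, in `preCommit`) would use the `original*` values. Those are the topic partitions and offsets that the framework actually commits back to Kafka.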
Thanks,
Yash

On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi all,

I have more comments I'd like to make on this KIP when I have time (sorry for the delay, Yash, and thanks for your patience!), but I did want to chime in and say that I'm also not sure about overloading SinkTask::put. I share the concerns about creating an intuitive, simple API that Yash has raised. In addition, this approach doesn't seem very sustainable: what do we do if we encounter another case in the future that would warrant a similar solution? We probably don't want to create three, four, etc. overloaded variants of the method, each of which would have to be implemented by connector developers who want to both leverage the latest and greatest connector APIs and maintain compatibility with Connect clusters running older versions.
I haven't been able to flesh this out into a design worth publishing in its own KIP yet, but one alternative I've pitched to a few people with generally positive interest has been to develop an official compatibility library for Connect developers. This library would be released as its own Maven artifact (separate from connect-api, connect-runtime, etc.) and would provide a simple, clean interface for developers to determine which features are supported by the version of the Connect runtime that their plugin has been deployed onto. Under the hood, this library might use reflection to determine whether classes, methods, etc. are available, but the developer wouldn't have to do anything more than check (for example) `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point in the lifetime of their connector/task whether that feature is provided by the runtime.
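Under the hood, a check like `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` might reduce to something like the following Java sketch. The `Feature` enum and `isAvailable` helper are hypothetical and are not a published artifact. The only real API referenced is `SinkTaskContext::errantRecordReporter`, which was added in Kafka 2.6 (KIP-610).

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical feature flags; names and structure are illustrative only.
enum Feature {
    SINK_TASK_ERRANT_RECORD_REPORTER(
            "org.apache.kafka.connect.sink.SinkTaskContext", "errantRecordReporter");

    private final String className;
    private final String methodName;

    Feature(String className, String methodName) {
        this.className = className;
        this.methodName = methodName;
    }

    /** True if the runtime on the classpath exposes the method behind this feature. */
    boolean enabled() {
        return isAvailable(className, methodName);
    }

    /**
     * Checks, without initializing the class, whether it can be loaded and has a public
     * method with the given name. Matching by name only is a simplification; a real
     * library would probably also compare parameter types.
     */
    static boolean isAvailable(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className, false, Feature.class.getClassLoader());
            return Arrays.stream(cls.getMethods())
                    .map(Method::getName)
                    .anyMatch(methodName::equals);
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

Without such a library, connectors typically handle this by hand. For example, a connector can catch `NoSuchMethodError` or `NoClassDefFoundError` around the call to `errantRecordReporter()` so that it still runs on older runtimes.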
One other high-level comment: this doesn't address every case, but we might consider adding an API to "ack" sink records. This could use the SubmittedRecords class [1] (with some slight tweaks) under the hood to track the latest-acked offset for each topic partition. This way, connector developers won't be responsible for tracking offsets at all in their sink tasks (eliminating issues with the accuracy of post-transformation T/P/O sink record information), and they'll only have to notify the Connect framework when a record has been successfully dispatched to the external system. This provides a cleaner, friendlier API, and also enables more fine-grained metrics like the ones proposed in KIP-767 [2].
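The "ack" idea can be sketched as a per-partition tracker in the spirit of `SubmittedRecords`: records are submitted in consumption order, acked in any order, and only the longest contiguous acked prefix of each partition becomes committable. Everything here (`AckTracker`, `Submitted`, `PartitionKey`) is hypothetical and is not the actual `SubmittedRecords` API; it only illustrates why tasks would no longer need to track offsets themselves.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TopicPartition; not the Kafka class.
record PartitionKey(String topic, int partition) {}

// Hypothetical per-partition ack tracker, loosely modelled on the idea behind
// Connect's SubmittedRecords class; this is not its actual API.
final class AckTracker {

    // Handle for one submitted record; the task acks it once the record has
    // been successfully dispatched to the external system.
    final class Submitted {
        final PartitionKey partition;
        final long offset;
        private boolean acked;

        private Submitted(PartitionKey partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() {
            synchronized (AckTracker.this) {
                acked = true;
            }
        }
    }

    // Records per partition, in the order they were consumed.
    private final Map<PartitionKey, Deque<Submitted>> pending = new HashMap<>();
    // Next offset to commit per partition (last contiguous acked offset + 1).
    private final Map<PartitionKey, Long> committable = new HashMap<>();

    synchronized Submitted submit(PartitionKey partition, long offset) {
        Submitted record = new Submitted(partition, offset);
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(record);
        return record;
    }

    // Drains the acked prefix of each partition's queue. An unacked record
    // blocks everything after it, even if later records were already acked.
    synchronized Map<PartitionKey, Long> committableOffsets() {
        for (Map.Entry<PartitionKey, Deque<Submitted>> entry : pending.entrySet()) {
            Deque<Submitted> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                committable.put(entry.getKey(), queue.pollFirst().offset + 1);
            }
        }
        return Map.copyOf(committable);
    }
}
```

Because the framework would own this bookkeeping, the offsets it commits are always those of the original consumer records, regardless of what SMTs did to the records' topic or partition.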
[1] - https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
[2] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics

Cheers,

Chris

On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Randall,

It's been a while for this one, but the more I think about it, the more I feel like the current approach with a new overloaded `SinkTask::put` method might not be optimal. We're trying to fix a pretty corner-case bug here (usage of topic-mutating SMTs with sink connectors that do their own offset tracking) and I'm not sure that warrants a change to such a central interface method.
The new `SinkTask::put` method just seems somewhat odd and it may not be very understandable for a new reader; I don't think this should be the case for a public interface method. Furthermore, even with elaborate documentation in place, I'm not sure it'll be very obvious to most people what the purpose of having these two `put` methods is and how they should be used by sink task implementations. What do you think?

Thanks,
Yash

On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Randall,

Thanks a lot for your valuable feedback so far! I've updated the KIP based on our discussion above. Could you please take another look?
Thanks,
Yash

On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:

On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Randall,
>
> Thanks for elaborating. I think these are all very good points and I see why the overloaded `SinkTask::put` method is a cleaner solution overall.
>
> > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
>
> I think this should be
>
> `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
>
> instead, because the sink records themselves have the updated topic partitions (i.e. after all transformations have been applied) and the KIP is proposing a way for the tasks to be able to access the original topic partition (i.e. before transformations have been applied).

Sounds good.

> > Of course, if the developer does not need separate methods, they can easily have the older `put` method simply delegate to the newer method.
>
> If the developer does not need separate methods (i.e. they don't need to use this new addition), they can simply continue implementing just the older `put` method, right?

Correct. We should update the JavaDoc of both methods to make this clear, and in general to explain how the two methods are used and should be implemented. That can be part of the PR, and the KIP doesn't need this wording.
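To make the `originalTopicPartitions` signature concrete: the worker would have to note each record's topic partition before the transformation chain runs. This is a minimal sketch, assuming simplified hypothetical stand-ins (`TopicPart`, `Rec`, `OriginalPartitionCapture`) instead of the real `TopicPartition`, `SinkRecord`, and `WorkerSinkTask`, and a plain `UnaryOperator` in place of the real SMT chain. Only records whose coordinates changed get a map entry, so an absent entry means "unchanged".

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-ins for TopicPartition and SinkRecord.
record TopicPart(String topic, int partition) {}

record Rec(String topic, int partition, long offset, String value) {
    TopicPart topicPartition() {
        return new TopicPart(topic, partition);
    }
}

// Hypothetical worker-side helper: runs the transformation chain and remembers
// the pre-transform partition of every record whose topic/partition changed.
final class OriginalPartitionCapture {

    record Batch(List<Rec> records, Map<Rec, TopicPart> originalTopicPartitions) {}

    private OriginalPartitionCapture() {}

    static Batch transformAll(List<Rec> consumed, UnaryOperator<Rec> transformChain) {
        List<Rec> transformed = new ArrayList<>();
        // Identity-based keys: after an SMT rewrites coordinates, two distinct
        // records could compare equal by value.
        Map<Rec, TopicPart> originals = new IdentityHashMap<>();
        for (Rec original : consumed) {
            Rec result = transformChain.apply(original);
            if (result == null) {
                continue; // a transformation may drop the record entirely
            }
            transformed.add(result);
            // No entry means "topic and partition unchanged".
            if (!result.topicPartition().equals(original.topicPartition())) {
                originals.put(result, original.topicPartition());
            }
        }
        return new Batch(transformed, originals);
    }
}
```

The identity map is a deliberate choice in this sketch; whether the real worker would key by identity or by value is an open question the thread does not settle.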
> > Finally, this gives us a roadmap for *eventually* deprecating the older method, once the Connect runtime versions without this change are old enough.
>
> I'm not sure we'd ever want to deprecate the older method. Most common sink connector implementations do not do their own offset tracking with asynchronous processing and will probably never have a need for the additional parameter `Map<SinkRecord, TopicPartition> originalTopicPartitions` in the proposed new `put` method. These connectors can continue implementing only the existing `SinkTask::put` method, which will be called by the default implementation of the newer overloaded `put` method.
+1

> > the pre-commit methods use the same `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.
>
> The data structure you're suggesting be used is a `Map<SinkRecord, TopicPartition>` which will map `SinkRecord` objects to the original topic partition of the corresponding `ConsumerRecord`, right? To clarify, this is a new data structure that will need to be managed in the `WorkerSinkTask`.

Ah, you're right. Thanks for the correction.

Best regards,
Randall

> Thanks,
> Yash

On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:

Hi, Yash.
> I'm not sure I quite understand why it would be "easier" for connector developers to account for implementing two different overloaded `put` methods (assuming that they want to use this new feature) versus using a try-catch block around `SinkRecord` access methods?

Using a try-catch around an API method that *might* be there is a very unusual thing for most developers. Unfortunately, we've had to resort to this atypical approach with Connect in places when there was no good alternative. We seem to be relying upon this pattern because it's easier for us, not because it offers a better experience for Connector developers.
IMO, if there's a practical alternative that uses normal development practices and techniques, then we should use that alternative. IIUC, there is at least one practical alternative for this KIP that would not require developers to use the unusual try-catch to handle the case where methods are not found.

I also think having two `put` methods is easier when the Connector has to do different things for different Connect runtimes, too. One of those methods is called by newer Connect runtimes with the new behavior, and the other method is called by an older Connect runtime. Of course, if the developer does not need separate methods, they can easily have the older `put` method simply delegate to the newer method.
Finally, this gives us a roadmap for *eventually* deprecating the older method, once the Connect runtime versions without this change are old enough.

> I think the advantage of going with the proposed approach in the KIP is that it wouldn't require extra book-keeping (the Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in your proposed approach)

The connector does have to do some of this bookkeeping in how they track the topic partition offsets used in the `preCommit`, and the pre-commit methods use the same `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.

I hope that helps.
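The `preCommit` bookkeeping mentioned above can be illustrated: a task that tracks its own offsets would key them by the original topic partition, falling back to the record's own partition when the map has no entry. This is a hypothetical sketch; `TP`, `SRec`, and `OffsetTrackingTask` are simplified stand-ins, and a `Long` "next offset to consume" stands in for `OffsetAndMetadata`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for TopicPartition and SinkRecord.
record TP(String topic, int partition) {}

record SRec(String topic, int partition, long offset) {
    TP topicPartition() {
        return new TP(topic, partition);
    }
}

// Hypothetical task that tracks its own offsets, keyed by the ORIGINAL
// (pre-transform) topic partition so that preCommit reports offsets for
// partitions the consumer actually owns.
final class OffsetTrackingTask {
    // Stand-in for Map<TopicPartition, OffsetAndMetadata>: next offset to consume.
    private final Map<TP, Long> currentOffsets = new HashMap<>();

    void put(Collection<SRec> records, Map<SRec, TP> originalTopicPartitions) {
        for (SRec record : records) {
            // ... write the record to the external system (omitted) ...
            // Absence of an entry means the topic and partition were unchanged.
            TP original = originalTopicPartitions.getOrDefault(record, record.topicPartition());
            currentOffsets.merge(original, record.offset() + 1, Long::max);
        }
    }

    Map<TP, Long> preCommit() {
        return Map.copyOf(currentOffsets);
    }
}
```

Keying by the transformed partition instead would make `preCommit` report offsets for a topic partition (here `orders-archive`) that the consumer never subscribed to, which is the bug KIP-793 targets.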
Best regards,

Randall

On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Randall,

Thanks for reviewing the KIP!

> That latter logic can get quite ugly.

I'm not sure I quite understand why it would be "easier" for connector developers to account for implementing two different overloaded `put` methods (assuming that they want to use this new feature) versus using a try-catch block around `SinkRecord` access methods? In both cases, a connector developer would need to write additional code in order to ensure that their connector continues working with older Connect runtimes.
Furthermore, we would probably need to carefully document what the implementation of the older `put` method should look like for connectors that want to use this new feature. I think the advantage of going with the proposed approach in the KIP is that it wouldn't require extra book-keeping (the Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in your proposed approach), and also that the try-catch based logic is an already established pattern through https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors and other KIPs which added methods to source/sink connector/task contexts.
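For comparison, the try-catch pattern referred to here (as used for `SinkTaskContext::errantRecordReporter` under KIP-610) catches the linkage errors an older runtime throws when a newer method is missing. This is a minimal, hypothetical helper; the `Compat` class and `callIfAvailable` name are illustrative and not part of Connect.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka Connect) illustrating the try-catch
// pattern used to stay compatible with runtimes that lack a newer method.
final class Compat {
    private Compat() {}

    // Invokes newerApiCall; if the running Connect version lacks the method
    // (or a class it references), returns the supplied fallback instead.
    static <T> T callIfAvailable(Supplier<T> newerApiCall, T fallback) {
        try {
            return newerApiCall.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return fallback;
        }
    }
}
```

In a task, a call might look like `Compat.callIfAvailable(() -> record.originalTopic(), record.topic())`, where `originalTopic()` is a hypothetical placeholder for whichever new accessor an older runtime may lack. Wrapping the call in a lambda, rather than a method reference, keeps resolution of the missing method inside the `try` block; a method reference may be linked before the helper runs. This is exactly the kind of subtlety Randall calls unusual for most developers.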
Let me know if you still feel that having a new overloaded put method is a cleaner solution and I'd be happy to reconsider!

Thanks,
Yash

On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:

Hi, Yash. Thanks for picking up this KIP and discussion.

The KIP includes this rejected alternative:

> 4. Update SinkTask.put in any way to pass the new information outside SinkRecord (e.g. a Map or a derived class)
>
>    - Much more disruptive change without considerable pros

One advantage of doing this is that sink connector implementations can more easily implement two different "put(...)" methods to handle running in a variety of runtimes, without having to use try-catch logic around the newer SinkRecord access methods. That latter logic can get quite ugly.
For example, the existing `put` method has this signature:

    public abstract void put(Collection<SinkRecord> records);

If we added an overloaded method that passed in a map of the old topic+partition for each record (and defined the absence of an entry as having an unchanged topic and partition):

    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
        put(records);
    }

then a `SinkTask` implementation that wants to use this new feature could simply implement both methods:

    @Override
    public void put(Collection<SinkRecord> records) {
        // Running in an older runtime, so no tracking of SMT-modified topic names or partitions
        put(records, Map.of());
    }

    @Override
    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
        // real logic here
    }

This seems a lot easier than having to use try-catch logic, yet still allows sink connectors to utilize the new functionality and still work with older Connect runtimes.

WDYT?
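The two-method approach above can be exercised end to end with simplified stand-ins. In this hypothetical sketch, `StubRecord`, `StubTopicPartition`, and `NewerRuntimeSinkTask` are not Kafka classes; the second parameter uses the `originalTopicPartitions` name the thread later settled on, and calling the single-argument `put` directly simulates what an older runtime would do.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT the Kafka Connect classes.
record StubTopicPartition(String topic, int partition) {}

record StubRecord(String topic, int partition, long offset) {}

// Stand-in for SinkTask as it might look in a runtime that has the overload.
abstract class NewerRuntimeSinkTask {
    public abstract void put(Collection<StubRecord> records);

    // Default: tasks that ignore the new parameter keep working unchanged.
    public void put(Collection<StubRecord> records,
                    Map<StubRecord, StubTopicPartition> originalTopicPartitions) {
        put(records);
    }
}

// Task that wants the new information but must also run on older runtimes.
class DualPutTask extends NewerRuntimeSinkTask {
    Map<StubRecord, StubTopicPartition> lastOriginals; // exposed only for the demo

    @Override
    public void put(Collection<StubRecord> records) {
        // Older runtime: no tracking of SMT-modified topic names or partitions.
        put(records, Map.of());
    }

    @Override
    public void put(Collection<StubRecord> records,
                    Map<StubRecord, StubTopicPartition> originalTopicPartitions) {
        lastOriginals = originalTopicPartitions;
        // real logic here
    }
}

// Common case: only the original single-argument put is implemented.
class LegacyTask extends NewerRuntimeSinkTask {
    int received;

    @Override
    public void put(Collection<StubRecord> records) {
        received += records.size();
    }
}
```

`LegacyTask` shows the common case where a task overrides only the single-argument `put` and still works when a newer runtime calls the two-argument variant. One caveat of this design: a task that delegates from the single-argument `put` to the two-argument one must also override the latter, or the default implementation recurses back forever.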
Randall

On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi all,

I would like to (re)start a new discussion thread on KIP-793 (Kafka Connect), which proposes some additions to the public SinkRecord interface in order to support topic-mutating SMTs for sink connectors that do their own offset tracking.
Links:

KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830

Older discussion threads:
https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h
https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj

Jira: https://issues.apache.org/jira/browse/KAFKA-13431

Thanks,
Yash