Hi Yash, I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`, since we already convert and transform each batch of records that we poll from the sink task's consumer en masse, meaning we could discover several new transformed partitions in between consecutive calls to SinkTask::put.
It's also worth noting that we'll probably want to deprecate the existing open/close methods, at which point keeping one non-deprecated variant of each seems more appealing and less complex than keeping two. Honestly though, I think we're both on the same page enough that I wouldn't object to either approach. We've probably reached the saturation point for ROI here and as long as we provide developers a way to get the information they need from the runtime and take care to add Javadocs and update our docs page (possibly including the connector development quickstart), it should be fine. At this point, it might be worth updating the KIP based on recent discussion so that others can see the latest proposal, and we can both take a look and make sure everything looks good enough before opening a vote thread. Finally, I think you make a convincing case for a time-based eviction policy. I wasn't thinking about the fairly common SMT pattern of deriving a topic name from, e.g., a record field or header. Cheers, Chris On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com> wrote: > Hi Chris, > > > Plus, if a connector is intentionally designed to > > use pre-transformation topic partitions in its > > open/close methods, wouldn't we just be trading > > one form of the problem for another by making this > > switch? > > Thanks, this makes sense, and given that the KIP already proposes a way for > sink connector implementations to distinguish between pre-transform and > post-transform topics per record, I think I'm convinced that going with new > `open()` / `close()` methods is the right approach. However, I still feel > like having overloaded methods will make it a lot less unintuitive given > that the two sets of methods would be different in terms of when they're > called and what arguments they are passed (also I'm presuming that the > overloaded methods you're prescribing will only have a single > `TopicPartition` rather than a `Collection<TopicPartition>` as their > parameters). I guess my concern is largely around the fact that it won't be > possible to distinguish between the overloaded methods' use cases just from > the method signatures. I agree that naming is going to be difficult here, > but I think that having two sets of `SinkTask::openXyz` / > `SinkTask::closeXyz` methods will be less complicated to understand from a > connector developer perspective (as compared to overloaded methods with > only differing documentation). Of your suggested options, I think > `openPreTransform` / `openPostTransform` are the most comprehensible ones. > > > BTW, I wouldn't say that we can't make assumptions > > about the relationships between pre- and post-transformation > > topic partitions. > > I meant that the framework wouldn't be able to deterministically know when > to close a post-transform topic partition given that SMTs could use > per-record data / metadata to manipulate the topic names as and how > required (which supports the suggestion to use an eviction policy based > mechanism to call SinkTask::close for post-transform topic partitions). > > > We might utilize a policy that assumes a deterministic > > mapping from the former to the latter, for example. > > Wouldn't this be making the assumption that SMTs only use the topic name > itself and no other data / metadata while computing the new topic name? Are > you suggesting that since this assumption could work for a majority of > SMTs, it might be more efficient overall in terms of reducing the number of > "false-positive" calls to `SinkTask::closePostTransform` (and we'll also be > able to call `SinkTask::closePostTransform` immediately after topic > partitions are revoked from the consumer)? I was thinking something more > generic along the lines of a simple time based eviction policy that > wouldn't be making any assumptions regarding the SMT implementations. > Either way, I do like your earlier suggestion of keeping this logic > internal and not painting ourselves into a corner by promising any > particular behavior in the KIP. > > Thanks, > Yash > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid> > wrote: > > > Hi Yash, > > > > I think the key difference between adding methods/overloads related to > > SinkTask::open/SinkTask::close and SinkTask::put is that this isn't > > auxiliary information that may or may not be useful to connector > > developers. It's actually critical for them to understand the difference > > between the two concepts here, even if they look very similar. And yes, I > > do believe that switching from pre-transform to post-transform topic > > partitions is too big a change in behavior here. Plus, if a connector is > > intentionally designed to use pre-transformation topic partitions in its > > open/close methods, wouldn't we just be trading one form of the problem > for > > another by making this switch? > > > > One possible alternative to overloading the existing methods is to split > > SinkTask::open into openOriginal (or possibly openPhysical or > > openPreTransform) and openTransformed (or openLogical or > > openPostTransform), with a similar change for SinkTask::close. The > default > > implementation for SinkTask::openOriginal can be to call SinkTask::open, > > and the same can go for SinkTask::close. However, I prefer overloading > the > > existing methods since this alternative increases complexity and none of > > the names are very informative. > > > > BTW, I wouldn't say that we can't make assumptions about the > relationships > > between pre- and post-transformation topic partitions. We might utilize a > > policy that assumes a deterministic mapping from the former to the > latter, > > for example. The distinction I'd draw is that the assumptions we make can > > and probably should favor some cases in terms of performance (i.e., > > reducing the number of unnecessary calls to close/open over a given sink > > task's lifetime), but should not lead to guaranteed resource leaks or > > failure to obey API contract in any cases. > > > > Cheers, > > > > Chris > > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com> > wrote: > > > > > Hi Chris, > > > > > > > especially if connectors are intentionally designed around > > > > original topic partitions instead of transformed ones. > > > > > > Ha, that's a good point and reminds me of Hyrum's Law [1] :) > > > > > > > I think we have to provide connector developers with some > > > > way to differentiate between the two, but maybe there's a way > > > > to do this that I haven't thought of yet > > > > > > I can't think of a better way to do this either; would invoking the > > > existing `SinkTask::open` and `SinkTask::close` methods with > > post-transform > > > topic partitions instead of pre-transform topic partitions not be > > > acceptable even in a minor / major AK release? I feel like the proposed > > > approach of adding overloaded `SinkTask::open` / `SinkTask::close` > > methods > > > to differentiate between pre-transform and post-transform topic > > partitions > > > has similar pitfalls to the idea of the overloaded `SinkTask::put` > method > > > we discarded earlier. > > > > > > > Either way, I'm glad that the general idea of a cache and > > > > eviction policy for SinkTask::close seem reasonable; if > > > > we decide to go this route, it might make sense for the KIP > > > > to include an outline of one or more high-level strategies > > > > we might take, but without promising any particular behavior > > > > beyond occasionally calling SinkTask::close for post-transform > > > > topic partitions. I'm hoping that this logic can stay internal, > > > > and by notpainting ourselves into a corner with the KIP, we > > > > give ourselves leeway to tweak it in the future if necessary > > > > without filing another KIP or introducing a pluggable interface. > > > > > > Thanks, that's a good idea. Given the flexibility of SMTs, the > framework > > > can't really make any assumptions around topic partitions post > > > transformation nor does it have any way to definitively get any such > > > information from transformations which is why the idea of a cache with > an > > > eviction policy makes perfect sense! > > > > > > [1] - https://www.hyrumslaw.com/ > > > > > > > > > Thanks, > > > Yash > > > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chr...@aiven.io.invalid> > > > wrote: > > > > > > > Hi Yash, > > > > > > > > > So it looks like with the current state of affairs, sink tasks that > > > only > > > > instantiate writers in the SinkTask::open method (and don't do the > lazy > > > > instantiation in SinkTask::put that you mentioned) might fail when > used > > > > with topic/partition mutating SMTs even if they don't do any > > asynchronous > > > > processing? > > > > > > > > Yep, exactly 👍 > > > > > > > > > What do you think about retaining just the existing methods > > > > but changing when they're called in the Connect runtime? For > instance, > > > > instead of calling SinkTask::open after partition assignment post a > > > > consumer group rebalance, we could cache the currently "seen" topic > > > > partitions (post transformation) and before each call to > SinkTask::put > > > > check whether there's any new "unseen" topic partitions, and if so > call > > > > SinkTask::open (and also update the cache of course). > > > > > > > > IMO the issue here is that it's a drastic change in behavior to start > > > > invoking SinkTask::open and SinkTask::close with post-transform topic > > > > partitions instead of pre-transform, especially if connectors are > > > > intentionally designed around original topic partitions instead of > > > > transformed ones. I think we have to provide connector developers > with > > > some > > > > way to differentiate between the two, but maybe there's a way to do > > this > > > > that I haven't thought of yet. Interested to hear your thoughts. > > > > > > > > Either way, I'm glad that the general idea of a cache and eviction > > policy > > > > for SinkTask::close seem reasonable; if we decide to go this route, > it > > > > might make sense for the KIP to include an outline of one or more > > > > high-level strategies we might take, but without promising any > > particular > > > > behavior beyond occasionally calling SinkTask::close for > post-transform > > > > topic partitions. I'm hoping that this logic can stay internal, and > by > > > not > > > > painting ourselves into a corner with the KIP, we give ourselves > leeway > > > to > > > > tweak it in the future if necessary without filing another KIP or > > > > introducing a pluggable interface. > > > > > > > > Cheers, > > > > > > > > Chris > > > > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> > > wrote: > > > > > > > > > Hi Chris, > > > > > > > > > > Thanks for the feedback. > > > > > > > > > > 1) That's a fair point; while I did scan everything publicly > > available > > > on > > > > > GitHub, you're right in that it won't cover all possible SMTs that > > are > > > > out > > > > > there. Thanks for the example use-case as well, I've updated the > KIP > > to > > > > add > > > > > the two new proposed methods. > > > > > > > > > > 2) So it looks like with the current state of affairs, sink tasks > > that > > > > only > > > > > instantiate writers in the SinkTask::open method (and don't do the > > lazy > > > > > instantiation in SinkTask::put that you mentioned) might fail when > > used > > > > > with topic/partition mutating SMTs even if they don't do any > > > asynchronous > > > > > processing? Since they could encounter records in SinkTask::put > with > > > > > topics/partitions that they might not have created writers for. > > Thanks > > > > for > > > > > pointing this out, it's definitely another incompatibility that > needs > > > to > > > > be > > > > > called out and fixed. The overloaded method approach is > interesting, > > > but > > > > > comes with the caveat of yet more new methods that will need to be > > > > > implemented by existing connectors if they want to make use of this > > new > > > > > functionality. What do you think about retaining just the existing > > > > methods > > > > > but changing when they're called in the Connect runtime? For > > instance, > > > > > instead of calling SinkTask::open after partition assignment post a > > > > > consumer group rebalance, we could cache the currently "seen" topic > > > > > partitions (post transformation) and before each call to > > SinkTask::put > > > > > check whether there's any new "unseen" topic partitions, and if so > > call > > > > > SinkTask::open (and also update the cache of course). I don't think > > > this > > > > > would break the existing contract with sink tasks where > > SinkTask::open > > > is > > > > > expected to be called for a topic partition before any records from > > the > > > > > topic partition are sent via SinkTask::put? The SinkTask::close > case > > > is a > > > > > lot trickier however, and would require some sort of cache eviction > > > > policy > > > > > that would be deemed appropriate as you pointed out too. > > > > > > > > > > Thanks, > > > > > Yash > > > > > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton > > <chr...@aiven.io.invalid > > > > > > > > > wrote: > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > I've had some time to think on this KIP and I think I'm in > > agreement > > > > > about > > > > > > not blocking it on an official compatibility library or adding > the > > > > "ack" > > > > > > API for sink records. > > > > > > > > > > > > I only have two more thoughts: > > > > > > > > > > > > 1. Because it is possible to manipulate sink record partitions > and > > > > > offsets > > > > > > with the current API we provide for transformations, I still > > believe > > > > > > methods should be added to the SinkRecord class to expose the > > > original > > > > > > partition and offset, not just the original topic. The additional > > > > > cognitive > > > > > > burden from these two methods is going to be minimal anyways; > once > > > > users > > > > > > understand the difference between the transformed topic name and > > the > > > > > > original one, it's going to be trivial for them to understand how > > > that > > > > > same > > > > > > difference applies for partitions and offsets. It's not enough to > > > scan > > > > > the > > > > > > set of SMTs provided out of the box with Connect, ones developed > by > > > > > > Confluent, or even everything available on GitHub, since there > may > > be > > > > > > closed-source projects out there that rely on this ability. One > > > > potential > > > > > > use case could be re-routing partitions between Kafka and some > > other > > > > > > sharded system. > > > > > > > > > > > > 2. We still have to address the SinkTask::open [1] and > > > SinkTask::close > > > > > [2] > > > > > > methods. If a connector writes to the external system using the > > > > > transformed > > > > > > topic partitions it reads from Kafka, then it's possible for the > > > > > connector > > > > > > to lazily instantiate writers for topic partitions as it > encounters > > > > them > > > > > > from records provided in SinkTask::put. However, connectors also > > > need a > > > > > way > > > > > > to de-allocate those writers (and the resources used by them) > over > > > > time, > > > > > > which they can't do as easily. One possible approach here is to > > > > overload > > > > > > SinkTask::open and SinkTask::close with variants that distinguish > > > > between > > > > > > transformed and original topic partitions, and default to > invoking > > > the > > > > > > existing methods with just the original topic partitions. We > would > > > then > > > > > > have several options for how the Connect runtime can invoke these > > > > > methods, > > > > > > but in general, an approach that guarantees that tasks are > notified > > > of > > > > > > transformed topic partitions in SinkTask::open before any records > > for > > > > > that > > > > > > partition are given to it in SinkTask::put, and makes a > best-effort > > > > > attempt > > > > > > to close transformed topic partitions that appear to no longer be > > in > > > > use > > > > > > based on some eviction policy, would probably be sufficient. > > > > > > > > > > > > [1] - > > > > > > > > > > > > > > > > > > > > > > > > > > > https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection) > > > > > > [2] - > > > > > > > > > > > > > > > > > > > > > > > > > > > https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection) > > > > > > > > > > > > Cheers, > > > > > > > > > > > > Chris > > > > > > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> > > > > wrote: > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > Thanks a lot for your inputs! > > > > > > > > > > > > > > > would provide a simple, clean interface for developers to > > > determine > > > > > > > > which features are supported by the version of the Connect > > > runtime > > > > > > > > that their plugin has been deployed onto > > > > > > > > > > > > > > I do like the idea of having such a public compatibility > library > > - > > > I > > > > > > think > > > > > > > it would remove a lot of restrictions from framework > development > > if > > > > it > > > > > > were > > > > > > > to be widely adopted. > > > > > > > > > > > > > > > we might consider adding an API to "ack" sink records > > > > > > > > > > > > > > I agree that this does seem like a more intuitive and clean > API, > > > but > > > > > I'm > > > > > > > concerned about the backward compatibility headache we'd be > > > imposing > > > > on > > > > > > all > > > > > > > existing sink connectors. Connector developers will have to > > > maintain > > > > > two > > > > > > > separate ways of doing offset management if they want to use > the > > > new > > > > > API > > > > > > > but continue supporting older versions of Kafka Connect. > > > > > > > > > > > > > > For now, I've reverted the KIP to the previous iteration which > > > > proposed > > > > > > the > > > > > > > addition of a new `SinkRecord` method to obtain the original > > Kafka > > > > > topic > > > > > > > pre-transformation. One thing to note is that I've removed the > > > method > > > > > for > > > > > > > obtaining the original Kafka partition after a cursory search > > > showed > > > > > that > > > > > > > use cases for partition modifying SMTs are primarily on the > > source > > > > > > > connector side. > > > > > > > > > > > > > > Thanks, > > > > > > > Yash > > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton > > > <chr...@aiven.io.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > I have more comments I'd like to make on this KIP when I have > > > time > > > > > > (sorry > > > > > > > > for the delay, Yash, and thanks for your patience!), but I > did > > > want > > > > > to > > > > > > > > chime in and say that I'm also not sure about overloading > > > > > > SinkTask::put. > > > > > > > I > > > > > > > > share the concerns about creating an intuitive, simple API > that > > > > Yash > > > > > > has > > > > > > > > raised. In addition, this approach doesn't seem very > > > > > sustainable--what > > > > > > do > > > > > > > > we do if we encounter another case in the future that would > > > > warrant a > > > > > > > > similar solution? We probably don't want to create three, > four, > > > > etc. > > > > > > > > overloaded variants of the method, each of which would have > to > > be > > > > > > > > implemented by connector developers who want to both leverage > > the > > > > > > latest > > > > > > > > and greatest connector APIs and maintain compatibility with > > > connect > > > > > > > > Clusters running older versions. > > > > > > > > > > > > > > > > I haven't been able to flesh this out into a design worth > > > > publishing > > > > > in > > > > > > > its > > > > > > > > own KIP yet, but one alternative I've pitched to a few people > > > with > > > > > > > > generally positive interest has been to develop an official > > > > > > compatibility > > > > > > > > library for Connect developers. This library would be > released > > as > > > > its > > > > > > own > > > > > > > > Maven artifact (separate from connect-api, connect-runtime, > > etc.) > > > > and > > > > > > > would > > > > > > > > provide a simple, clean interface for developers to determine > > > which > > > > > > > > features are supported by the version of the Connect runtime > > that > > > > > their > > > > > > > > plugin has been deployed onto. Under the hood, this library > > might > > > > use > > > > > > > > reflection to determine whether classes, methods, etc. are > > > > available, > > > > > > but > > > > > > > > the developer wouldn't have to do anything more than check > (for > > > > > > example) > > > > > > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know > > at > > > > any > > > > > > > point > > > > > > > > in the lifetime of their connector/task whether that feature > is > > > > > > provided > > > > > > > by > > > > > > > > the runtime. > > > > > > > > > > > > > > > > One other high-level comment: this doesn't address every > case, > > > but > > > > we > > > > > > > might > > > > > > > > consider adding an API to "ack" sink records. This could use > > the > > > > > > > > SubmittedRecords class [1] (with some slight tweaks) under > the > > > hood > > > > > to > > > > > > > > track the latest-acked offset for each topic partition. This > > way, > > > > > > > connector > > > > > > > > developers won't be responsible for tracking offsets at all > in > > > > their > > > > > > sink > > > > > > > > tasks (eliminating issues with the accuracy of > > > post-transformation > > > > > > T/P/O > > > > > > > > sink record information), and they'll only have to notify the > > > > Connect > > > > > > > > framework when a record has been successfully dispatched to > the > > > > > > external > > > > > > > > system. This provides a cleaner, friendlier API, and also > > enables > > > > > more > > > > > > > > fine-grained metrics like the ones proposed in KIP-767 [2]. > > > > > > > > > > > > > > > > [1] - > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java > > > > > > > > [2] - > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya < > > yash.ma...@gmail.com > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Randall, > > > > > > > > > > > > > > > > > > It's been a while for this one but the more I think about > it, > > > the > > > > > > more > > > > > > > I > > > > > > > > > feel like the current approach with a new overloaded > > > > > `SinkTask::put` > > > > > > > > method > > > > > > > > > might not be optimal. We're trying to fix a pretty corner > > case > > > > bug > > > > > > here > > > > > > > > > (usage of topic mutating SMTs with sink connectors that do > > > their > > > > > own > > > > > > > > offset > > > > > > > > > tracking) and I'm not sure that warrants a change to such a > > > > central > > > > > > > > > interface method. The new `SinkTask::put` method just seems > > > > > somewhat > > > > > > > odd > > > > > > > > > and it may not be very understandable for a new reader - I > > > don't > > > > > > think > > > > > > > > this > > > > > > > > > should be the case for a public interface method. > > Furthermore, > > > > even > > > > > > > with > > > > > > > > > elaborate documentation in place, I'm not sure if it'll be > > very > > > > > > obvious > > > > > > > > to > > > > > > > > > most people what the purpose of having these two `put` > > methods > > > is > > > > > and > > > > > > > how > > > > > > > > > they should be used by sink task implementations. What do > you > > > > > think? > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya < > > > yash.ma...@gmail.com > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Randall, > > > > > > > > > > > > > > > > > > > > Thanks a lot for your valuable feedback so far! I've > > updated > > > > the > > > > > > KIP > > > > > > > > > based > > > > > > > > > > on our discussion above. Could you please take another > > look? > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch < > > > > rha...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya < > > > > > yash.ma...@gmail.com> > > > > > > > > > wrote: > > > > > > > > > >> > > > > > > > > > >> > Hi Randall, > > > > > > > > > >> > > > > > > > > > > >> > Thanks for elaborating. I think these are all very > good > > > > points > > > > > > > and I > > > > > > > > > see > > > > > > > > > >> > why the overloaded `SinkTask::put` method is a cleaner > > > > > solution > > > > > > > > > overall. > > > > > > > > > >> > > > > > > > > > > >> > > public void put(Collection<SinkRecord> records, > > > > > > Map<SinkRecord, > > > > > > > > > >> > TopicPartition> updatedTopicPartitions) > > > > > > > > > >> > > > > > > > > > > >> > I think this should be > > > > > > > > > >> > > > > > > > > > > >> > `public void put(Collection<SinkRecord> records, > > > > > Map<SinkRecord, > > > > > > > > > >> > TopicPartition> originalTopicPartitions)` > > > > > > > > > >> > > > > > > > > > > >> > instead because the sink records themselves have the > > > updated > > > > > > topic > > > > > > > > > >> > partitions (i.e. after all transformations have been > > > > applied) > > > > > > and > > > > > > > > the > > > > > > > > > >> KIP > > > > > > > > > >> > is proposing a way for the tasks to be able to access > > the > > > > > > original > > > > > > > > > topic > > > > > > > > > >> > partition (i.e. before transformations have been > > applied). > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> Sounds good. > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > >> > > Of course, if the developer does not need separate > > > > methods, > > > > > > they > > > > > > > > can > > > > > > > > > >> > easily have the older `put` method simply delegate to > > the > > > > > newer > > > > > > > > > method. > > > > > > > > > >> > > > > > > > > > > >> > If the developer does not need separate methods (i.e. > > they > > > > > don't > > > > > > > > need > > > > > > > > > to > > > > > > > > > >> > use this new addition), they can simply continue > > > > implementing > > > > > > just > > > > > > > > the > > > > > > > > > >> > older `put` method right? > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> Correct. We should update the JavaDoc of both methods to > > > make > > > > > this > > > > > > > > > clear, > > > > > > > > > >> and in general how the two methods should are used and > > > should > > > > be > > > > > > > > > >> implemented. That can be part of the PR, and the KIP > > doesn't > > > > > need > > > > > > > this > > > > > > > > > >> wording. > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > >> > > Finally, this gives us a roadmap for *eventually* > > > > > deprecating > > > > > > > the > > > > > > > > > >> older > > > > > > > > > >> > method, once the Connect runtime versions without this > > > > change > > > > > > are > > > > > > > > old > > > > > > > > > >> > enough. > > > > > > > > > >> > > > > > > > > > > >> > I'm not sure we'd ever want to deprecate the older > > method. > > > > > Most > > > > > > > > common > > > > > > > > > >> sink > > > > > > > > > >> > connector implementations do not do their own offset > > > > tracking > > > > > > with > > > > > > > > > >> > asynchronous processing and will probably never have a > > > need > > > > > for > > > > > > > the > > > > > > > > > >> > additional parameter `Map<SinkRecord, TopicPartition> > > > > > > > > > >> > originalTopicPartitions` in the proposed new `put` > > method. > > > > > These > > > > > > > > > >> connectors > > > > > > > > > >> > can continue implementing only the existing > > > `SinkTask::put` > > > > > > method > > > > > > > > > which > > > > > > > > > >> > will be called by the default implementation of the > > newer > > > > > > > overloaded > > > > > > > > > >> `put` > > > > > > > > > >> > method. > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> +1 > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > >> > > the pre-commit methods use the same > > `Map<TopicPartition, > > > > > > > > > >> > OffsetAndMetadata> currentOffsets` data structure I'm > > > > > suggesting > > > > > > > be > > > > > > > > > >> used. > > > > > > > > > >> > > > > > > > > > > >> > The data structure you're suggesting be used is a > > > > > > `Map<SinkRecord, > > > > > > > > > >> > TopicPartition>` which will map `SinkRecord` objects > to > > > the > > > > > > > original > > > > > > > > > >> topic > > > > > > > > > >> > partition of the corresponding `ConsumerRecord` right? > > To > > > > > > clarify, > > > > > > > > > this > > > > > > > > > >> is > > > > > > > > > >> > a new data structure that will need to be managed in > the > > > > > > > > > >> `WorkerSinkTask`. > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> Ah, you're right. Thanks for the correction. > > > > > > > > > >> > > > > > > > > > >> Best regards, > > > > > > > > > >> Randall > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > Thanks, > > > > > > > > > >> > Yash > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch < > > > > > rha...@gmail.com> > > > > > > > > > wrote: > > > > > > > > > >> > > > > > > > > > > >> > > Hi, Yash. > > > > > > > > > >> > > > > > > > > > > > >> > > I'm not sure I quite understand why it would be > > "easier" > > > > for > > > > > > > > > connector > > > > > > > > > >> > > > developers to account for implementing two > different > > > > > > > overloaded > > > > > > > > > >> `put` > > > > > > > > > >> > > > methods (assuming that they want to use this new > > > > feature) > > > > > > > versus > > > > > > > > > >> using > > > > > > > > > >> > a > > > > > > > > > >> > > > try-catch block around `SinkRecord` access > methods? > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > Using a try-catch to try around an API method that > > > *might* > > > > > be > > > > > > > > there > > > > > > > > > >> is a > > > > > > > > > >> > > very unusual thing for most developers. > Unfortunately, > > > > we've > > > > > > had > > > > > > > > to > > > > > > > > > >> > resort > > > > > > > > > >> > > to this atypical approach with Connect in places > when > > > > there > > > > > > was > > > > > > > no > > > > > > > > > >> good > > > > > > > > > >> > > alternative. We seem to relying upon pattern because > > > it's > > > > > > easier > > > > > > > > for > > > > > > > > > >> us, > > > > > > > > > >> > > not because it offers a better experience for > > Connector > > > > > > > > developers. > > > > > > > > > >> IMO, > > > > > > > > > >> > if > > > > > > > > > >> > > there's a practical alternative that uses normal > > > > development > > > > > > > > > practices > > > > > > > > > >> > and > > > > > > > > > >> > > techniques, then we should use that alternative. > IIUC, > > > > there > > > > > > is > > > > > > > at > > > > > > > > > >> least > > > > > > > > > >> > > one practical alternative for this KIP that would > not > > > > > require > > > > > > > > > >> developers > > > > > > > > > >> > to > > > > > > > > > >> > > use the unusual try-catch to handle the case where > > > methods > > > > > are > > > > > > > not > > > > > > > > > >> found. > > > > > > > > > >> > > > > > > > > > > > >> > > I also think having two `put` methods is easier when > > the > > > > > > > Connector > > > > > > > > > >> has to > > > > > > > > > >> > > do different things for different Connect runtimes, > > too. > > > > One > > > > > > of > > > > > > > > > those > > > > > > > > > >> > > methods is called by newer Connect runtimes with the > > new > > > > > > > behavior, > > > > > > > > > and > > > > > > > > > >> > the > > > > > > > > > >> > > other method is called by an older Connect runtime. > Of > > > > > course, > > > > > > > if > > > > > > > > > the > > > > > > > > > >> > > developer does not need separate methods, they can > > > easily > > > > > have > > > > > > > the > > > > > > > > > >> older > > > > > > > > > >> > > `put` method simply delegate to the newer method. > > > > > > > > > >> > > > > > > > > > > > >> > > Finally, this gives us a roadmap for *eventually* > > > > > deprecating > > > > > > > the > > > > > > > > > >> older > > > > > > > > > >> > > method, once the Connect runtime versions without > this > > > > > change > > > > > > > are > > > > > > > > > old > > > > > > > > > >> > > enough. > > > > > > > > > >> > > > > > > > > > > > >> > > I think the advantage of going with the > > > > > > > > > >> > > > proposed approach in the KIP is that it wouldn't > > > require > > > > > > extra > > > > > > > > > >> > > book-keeping > > > > > > > > > >> > > > (the Map<SinkRecord, > > > > > > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your > proposed > > > > > > approach) > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > The connector does have to do some of this > bookkeeping > > > in > > > > > how > > > > > > > they > > > > > > > > > >> track > > > > > > > > > >> > > the topic partition offsets used in the `preCommit`, > > and > > > > the > > > > > > > > > >> pre-commit > > > > > > > > > >> > > methods use the same `Map<TopicPartition, > > > > OffsetAndMetadata> > > > > > > > > > >> > > currentOffsets` > > > > > > > > > >> > > data structure I'm suggesting be used. > > > > > > > > > >> > > > > > > > > > > > >> > > I hope that helps. > > > > > > > > > >> > > > > > > > > > > > >> > > Best regards, > > > > > > > > > >> > > > > > > > > > > > >> > > Randall > > > > > > > > > >> > > > > > > > > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya < > > > > > > > yash.ma...@gmail.com> > > > > > > > > > >> wrote: > > > > > > > > > >> > > > > > > > > > > > >> > > > Hi Randall, > > > > > > > > > >> > > > > > > > > > > > > >> > > > Thanks for reviewing the KIP! > > > > > > > > > >> > > > > > > > > > > > > >> > > > > That latter logic can get quite ugly. > > > > > > > > > >> > > > > > > > > > > > > >> > > > I'm not sure I quite understand why it would be > > > "easier" > > > > > for > > > > > > > > > >> connector > > > > > > > > > >> > > > developers to account for implementing two > different > > > > > > > overloaded > > > > > > > > > >> `put` > > > > > > > > > >> > > > methods (assuming that they want to use this new > > > > feature) > > > > > > > versus > > > > > > > > > >> using > > > > > > > > > >> > a > > > > > > > > > >> > > > try-catch block around `SinkRecord` access > methods? > > In > > > > > both > > > > > > > > > cases, a > > > > > > > > > >> > > > connector developer would need to write additional > > > code > > > > in > > > > > > > order > > > > > > > > > to > > > > > > > > > >> > > ensure > > > > > > > > > >> > > > that their connector continues working with older > > > > Connect > > > > > > > > > runtimes. > > > > > > > > > >> > > > Furthermore, we would probably need to carefully > > > > document > > > > > > how > > > > > > > > the > > > > > > > > > >> > > > implementation for the older `put` method should > > look > > > > like > > > > > > for > > > > > > > > > >> > connectors > > > > > > > > > >> > > > that want to use this new feature. I think the > > > advantage > > > > > of > > > > > > > > going > > > > > > > > > >> with > > > > > > > > > >> > > the > > > > > > > > > >> > > > proposed approach in the KIP is that it wouldn't > > > require > > > > > > extra > > > > > > > > > >> > > book-keeping > > > > > > > > > >> > > > (the Map<SinkRecord, > > > > > > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your > proposed > > > > > > approach) > > > > > > > > and > > > > > > > > > >> also > > > > > > > > > >> > > the > > > > > > > > > >> > > > fact that the try-catch based logic is an already > > > > > > established > > > > > > > > > >> pattern > > > > > > > > > >> > > > through > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors > > > > > > > > > >> > > > and other KIPs which added methods to source/sink > > > > > > > connector/task > > > > > > > > > >> > > contexts. > > > > > > > > > >> > > > > > > > > > > > > >> > > > Let me know if you still feel that having a new > > > > overloaded > > > > > > put > > > > > > > > > >> method > > > > > > > > > >> > is > > > > > > > > > >> > > a > > > > > > > > > >> > > > cleaner solution and I'd be happy to reconsider! > > > > > > > > > >> > > > > > > > > > > > > >> > > > Thanks, > > > > > > > > > >> > > > Yash > > > > > > > > > >> > > > > > > > > > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch < > > > > > > > > rha...@gmail.com> > > > > > > > > > >> > wrote: > > > > > > > > > >> > > > > > > > > > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and > > > > discussion. > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > The KIP includes this rejected alternative: > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > 4. Update SinkTask.put in any way to pass the > > new > > > > > > > > information > > > > > > > > > >> > outside > > > > > > > > > >> > > > > > SinkRecord (e.g. a Map or a derived class) > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > - > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > Much more disruptive change without > > > considerable > > > > > pros > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > One advantage about doing this is that sink > > > connector > > > > > > > > > >> implementations > > > > > > > > > >> > > can > > > > > > > > > >> > > > > more easily implement two different "put(...)" > > > methods > > > > > to > > > > > > > > handle > > > > > > > > > >> > > running > > > > > > > > > >> > > > in > > > > > > > > > >> > > > > a variety of runtimes, without having to use > > > try-catch > > > > > > logic > > > > > > > > > >> around > > > > > > > > > >> > the > > > > > > > > > >> > > > > newer SinkRecord access methods. That latter > logic > > > can > > > > > get > > > > > > > > quite > > > > > > > > > >> > ugly. > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > For example, the existing `put` method has this > > > > > signature: > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > public abstract void put(Collection<SinkRecord> > > > > > records); > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > If we added an overloaded method that passed in > a > > > map > > > > of > > > > > > the > > > > > > > > old > > > > > > > > > >> > > > > topic+partition for each record (and defined the > > > > absence > > > > > > of > > > > > > > an > > > > > > > > > >> entry > > > > > > > > > >> > as > > > > > > > > > >> > > > > having an unchanged topic and partition): > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records, > > > > > > > > Map<SinkRecord, > > > > > > > > > >> > > > > TopicPartition> updatedTopicPartitions) { > > > > > > > > > >> > > > > put(records); > > > > > > > > > >> > > > > } > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > then a `SinkTask` implementation that wants to > use > > > > this > > > > > > new > > > > > > > > > >> feature > > > > > > > > > >> > > could > > > > > > > > > >> > > > > simply implement both methods: > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records) > { > > > > > > > > > >> > > > > // Running in an older runtime, so no tracking > of > > > > > > > SMT-modified > > > > > > > > > >> topic > > > > > > > > > >> > > > names > > > > > > > > > >> > > > > or partitions > > > > > > > > > >> > > > > put(records, Map.of()); > > > > > > > > > >> > > > > } > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records, > > > > > > > > Map<SinkRecord, > > > > > > > > > >> > > > > TopicPartition> updatedTopicPartitions) { > > > > > > > > > >> > > > > // real logic here > > > > > > > > > >> > > > > } > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > This seems a lot easier than having to use > > try-catch > > > > > > logic, > > > > > > > > yet > > > > > > > > > >> still > > > > > > > > > >> > > > > allows sink connectors to utilize the new > > > > functionality > > > > > > and > > > > > > > > > still > > > > > > > > > >> > work > > > > > > > > > >> > > > with > > > > > > > > > >> > > > > older Connect runtimes. > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > WDYT? > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > Randall > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya < > > > > > > > > yash.ma...@gmail.com > > > > > > > > > > > > > > > > > > > >> > > wrote: > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > Hi all, > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > I would like to (re)start a new discussion > > thread > > > on > > > > > > > KIP-793 > > > > > > > > > >> (Kafka > > > > > > > > > >> > > > > > Connect) which proposes some additions to the > > > public > > > > > > > > > SinkRecord > > > > > > > > > >> > > > interface > > > > > > > > > >> > > > > > in order to support topic mutating SMTs for > sink > > > > > > > connectors > > > > > > > > > >> that do > > > > > > > > > >> > > > their > > > > > > > > > >> > > > > > own offset tracking. > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > Links: > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > KIP: > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830 > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > Older discussion thread: > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h, > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > Jira: > > > > > https://issues.apache.org/jira/browse/KAFKA-13431 > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > Thanks, > > > > > > > > > >> > > > > > Yash > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >