Hi Randall,

It's been a while for this one, but the more I think about it, the more I feel that the current approach with a new overloaded `SinkTask::put` method might not be optimal. We're trying to fix a pretty corner-case bug here (usage of topic-mutating SMTs with sink connectors that do their own offset tracking), and I'm not sure that warrants a change to such a central interface method. The new `SinkTask::put` method just seems somewhat odd, and it may not be very understandable for a new reader; I don't think that should be the case for a public interface method. Furthermore, even with elaborate documentation in place, I'm not sure it'll be obvious to most people what the purpose of having these two `put` methods is and how sink task implementations should use them. What do you think?
Thanks,
Yash

On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Randall,
>
> Thanks a lot for your valuable feedback so far! I've updated the KIP based
> on our discussion above. Could you please take another look?
>
> Thanks,
> Yash
>
> On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:
>
>> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>>
>>> Hi Randall,
>>>
>>> Thanks for elaborating. I think these are all very good points and I see
>>> why the overloaded `SinkTask::put` method is a cleaner solution overall.
>>>
>>>> public void put(Collection<SinkRecord> records, Map<SinkRecord,
>>>> TopicPartition> updatedTopicPartitions)
>>>
>>> I think this should be
>>>
>>> `public void put(Collection<SinkRecord> records, Map<SinkRecord,
>>> TopicPartition> originalTopicPartitions)`
>>>
>>> instead, because the sink records themselves have the updated topic
>>> partitions (i.e. after all transformations have been applied) and the KIP
>>> is proposing a way for the tasks to be able to access the original topic
>>> partition (i.e. before transformations have been applied).
>>>
>>
>> Sounds good.
>>
>>>
>>>> Of course, if the developer does not need separate methods, they can
>>>> easily have the older `put` method simply delegate to the newer method.
>>>
>>> If the developer does not need separate methods (i.e. they don't need to
>>> use this new addition), they can simply continue implementing just the
>>> older `put` method, right?
>>>
>>
>> Correct. We should update the JavaDoc of both methods to make this clear,
>> and in general how the two methods are used and should be implemented.
>> That can be part of the PR, and the KIP doesn't need this wording.
>>
>>>
>>>> Finally, this gives us a roadmap for *eventually* deprecating the older
>>>> method, once the Connect runtime versions without this change are old
>>>> enough.
>>>
>>> I'm not sure we'd ever want to deprecate the older method. Most common
>>> sink connector implementations do not do their own offset tracking with
>>> asynchronous processing and will probably never have a need for the
>>> additional parameter `Map<SinkRecord, TopicPartition>
>>> originalTopicPartitions` in the proposed new `put` method. These
>>> connectors can continue implementing only the existing `SinkTask::put`
>>> method, which will be called by the default implementation of the newer
>>> overloaded `put` method.
>>>
>>
>> +1
>>
>>>
>>>> the pre-commit methods use the same `Map<TopicPartition,
>>>> OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.
>>>
>>> The data structure you're suggesting be used is a `Map<SinkRecord,
>>> TopicPartition>`, which will map `SinkRecord` objects to the original
>>> topic partition of the corresponding `ConsumerRecord`, right? To clarify,
>>> this is a new data structure that will need to be managed in the
>>> `WorkerSinkTask`.
>>>
>>
>> Ah, you're right. Thanks for the correction.
>>
>> Best regards,
>> Randall
>>
>>
>>> Thanks,
>>> Yash
>>>
>>> On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:
>>>
>>>> Hi, Yash.
>>>>
>>>>> I'm not sure I quite understand why it would be "easier" for connector
>>>>> developers to account for implementing two different overloaded `put`
>>>>> methods (assuming that they want to use this new feature) versus using a
>>>>> try-catch block around `SinkRecord` access methods?
>>>>
>>>>
>>>> Using a try-catch around an API method that *might* be there is a very
>>>> unusual thing for most developers. Unfortunately, we've had to resort to
>>>> this atypical approach with Connect in places when there was no good
>>>> alternative.
>>>> We seem to be relying upon this pattern because it's easier for us, not
>>>> because it offers a better experience for Connector developers. IMO, if
>>>> there's a practical alternative that uses normal development practices and
>>>> techniques, then we should use that alternative. IIUC, there is at least
>>>> one practical alternative for this KIP that would not require developers to
>>>> use the unusual try-catch to handle the case where methods are not found.
>>>>
>>>> I also think having two `put` methods is easier when the Connector has to
>>>> do different things for different Connect runtimes, too. One of those
>>>> methods is called by newer Connect runtimes with the new behavior, and the
>>>> other method is called by an older Connect runtime. Of course, if the
>>>> developer does not need separate methods, they can easily have the older
>>>> `put` method simply delegate to the newer method.
>>>>
>>>> Finally, this gives us a roadmap for *eventually* deprecating the older
>>>> method, once the Connect runtime versions without this change are old
>>>> enough.
>>>>
>>>>> I think the advantage of going with the proposed approach in the KIP is
>>>>> that it wouldn't require extra book-keeping (the Map<SinkRecord,
>>>>> TopicPartition> in `WorkerSinkTask` in your proposed approach)
>>>>>
>>>>
>>>> The connector does have to do some of this bookkeeping in how it tracks
>>>> the topic partition offsets used in `preCommit`, and the pre-commit
>>>> methods use the same `Map<TopicPartition, OffsetAndMetadata>
>>>> currentOffsets` data structure I'm suggesting be used.
>>>>
>>>> I hope that helps.
>>>>
>>>> Best regards,
>>>>
>>>> Randall
>>>>
>>>> On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>>>>
>>>>> Hi Randall,
>>>>>
>>>>> Thanks for reviewing the KIP!
>>>>>
>>>>>> That latter logic can get quite ugly.
>>>>>
>>>>> I'm not sure I quite understand why it would be "easier" for connector
>>>>> developers to account for implementing two different overloaded `put`
>>>>> methods (assuming that they want to use this new feature) versus using a
>>>>> try-catch block around `SinkRecord` access methods? In both cases, a
>>>>> connector developer would need to write additional code in order to
>>>>> ensure that their connector continues working with older Connect
>>>>> runtimes. Furthermore, we would probably need to carefully document what
>>>>> the implementation of the older `put` method should look like for
>>>>> connectors that want to use this new feature. I think the advantage of
>>>>> going with the proposed approach in the KIP is that it wouldn't require
>>>>> extra bookkeeping (the Map<SinkRecord, TopicPartition> in
>>>>> `WorkerSinkTask` in your proposed approach), and also that the try-catch
>>>>> based logic is an already established pattern through
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>>>>> and other KIPs which added methods to source/sink connector/task
>>>>> contexts.
>>>>>
>>>>> Let me know if you still feel that having a new overloaded put method is
>>>>> a cleaner solution and I'd be happy to reconsider!
>>>>>
>>>>> Thanks,
>>>>> Yash
>>>>>
>>>>> On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Yash. Thanks for picking up this KIP and discussion.
>>>>>>
>>>>>> The KIP includes this rejected alternative:
>>>>>>
>>>>>>> 4. Update SinkTask.put in any way to pass the new information outside
>>>>>>> SinkRecord (e.g.
>>>>>>> a Map or a derived class)
>>>>>>>
>>>>>>>  - Much more disruptive change without considerable pros
>>>>>>>
>>>>>>
>>>>>> One advantage of doing this is that sink connector implementations can
>>>>>> more easily implement two different "put(...)" methods to handle running
>>>>>> in a variety of runtimes, without having to use try-catch logic around
>>>>>> the newer SinkRecord access methods. That latter logic can get quite
>>>>>> ugly.
>>>>>>
>>>>>> For example, the existing `put` method has this signature:
>>>>>>
>>>>>>     public abstract void put(Collection<SinkRecord> records);
>>>>>>
>>>>>> If we added an overloaded method that passed in a map of the old
>>>>>> topic+partition for each record (and defined the absence of an entry as
>>>>>> having an unchanged topic and partition):
>>>>>>
>>>>>>     public void put(Collection<SinkRecord> records,
>>>>>>                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
>>>>>>         put(records);
>>>>>>     }
>>>>>>
>>>>>> then a `SinkTask` implementation that wants to use this new feature could
>>>>>> simply implement both methods:
>>>>>>
>>>>>>     public void put(Collection<SinkRecord> records) {
>>>>>>         // Running in an older runtime, so no tracking of SMT-modified
>>>>>>         // topic names or partitions
>>>>>>         put(records, Map.of());
>>>>>>     }
>>>>>>
>>>>>>     public void put(Collection<SinkRecord> records,
>>>>>>                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
>>>>>>         // real logic here
>>>>>>     }
>>>>>>
>>>>>> This seems a lot easier than having to use try-catch logic, yet still
>>>>>> allows sink connectors to utilize the new functionality and still work
>>>>>> with older Connect runtimes.
>>>>>>
>>>>>> WDYT?
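Randall's overloaded `put` proposal can be fleshed out into a self-contained sketch. It is only an illustration under stated assumptions: `TopicPartition` and `SinkRecord` below are simplified, hypothetical stand-ins for the real Connect classes, the parameter uses the corrected name `originalTopicPartitions` from Yash's Oct 3 reply, and `OffsetTrackingTask` with its `offsetsToCommit` map is a hypothetical task. It shows the default overload delegating to the existing `put`, and how a task that tracks its own offsets would key them by the pre-SMT topic partition, treating a missing map entry as "unchanged".

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for org.apache.kafka.common.TopicPartition
// and org.apache.kafka.connect.sink.SinkRecord so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

record SinkRecord(String topic, int kafkaPartition, long kafkaOffset, Object value) {}

abstract class SinkTask {
    // Existing method that every sink task already implements.
    public abstract void put(Collection<SinkRecord> records);

    // Proposed overload: a newer runtime would call this one. The default ignores
    // the map and delegates, so tasks that only implement put(records) keep working.
    public void put(Collection<SinkRecord> records,
                    Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        put(records);
    }
}

// Hypothetical task that tracks its own offsets and needs the pre-SMT partitions.
class OffsetTrackingTask extends SinkTask {
    // Next offset to commit per ORIGINAL (pre-transformation) topic partition.
    final Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        // Older runtime: no map is supplied, and a missing entry means the
        // topic and partition were not changed by any SMT.
        put(records, Map.of());
    }

    @Override
    public void put(Collection<SinkRecord> records,
                    Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        for (SinkRecord record : records) {
            TopicPartition original = originalTopicPartitions.getOrDefault(
                    record, new TopicPartition(record.topic(), record.kafkaPartition()));
            // Committed offsets name the next record to consume, hence + 1.
            offsetsToCommit.merge(original, record.kafkaOffset() + 1, Long::max);
        }
    }
}
```

With this shape, the same task code path serves both runtime generations; whether that is easier to understand than the try-catch alternative is exactly the trade-off being debated in this thread.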
>>>>>>
>>>>>> Randall
>>>>>>
>>>>>> On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I would like to (re)start a new discussion thread on KIP-793 (Kafka
>>>>>>> Connect) which proposes some additions to the public SinkRecord interface
>>>>>>> in order to support topic mutating SMTs for sink connectors that do
>>>>>>> their own offset tracking.
>>>>>>>
>>>>>>> Links:
>>>>>>>
>>>>>>> KIP:
>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
>>>>>>>
>>>>>>> Older discussion thread:
>>>>>>> https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
>>>>>>> https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
>>>>>>>
>>>>>>> Jira: https://issues.apache.org/jira/browse/KAFKA-13431
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Yash
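For comparison, the KIP's original direction (new accessors on `SinkRecord`) relies on the try-catch pattern discussed above, the same one KIP-610 used around `errantRecordReporter()`: on an older runtime the new method is missing, so calling it throws `NoSuchMethodError`, and the task falls back to the current topic. This is a minimal sketch under assumptions: the accessor name `originalTopic()`, the `SinkRecordV2` stand-in, and the `CompatAccess.orFallback` helper are all hypothetical illustrations, not the KIP's or Connect's actual API.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a SinkRecord that exposes a proposed accessor.
// Against an older Connect runtime, the real class would lack originalTopic(),
// and calling it would fail at run time with NoSuchMethodError.
record SinkRecordV2(String topic, String originalTopic) {}

final class CompatAccess {
    private CompatAccess() {}

    // Invokes an accessor that may not exist in the running Connect version and
    // returns the fallback if it is missing (same catch clause as in KIP-610).
    // Pass a lambda such as () -> record.originalTopic() so the possibly-missing
    // call runs inside this try block.
    static <T> T orFallback(Supplier<T> accessor, T fallback) {
        try {
            return accessor.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: assume no SMT changed the value.
            return fallback;
        }
    }
}
```

A task would call something like `CompatAccess.orFallback(() -> record.originalTopic(), record.topic())` for every record; this per-call guarding is the "latter logic" that Randall argues can get ugly, while Yash points out that it needs no extra bookkeeping in `WorkerSinkTask`.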