Hi Yash,

> So it looks like with the current state of affairs, sink tasks that only
> instantiate writers in the SinkTask::open method (and don't do the lazy
> instantiation in SinkTask::put that you mentioned) might fail when used
> with topic/partition mutating SMTs even if they don't do any asynchronous
> processing?

Yep, exactly 👍

> What do you think about retaining just the existing methods
> but changing when they're called in the Connect runtime? For instance,
> instead of calling SinkTask::open after partition assignment post a
> consumer group rebalance, we could cache the currently "seen" topic
> partitions (post transformation) and before each call to SinkTask::put
> check whether there's any new "unseen" topic partitions, and if so call
> SinkTask::open (and also update the cache of course).

IMO the issue here is that it's a drastic change in behavior to start
invoking SinkTask::open and SinkTask::close with post-transform topic
partitions instead of pre-transform, especially if connectors are
intentionally designed around original topic partitions instead of
transformed ones. I think we have to provide connector developers with some
way to differentiate between the two, but maybe there's a way to do this
that I haven't thought of yet. Interested to hear your thoughts.

Either way, I'm glad that the general idea of a cache and eviction policy
for SinkTask::close seems reasonable; if we decide to go this route, it
might make sense for the KIP to include an outline of one or more high-level
strategies we might take, but without promising any particular behavior
beyond occasionally calling SinkTask::close for post-transform topic
partitions. I'm hoping that this logic can stay internal, and by not
painting ourselves into a corner with the KIP, we give ourselves leeway to
tweak it in the future if necessary without filing another KIP or
introducing a pluggable interface.

Cheers,

Chris

On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> Thanks for the feedback.
>
> 1) That's a fair point; while I did scan everything publicly available on
> GitHub, you're right in that it won't cover all possible SMTs that are out
> there. Thanks for the example use-case as well, I've updated the KIP to add
> the two new proposed methods.
>
> 2) So it looks like with the current state of affairs, sink tasks that only
> instantiate writers in the SinkTask::open method (and don't do the lazy
> instantiation in SinkTask::put that you mentioned) might fail when used
> with topic/partition mutating SMTs even if they don't do any asynchronous
> processing? Since they could encounter records in SinkTask::put with
> topics/partitions that they might not have created writers for. Thanks for
> pointing this out, it's definitely another incompatibility that needs to be
> called out and fixed. The overloaded method approach is interesting, but
> comes with the caveat of yet more new methods that will need to be
> implemented by existing connectors if they want to make use of this new
> functionality. What do you think about retaining just the existing methods
> but changing when they're called in the Connect runtime? For instance,
> instead of calling SinkTask::open after partition assignment post a
> consumer group rebalance, we could cache the currently "seen" topic
> partitions (post transformation) and before each call to SinkTask::put
> check whether there's any new "unseen" topic partitions, and if so call
> SinkTask::open (and also update the cache of course). I don't think this
> would break the existing contract with sink tasks where SinkTask::open is
> expected to be called for a topic partition before any records from the
> topic partition are sent via SinkTask::put? The SinkTask::close case is a
> lot trickier however, and would require some sort of cache eviction policy
> that would be deemed appropriate as you pointed out too.
>
> Thanks,
> Yash
>
> On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > I've had some time to think on this KIP and I think I'm in agreement about
> > not blocking it on an official compatibility library or adding the "ack"
> > API for sink records.
> >
> > I only have two more thoughts:
> >
> > 1. Because it is possible to manipulate sink record partitions and offsets
> > with the current API we provide for transformations, I still believe
> > methods should be added to the SinkRecord class to expose the original
> > partition and offset, not just the original topic. The additional cognitive
> > burden from these two methods is going to be minimal anyways; once users
> > understand the difference between the transformed topic name and the
> > original one, it's going to be trivial for them to understand how that same
> > difference applies for partitions and offsets. It's not enough to scan the
> > set of SMTs provided out of the box with Connect, ones developed by
> > Confluent, or even everything available on GitHub, since there may be
> > closed-source projects out there that rely on this ability. One potential
> > use case could be re-routing partitions between Kafka and some other
> > sharded system.
> >
> > 2. We still have to address the SinkTask::open [1] and SinkTask::close [2]
> > methods. If a connector writes to the external system using the transformed
> > topic partitions it reads from Kafka, then it's possible for the connector
> > to lazily instantiate writers for topic partitions as it encounters them
> > from records provided in SinkTask::put. However, connectors also need a way
> > to de-allocate those writers (and the resources used by them) over time,
> > which they can't do as easily. One possible approach here is to overload
> > SinkTask::open and SinkTask::close with variants that distinguish between
> > transformed and original topic partitions, and default to invoking the
> > existing methods with just the original topic partitions.
> > We would then have several options for how the Connect runtime can invoke
> > these methods, but in general, an approach that guarantees that tasks are
> > notified of transformed topic partitions in SinkTask::open before any
> > records for that partition are given to it in SinkTask::put, and makes a
> > best-effort attempt to close transformed topic partitions that appear to
> > no longer be in use based on some eviction policy, would probably be
> > sufficient.
> >
> > [1] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > [2] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> >
> > Cheers,
> >
> > Chris
> >
> > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > Thanks a lot for your inputs!
> > >
> > > > would provide a simple, clean interface for developers to determine
> > > > which features are supported by the version of the Connect runtime
> > > > that their plugin has been deployed onto
> > >
> > > I do like the idea of having such a public compatibility library - I think
> > > it would remove a lot of restrictions from framework development if it were
> > > to be widely adopted.
> > >
> > > > we might consider adding an API to "ack" sink records
> > >
> > > I agree that this does seem like a more intuitive and clean API, but I'm
> > > concerned about the backward compatibility headache we'd be imposing on all
> > > existing sink connectors. Connector developers will have to maintain two
> > > separate ways of doing offset management if they want to use the new API
> > > but continue supporting older versions of Kafka Connect.
> > >
> > > For now, I've reverted the KIP to the previous iteration which proposed the
> > > addition of a new `SinkRecord` method to obtain the original Kafka topic
> > > pre-transformation.
> > > One thing to note is that I've removed the method for obtaining the
> > > original Kafka partition after a cursory search showed that use cases for
> > > partition modifying SMTs are primarily on the source connector side.
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have more comments I'd like to make on this KIP when I have time (sorry
> > > > for the delay, Yash, and thanks for your patience!), but I did want to
> > > > chime in and say that I'm also not sure about overloading SinkTask::put. I
> > > > share the concerns about creating an intuitive, simple API that Yash has
> > > > raised. In addition, this approach doesn't seem very sustainable--what do
> > > > we do if we encounter another case in the future that would warrant a
> > > > similar solution? We probably don't want to create three, four, etc.
> > > > overloaded variants of the method, each of which would have to be
> > > > implemented by connector developers who want to both leverage the latest
> > > > and greatest connector APIs and maintain compatibility with Connect
> > > > clusters running older versions.
> > > >
> > > > I haven't been able to flesh this out into a design worth publishing in its
> > > > own KIP yet, but one alternative I've pitched to a few people with
> > > > generally positive interest has been to develop an official compatibility
> > > > library for Connect developers. This library would be released as its own
> > > > Maven artifact (separate from connect-api, connect-runtime, etc.) and would
> > > > provide a simple, clean interface for developers to determine which
> > > > features are supported by the version of the Connect runtime that their
> > > > plugin has been deployed onto. Under the hood, this library might use
> > > > reflection to determine whether classes, methods, etc. are available, but
> > > > the developer wouldn't have to do anything more than check (for example)
> > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point
> > > > in the lifetime of their connector/task whether that feature is provided by
> > > > the runtime.
> > > >
> > > > One other high-level comment: this doesn't address every case, but we might
> > > > consider adding an API to "ack" sink records. This could use the
> > > > SubmittedRecords class [1] (with some slight tweaks) under the hood to
> > > > track the latest-acked offset for each topic partition. This way, connector
> > > > developers won't be responsible for tracking offsets at all in their sink
> > > > tasks (eliminating issues with the accuracy of post-transformation T/P/O
> > > > sink record information), and they'll only have to notify the Connect
> > > > framework when a record has been successfully dispatched to the external
> > > > system. This provides a cleaner, friendlier API, and also enables more
> > > > fine-grained metrics like the ones proposed in KIP-767 [2].
> > > >
> > > > [1] - https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > [2] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > >
> > > > > Hi Randall,
> > > > >
> > > > > It's been a while for this one but the more I think about it, the more I
> > > > > feel like the current approach with a new overloaded `SinkTask::put` method
> > > > > might not be optimal.
> > > > > We're trying to fix a pretty corner case bug here (usage of topic
> > > > > mutating SMTs with sink connectors that do their own offset tracking)
> > > > > and I'm not sure that warrants a change to such a central interface
> > > > > method. The new `SinkTask::put` method just seems somewhat odd and it
> > > > > may not be very understandable for a new reader - I don't think this
> > > > > should be the case for a public interface method. Furthermore, even with
> > > > > elaborate documentation in place, I'm not sure if it'll be very obvious to
> > > > > most people what the purpose of having these two `put` methods is and how
> > > > > they should be used by sink task implementations. What do you think?
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > >
> > > > > > Hi Randall,
> > > > > >
> > > > > > Thanks a lot for your valuable feedback so far! I've updated the KIP based
> > > > > > on our discussion above. Could you please take another look?
> > > > > >
> > > > > > Thanks,
> > > > > > Yash
> > > > > >
> > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > >
> > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hi Randall,
> > > > > >> >
> > > > > >> > Thanks for elaborating. I think these are all very good points and I see
> > > > > >> > why the overloaded `SinkTask::put` method is a cleaner solution overall.
> > > > > >> >
> > > > > >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
> > > > > >> > > TopicPartition> updatedTopicPartitions)
> > > > > >> >
> > > > > >> > I think this should be
> > > > > >> >
> > > > > >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord,
> > > > > >> > TopicPartition> originalTopicPartitions)`
> > > > > >> >
> > > > > >> > instead because the sink records themselves have the updated topic
> > > > > >> > partitions (i.e. after all transformations have been applied) and the KIP
> > > > > >> > is proposing a way for the tasks to be able to access the original topic
> > > > > >> > partition (i.e. before transformations have been applied).
> > > > > >>
> > > > > >> Sounds good.
> > > > > >>
> > > > > >> > > Of course, if the developer does not need separate methods, they can
> > > > > >> > > easily have the older `put` method simply delegate to the newer method.
> > > > > >> >
> > > > > >> > If the developer does not need separate methods (i.e. they don't need to
> > > > > >> > use this new addition), they can simply continue implementing just the
> > > > > >> > older `put` method right?
> > > > > >>
> > > > > >> Correct. We should update the JavaDoc of both methods to make this clear,
> > > > > >> and in general how the two methods are used and should be implemented.
> > > > > >> That can be part of the PR, and the KIP doesn't need this wording.
> > > > > >>
> > > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating the older
> > > > > >> > > method, once the Connect runtime versions without this change are old
> > > > > >> > > enough.
> > > > > >> >
> > > > > >> > I'm not sure we'd ever want to deprecate the older method.
> > > > > >> > Most common sink connector implementations do not do their own offset
> > > > > >> > tracking with asynchronous processing and will probably never have a need
> > > > > >> > for the additional parameter `Map<SinkRecord, TopicPartition>
> > > > > >> > originalTopicPartitions` in the proposed new `put` method. These
> > > > > >> > connectors can continue implementing only the existing `SinkTask::put`
> > > > > >> > method which will be called by the default implementation of the newer
> > > > > >> > overloaded `put` method.
> > > > > >>
> > > > > >> +1
> > > > > >>
> > > > > >> > > the pre-commit methods use the same `Map<TopicPartition,
> > > > > >> > > OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.
> > > > > >> >
> > > > > >> > The data structure you're suggesting be used is a `Map<SinkRecord,
> > > > > >> > TopicPartition>` which will map `SinkRecord` objects to the original topic
> > > > > >> > partition of the corresponding `ConsumerRecord` right? To clarify, this is
> > > > > >> > a new data structure that will need to be managed in the `WorkerSinkTask`.
> > > > > >>
> > > > > >> Ah, you're right. Thanks for the correction.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Randall
> > > > > >>
> > > > > >> > Thanks,
> > > > > >> > Yash
> > > > > >> >
> > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > >> >
> > > > > >> > > Hi, Yash.
> > > > > >> > >
> > > > > >> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > > >> > > > developers to account for implementing two different overloaded `put`
> > > > > >> > > > methods (assuming that they want to use this new feature) versus using a
> > > > > >> > > > try-catch block around `SinkRecord` access methods?
> > > > > >> > >
> > > > > >> > > Using a try-catch around a call to an API method that *might* be there is
> > > > > >> > > a very unusual thing for most developers. Unfortunately, we've had to
> > > > > >> > > resort to this atypical approach with Connect in places when there was no
> > > > > >> > > good alternative. We seem to be relying upon this pattern because it's
> > > > > >> > > easier for us, not because it offers a better experience for Connector
> > > > > >> > > developers. IMO, if there's a practical alternative that uses normal
> > > > > >> > > development practices and techniques, then we should use that alternative.
> > > > > >> > > IIUC, there is at least one practical alternative for this KIP that would
> > > > > >> > > not require developers to use the unusual try-catch to handle the case
> > > > > >> > > where methods are not found.
> > > > > >> > >
> > > > > >> > > I also think having two `put` methods is easier when the Connector has to
> > > > > >> > > do different things for different Connect runtimes, too. One of those
> > > > > >> > > methods is called by newer Connect runtimes with the new behavior, and the
> > > > > >> > > other method is called by an older Connect runtime.
> > > > > >> > > Of course, if the developer does not need separate methods, they can
> > > > > >> > > easily have the older `put` method simply delegate to the newer method.
> > > > > >> > >
> > > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating the older
> > > > > >> > > method, once the Connect runtime versions without this change are old
> > > > > >> > > enough.
> > > > > >> > >
> > > > > >> > > > I think the advantage of going with the proposed approach in the KIP is
> > > > > >> > > > that it wouldn't require extra book-keeping (the Map<SinkRecord,
> > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach)
> > > > > >> > >
> > > > > >> > > The connector does have to do some of this bookkeeping in how they track
> > > > > >> > > the topic partition offsets used in the `preCommit`, and the pre-commit
> > > > > >> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
> > > > > >> > > currentOffsets` data structure I'm suggesting be used.
> > > > > >> > >
> > > > > >> > > I hope that helps.
> > > > > >> > >
> > > > > >> > > Best regards,
> > > > > >> > >
> > > > > >> > > Randall
> > > > > >> > >
> > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > Hi Randall,
> > > > > >> > > >
> > > > > >> > > > Thanks for reviewing the KIP!
> > > > > >> > > >
> > > > > >> > > > > That latter logic can get quite ugly.
> > > > > >> > > >
> > > > > >> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > > >> > > > developers to account for implementing two different overloaded `put`
> > > > > >> > > > methods (assuming that they want to use this new feature) versus using a
> > > > > >> > > > try-catch block around `SinkRecord` access methods? In both cases, a
> > > > > >> > > > connector developer would need to write additional code in order to ensure
> > > > > >> > > > that their connector continues working with older Connect runtimes.
> > > > > >> > > > Furthermore, we would probably need to carefully document what the
> > > > > >> > > > implementation for the older `put` method should look like for connectors
> > > > > >> > > > that want to use this new feature. I think the advantage of going with the
> > > > > >> > > > proposed approach in the KIP is that it wouldn't require extra
> > > > > >> > > > book-keeping (the Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in
> > > > > >> > > > your proposed approach) and also the fact that the try-catch based logic
> > > > > >> > > > is an already established pattern through
> > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > > >> > > > and other KIPs which added methods to source/sink connector/task contexts.
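For readers skimming the thread, here is a minimal sketch of the "probe for a newer accessor, fall back on older runtimes" idea discussed above. It is only an illustration under stated assumptions: `OldRuntimeRecord`, `NewRuntimeRecord` and `OriginalTopicLookup` are hypothetical stand-ins rather than Connect classes, and the probe uses reflection so that it compiles and runs on its own. A real connector would more likely call the accessor directly inside a try-catch for the linkage error thrown by an older runtime.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a record class on an older runtime:
// only the (possibly SMT-modified) topic is available.
class OldRuntimeRecord {
    public String topic() { return "routed-topic"; }
}

// Hypothetical stand-in for a record class on a newer runtime that
// also exposes the pre-transformation topic.
class NewRuntimeRecord {
    public String topic() { return "routed-topic"; }
    public String originalTopic() { return "source-topic"; }
}

final class OriginalTopicLookup {
    private OriginalTopicLookup() {}

    // Returns the original topic when the accessor exists, otherwise
    // falls back to topic(), which is all an older runtime can offer.
    static String originalTopicOrFallback(Object record) {
        try {
            Method accessor = record.getClass().getMethod("originalTopic");
            return (String) accessor.invoke(record);
        } catch (NoSuchMethodException missing) {
            // Accessor not present: behave as if running on an older runtime.
            try {
                return (String) record.getClass().getMethod("topic").invoke(record);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Either way, the fallback branch is extra code that every connector author has to write and test, which is the cost both sides of this exchange are weighing.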
> > > > > >> > > >
> > > > > >> > > > Let me know if you still feel that having a new overloaded put method is a
> > > > > >> > > > cleaner solution and I'd be happy to reconsider!
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Yash
> > > > > >> > > >
> > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > > >> > > > >
> > > > > >> > > > > The KIP includes this rejected alternative:
> > > > > >> > > > >
> > > > > >> > > > > > 4. Update SinkTask.put in any way to pass the new information outside
> > > > > >> > > > > > SinkRecord (e.g. a Map or a derived class)
> > > > > >> > > > > >
> > > > > >> > > > > > - Much more disruptive change without considerable pros
> > > > > >> > > > >
> > > > > >> > > > > One advantage of doing this is that sink connector implementations can
> > > > > >> > > > > more easily implement two different "put(...)" methods to handle running
> > > > > >> > > > > in a variety of runtimes, without having to use try-catch logic around the
> > > > > >> > > > > newer SinkRecord access methods. That latter logic can get quite ugly.
> > > > > >> > > > >
> > > > > >> > > > > For example, the existing `put` method has this signature:
> > > > > >> > > > >
> > > > > >> > > > > public abstract void put(Collection<SinkRecord> records);
> > > > > >> > > > >
> > > > > >> > > > > If we added an overloaded method that passed in a map of the old
> > > > > >> > > > > topic+partition for each record (and defined the absence of an entry as
> > > > > >> > > > > having an unchanged topic and partition):
> > > > > >> > > > >
> > > > > >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
> > > > > >> > > > > TopicPartition> updatedTopicPartitions) {
> > > > > >> > > > >   put(records);
> > > > > >> > > > > }
> > > > > >> > > > >
> > > > > >> > > > > then a `SinkTask` implementation that wants to use this new feature could
> > > > > >> > > > > simply implement both methods:
> > > > > >> > > > >
> > > > > >> > > > > public void put(Collection<SinkRecord> records) {
> > > > > >> > > > >   // Running in an older runtime, so no tracking of SMT-modified topic
> > > > > >> > > > >   // names or partitions
> > > > > >> > > > >   put(records, Map.of());
> > > > > >> > > > > }
> > > > > >> > > > >
> > > > > >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
> > > > > >> > > > > TopicPartition> updatedTopicPartitions) {
> > > > > >> > > > >   // real logic here
> > > > > >> > > > > }
> > > > > >> > > > >
> > > > > >> > > > > This seems a lot easier than having to use try-catch logic, yet still
> > > > > >> > > > > allows sink connectors to utilize the new functionality and still work
> > > > > >> > > > > with older Connect runtimes.
> > > > > >> > > > >
> > > > > >> > > > > WDYT?
> > > > > >> > > > >
> > > > > >> > > > > Randall
> > > > > >> > > > >
> > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi all,
> > > > > >> > > > > >
> > > > > >> > > > > > I would like to (re)start a new discussion thread on KIP-793 (Kafka
> > > > > >> > > > > > Connect) which proposes some additions to the public SinkRecord interface
> > > > > >> > > > > > in order to support topic mutating SMTs for sink connectors that do their
> > > > > >> > > > > > own offset tracking.
> > > > > >> > > > > >
> > > > > >> > > > > > Links:
> > > > > >> > > > > >
> > > > > >> > > > > > KIP:
> > > > > >> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > >> > > > > >
> > > > > >> > > > > > Older discussion thread:
> > > > > >> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > > >> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > >> > > > > >
> > > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Yash
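
To make the "seen" topic partition cache from the most recent messages in this thread more concrete, here is a rough sketch of one way it could look. Everything in it is an assumption for illustration: `PartitionLifecycle` is a hypothetical stand-in for SinkTask::open and SinkTask::close, partitions are plain strings, and the eviction rule (close a partition once it has been absent for more than a fixed number of batches) is just one possible policy. The thread deliberately avoids committing to any particular one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the open/close callbacks on a sink task.
interface PartitionLifecycle {
    void open(String topicPartition);
    void close(String topicPartition);
}

// Tracks post-transformation topic partitions so that open() is always
// called before the first batch containing a partition is delivered.
final class SeenPartitionCache {
    private final Map<String, Long> lastSeenBatch = new LinkedHashMap<>();
    private final PartitionLifecycle task;
    private final long maxIdleBatches;
    private long batch = 0;

    SeenPartitionCache(PartitionLifecycle task, long maxIdleBatches) {
        this.task = task;
        this.maxIdleBatches = maxIdleBatches;
    }

    // Intended to run before each put(): opens unseen partitions, then
    // makes a best-effort pass to close partitions idle for too long.
    void beforePut(List<String> transformedPartitions) {
        batch++;
        for (String tp : transformedPartitions) {
            if (!lastSeenBatch.containsKey(tp)) {
                task.open(tp);
            }
            lastSeenBatch.put(tp, batch);
        }
        Iterator<Map.Entry<String, Long>> it = lastSeenBatch.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (batch - entry.getValue() > maxIdleBatches) {
                task.close(entry.getKey());
                it.remove();
            }
        }
    }
}

// Test double that records the order of lifecycle calls.
final class RecordingTask implements PartitionLifecycle {
    final List<String> events = new ArrayList<>();
    @Override public void open(String tp) { events.add("open " + tp); }
    @Override public void close(String tp) { events.add("close " + tp); }
}
```

A partition that was evicted and later reappears is simply opened again, which keeps the open-before-put guarantee intact while leaving the close side best-effort.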