Hi Yash,

Thanks for your continued work on this tricky feature. I have no further
comments or suggestions on the KIP and am ready to vote in favor of it.

That said, I did want to quickly respond to this comment:

> On a side note, this also means that the per sink record ack API
> that was proposed earlier wouldn't really work for this case since Kafka
> consumers themselves don't support per message acknowledgement semantics
> (and any sort of manual book-keeping based on offset linearity in a topic
> partition would be affected by things like log compaction, control records
> for transactional use cases etc.) right?

I believe we could still use the SubmittedRecords class [1] (with some
small tweaks) to track acked messages and the latest committable offset
for each topic partition, without assuming that consecutive records
consumed from Kafka always have offsets that differ by one. At this point,
though, I don't think it's worth it: although this approach would also
enable fine-grained metrics on record delivery to the sink system, it
costs us intuitiveness, since it's less clear why users should prefer that
API over SinkTask::preCommit.

[1] -
https://github.com/apache/kafka/blob/12be344fdd3b20f338ccab87933b89049ce202a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
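For illustration only, here is a minimal Java sketch of that kind of bookkeeping for a single topic partition. PartitionAckTracker and its methods are hypothetical names, not the actual SubmittedRecords API, and the sketch assumes records are submitted in consumption order. Acks may arrive in any order; the committable offset is one past the last record of the longest fully-acked prefix, so gaps between offsets (compaction, transaction markers) never matter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical per-partition tracker, loosely inspired by the idea behind
// SubmittedRecords; this is NOT that class's actual API.
final class PartitionAckTracker {
    private static final class Entry {
        final long offset;
        boolean acked;

        Entry(long offset) {
            this.offset = offset;
        }
    }

    // Records in the order they were handed to the task.
    private final Deque<Entry> submitted = new ArrayDeque<>();
    private final Map<Long, Entry> byOffset = new HashMap<>();
    private long committable = -1; // -1: nothing is committable yet

    void submit(long offset) {
        Entry entry = new Entry(offset);
        submitted.addLast(entry);
        byOffset.put(offset, entry);
    }

    // Acks may arrive in any order.
    void ack(long offset) {
        Entry entry = byOffset.get(offset);
        if (entry != null) {
            entry.acked = true;
        }
        // Pop the fully-acked prefix. Only submission order matters, so gaps
        // between consecutive offsets are harmless.
        while (!submitted.isEmpty() && submitted.peekFirst().acked) {
            Entry head = submitted.pollFirst();
            byOffset.remove(head.offset);
            committable = head.offset + 1; // Kafka commits the next offset to read
        }
    }

    OptionalLong committableOffset() {
        return committable < 0 ? OptionalLong.empty() : OptionalLong.of(committable);
    }
}
```

A task would keep one such tracker per pre-transform topic partition (for example in a map keyed by topic partition) and report the resulting offsets from SinkTask::preCommit.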

Cheers,

Chris

On Wed, Jun 21, 2023 at 9:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> Firstly, thanks for sharing your detailed thoughts on this thorny issue!
> Point taken on Kafka Connect being a brownfield project; I guess we might
> just need to trade off elegant / "clean" interfaces to fix this gap in
> functionality. Thanks also for calling out the existing cross-plugin
> interactions, and the fact that connectors are not (and should not be)
> developed in silos that ignore the rest of the ecosystem. That said, here
> are my thoughts:
>
> > we could replace these methods with headers that the
> > Connect runtime automatically injects into records directly
> > before dispatching them to SinkTask::put.
>
> Hm, that's an interesting idea to get around the need for connectors to
> handle potential 'NoSuchMethodError's in calls to
> SinkRecord::originalTopic/originalKafkaPartition/originalKafkaOffset.
> However, I'm inclined to agree that retrieving these values from the record
> headers seems even less intuitive and I'm okay with adding this to the
> rejected alternatives list.
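For context on the NoSuchMethodError concern: on an older runtime the new getters simply don't exist at run time, so a connector compiled against the newer API has to catch the error and fall back to the current coordinates. A minimal sketch of such a guard follows; CompatGetters and originalOrCurrent are hypothetical names, and the suppliers stand in for the SinkRecord getters.

```java
import java.util.function.Supplier;

// Hypothetical helper; not part of the Connect API.
final class CompatGetters {
    private CompatGetters() {}

    // Tries the newer getter first; on an older runtime, where the method does
    // not exist, the JVM throws NoSuchMethodError and we fall back.
    static <T> T originalOrCurrent(Supplier<T> original, Supplier<T> current) {
        try {
            return original.get();
        } catch (NoSuchMethodError e) {
            return current.get();
        }
    }
}
```

In a task this might be called as `originalOrCurrent(() -> record.originalTopic(), record::topic)`. Passing the new getter as a lambda rather than a method reference keeps the call, and therefore the potential NoSuchMethodError, inside the try block. Falling back to topic() reproduces today's behavior.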
>
> > we can consider eliminating the overloaded
> > SinkTask::open/close methods
>
> I tried to further explore the idea of keeping just the existing
> SinkTask::open / SinkTask::close methods but only calling them with
> post-transform topic partitions and ended up coming to the same conclusion
> that you did earlier in this thread :)
>
> The overloaded SinkTask::open / SinkTask::close methods are currently the
> biggest sticking point with the latest iteration of this KIP, and I'd
> prefer to eliminate them for now. The primary reason is that the
> information from open / close on pre-transform topic partitions can be
> combined with the per-record information about both pre-transform and
> post-transform topic partitions to handle most practical use cases without
> significantly muddying the public interfaces for sink connectors. The
> argument that this makes it harder for sink connectors to deal with
> post-transform topic partitions (i.e., when grouping or batching records
> for writing to the sink system) can be countered by the fact that it'll be
> similarly challenging even with the overloaded method approach of calling
> open / close with both pre-transform and post-transform topic partitions,
> since batching would be done on post-transform topic partitions whereas
> offset tracking and reporting for commits would be done on pre-transform
> topic partitions (and the two won't necessarily advance in lockstep). On a
> side note, this also means that the per sink record ack API that was
> proposed earlier wouldn't really work for this case since Kafka consumers
> themselves don't support per message acknowledgement semantics (and any
> sort of manual book-keeping based on offset linearity in a topic partition
> would be affected by things like log compaction, control records for
> transactional use cases etc.) right? Overall, I think the only benefit of
> the overloaded open / close methods approach is that the framework could
> eventually close any writers that sink tasks create for post-transform
> topic partitions, using the heuristics we discussed earlier (a cache with
> a time-based eviction policy), which doesn't seem worth it at this point.
>
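The split described above (batching on post-transform partitions, committing on pre-transform partitions) can be sketched roughly as follows. TP and Rec are hypothetical stand-ins for TopicPartition and SinkRecord, and the sketch assumes records from each original partition arrive in offset order. Because one original partition can feed several post-transform batches, the committable offset is the lowest offset still buffered, or one past the highest offset seen once everything has been flushed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical stand-ins for TopicPartition and SinkRecord.
record TP(String topic, int partition) {}
record Rec(TP original, long originalOffset, TP transformed, String value) {}

// Batches by post-transform partition, tracks offsets by pre-transform partition.
final class SplitCoordinateBuffer {
    private final Map<TP, List<Rec>> batches = new HashMap<>();       // post-transform
    private final Map<TP, TreeSet<Long>> unflushed = new HashMap<>(); // pre-transform
    private final Map<TP, Long> highestSeen = new HashMap<>();        // pre-transform

    void put(Rec r) {
        batches.computeIfAbsent(r.transformed(), k -> new ArrayList<>()).add(r);
        unflushed.computeIfAbsent(r.original(), k -> new TreeSet<>()).add(r.originalOffset());
        highestSeen.merge(r.original(), r.originalOffset(), Long::max);
    }

    // Stand-in for writing one post-transform batch to the sink system.
    List<Rec> flush(TP transformed) {
        List<Rec> batch = batches.remove(transformed);
        if (batch == null) {
            return List.of();
        }
        for (Rec r : batch) {
            unflushed.get(r.original()).remove(r.originalOffset());
        }
        return batch;
    }

    // What preCommit could report: everything below the lowest unflushed offset.
    Map<TP, Long> committableOffsets() {
        Map<TP, Long> result = new HashMap<>();
        for (Map.Entry<TP, Long> e : highestSeen.entrySet()) {
            TreeSet<Long> pending = unflushed.get(e.getKey());
            result.put(e.getKey(), pending.isEmpty() ? e.getValue() + 1 : pending.first());
        }
        return result;
    }
}
```

Flushing one post-transform batch does not by itself make its records' original offsets committable; the other batches fed by the same original partition have to catch up first, which is exactly the "not in lockstep" problem.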
> Thanks,
> Yash
>
> On Mon, May 22, 2023 at 7:30 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > I've been following the discussion and have some thoughts. Ultimately I'm
> > still in favor of this KIP and would hate to see it go dormant, though we
> > may end up settling for a less-invasive option.
> >
> >
> > On the topic of abstraction and inter-plugin interactions:
> >
> > First, there are already instances of cross-plugin interactions. Logical
> > type handling is probably the biggest example: a source connector embeds
> > metadata in the schema for record keys/values it emits that notifies
> > downstream converters about how to handle them. We provide support for some
> > logical types in Connect out of the box, but there's nothing stopping
> > connector and converter developers from implementing their own logical type
> > support using the exact same mechanism and different logical type names,
> > which is already done by Debezium, to name one example.
> >
> > Second, although it's been a goal of Connect to abstract away parts of
> > building a data pipeline so that, e.g., connector developers don't have to
> > be concerned with converters or consumers, in reality, this layer of
> > abstraction has already been eroded. The example that most readily comes to
> > mind is how source tasks are notified of the offsets of records that
> > they've emitted after they've been published to Kafka via
> > SourceTask::commitRecord [1].
> >
> > But, more importantly, it's unlikely that connectors are being developed in
> > complete isolation. Nobody's going to implement the SinkConnector /
> > SinkTask interfaces and then throw that code off to someone else to figure
> > out all the details of deployment, configuration, testing, etc. Developers
> > will probably have to be aware of at least the converter interface, some of
> > the available implementations of it, and some details of Kafka clients
> > (e.g., consumer groups for sink connectors). And this isn't a bad
> > thing--it's unlikely that someone will write a Kafka connector without
> > having or benefitting from some understanding of Kafka and the steps of the
> > data pipeline that it will be a part of.
> >
> > Bringing this to the practical topic of discussion--transformations--I
> > think it's actually in everyone's best interests for connector developers
> > to be aware of transformations. This isn't just because of the specific
> > problem that the KIP is trying to address. It's because there's plenty of
> > logic that can be implemented via SMT that a naive connector developer will
> > think that they have to implement on their own, which will ultimately lead
> > to a sub-par experience for people who end up using those connectors due to
> > inconsistent semantics (especially lack of predicates), inconsistent
> > configuration syntax, increased chances for bugs, and FUD ("why wasn't this
> > implemented as an SMT?").
> >
> > Finally, although preserving clean, composable interfaces that can be
> > understood in isolation is a great principle to start with, we are now in
> > what Anna McDonald recently referred to as "brownfield" space for Connect.
> > We can't go back in time and redesign the SMT interface/contracts to make
> > things cleaner. And I don't think it's fair to anyone to suddenly drop
> > support for SMTs that mutate t/p/o information for sink records, especially
> > since these can be used gainfully with plenty of existing sink connectors.
> >
> > Ultimately I still think the path forward that's best for the users is to
> > make the impossible possible by addressing this long-standing API gap in
> > Connect. Yes, it adds to the cognitive burden for connector developers, but
> > if they can tolerate it, the end result is better for everyone involved,
> > and if they can't, it's likely that the end result will be a preservation
> > of existing behavior, which leaves us no worse than before.
> >
> >
> > With all that said, I've thought about how to minimize or at least hide the
> > API changes as much as possible. I've had two thoughts:
> >
> > 1. On the SinkRecord::originalTopic/originalKafkaPartition/originalKafkaOffset
> > front, we could replace these methods with headers that the Connect runtime
> > automatically injects into records directly before dispatching them to
> > SinkTask::put. The names can be the proposed method names (e.g.,
> > "originalTopic"). I believe this is inferior to the current proposal and
> > should be a rejected alternative, but it at least seemed worth floating in
> > the name of compromise. I dislike this approach for two reasons: first, it
> > seems even less intuitive, and second, it doesn't come with the benefit of
> > encouraging connector developers to understand the SMT interface and take
> > it into account when designing connectors.
> >
> > 2. Although I'd hate to see the same bookkeeping logic implemented in
> > multiple connectors, we can consider eliminating the overloaded
> > SinkTask::open/close methods. A note should be added to both methods
> > clarifying that they are only invoked with the original, pre-transform
> > topic partitions, and developers will be on their own if they want to deal
> > with post-transform topic partitions instead. I'm on the fence with this
> > one, but if it's a choice between passing this KIP without modifying
> > SinkTask::open/close, or letting the KIP go dormant, I'd happily choose the
> > former.
> >
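For comparison, the header-based alternative from point 1 might look roughly like this. Everything here is a hypothetical sketch: a plain string map stands in for Connect's typed headers, and only the header names (taken from the proposed method names) come from the suggestion. It also makes the intuitiveness problem visible, since the connector has to know the magic header names and parse values back out of them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified record; real Connect records carry a typed Headers object.
final class HeaderedRecord {
    final String topic;
    final int partition;
    final long offset;
    final Map<String, String> headers = new LinkedHashMap<>();

    HeaderedRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class OriginalCoordinateHeaders {
    private OriginalCoordinateHeaders() {}

    // Runtime side: after the SMT chain, just before SinkTask::put.
    static void inject(HeaderedRecord transformed, String topic, int partition, long offset) {
        transformed.headers.put("originalTopic", topic);
        transformed.headers.put("originalKafkaPartition", Integer.toString(partition));
        transformed.headers.put("originalKafkaOffset", Long.toString(offset));
    }

    // Task side: a missing header means an older runtime, so use the current topic.
    static String originalTopic(HeaderedRecord record) {
        return record.headers.getOrDefault("originalTopic", record.topic);
    }
}
```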
> > Thanks Yash and Greg for the discussion so far, and apologies for the wall
> > of text. Looking forward to your thoughts.
> >
> > Cheers,
> >
> > Chris
> >
> > [1] -
> > https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)
> >
> > On Sun, Apr 23, 2023 at 11:20 AM Yash Mayya <yash.ma...@gmail.com>
> > wrote:
> >
> > > Hi Greg,
> > >
> > > Thanks for the response and sorry for the late reply.
> > >
> > > > Currently the AK tests have a lot of calls to, for example, new
> > > > SinkRecord(String topic, int partition, Schema keySchema,
> > > > Object key, Schema valueSchema, Object value, long kafkaOffset),
> > > > a constructor without the original T/P/O values. I assumed that for
> > > > backwards compatibility these constructors would still be usable in
> > > > new runtimes. I imagine that there are also tests in downstream projects
> > > > which make use of these constructors, whenever a Transform, Predicate,
> > > > or Task is tested without a corresponding Converter. My question was
> > > > about what values are chosen for the original T/P/O methods when these
> > > > constructors are used after an upgrade to the latest connect-api.
> > >
> > > That's a good question - since this should primarily affect only testing,
> > > I think it should be acceptable to simply use the topic, partition and
> > > kafkaOffset values as the originalTopic, originalKafkaPartition
> > > and originalKafkaOffset?
> > >
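The fallback Yash suggests amounts to constructor chaining: the legacy constructor delegates to the new one, passing the current coordinates as the originals. A sketch on a hypothetical, heavily simplified stand-in class (not the real SinkRecord, which also carries keys, values, schemas and headers, and uses an Integer partition):

```java
// Hypothetical, heavily simplified stand-in for SinkRecord (no keys, values,
// schemas, or headers), used only to show the constructor fallback.
final class SketchSinkRecord {
    private final String topic;
    private final int kafkaPartition;
    private final long kafkaOffset;
    private final String originalTopic;
    private final int originalKafkaPartition;
    private final long originalKafkaOffset;

    // Legacy-style constructor: no original T/P/O is supplied, so the current
    // coordinates double as the original ones.
    SketchSinkRecord(String topic, int partition, long kafkaOffset) {
        this(topic, partition, kafkaOffset, topic, partition, kafkaOffset);
    }

    // New-style constructor that carries the original coordinates explicitly.
    SketchSinkRecord(String topic, int partition, long kafkaOffset,
                     String originalTopic, int originalKafkaPartition, long originalKafkaOffset) {
        this.topic = topic;
        this.kafkaPartition = partition;
        this.kafkaOffset = kafkaOffset;
        this.originalTopic = originalTopic;
        this.originalKafkaPartition = originalKafkaPartition;
        this.originalKafkaOffset = originalKafkaOffset;
    }

    String topic() { return topic; }
    int kafkaPartition() { return kafkaPartition; }
    long kafkaOffset() { return kafkaOffset; }
    String originalTopic() { return originalTopic; }
    int originalKafkaPartition() { return originalKafkaPartition; }
    long originalKafkaOffset() { return originalKafkaOffset; }
}
```

With this choice, existing tests that build records through the old constructors keep passing and simply observe identical current and original coordinates.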
> > > > If you inject the original T/P/O only before and after the chain, SMTs
> > > > after an SMT which changes the original T/P/O will see whatever the earlier
> > > > SMT emitted. Is this intentional, or should this be avoided?
> > >
> > > Hmm, this sounds like a misbehaving / badly implemented SMT, since there
> > > doesn't seem to be any reasonable situation where an SMT should modify a
> > > sink record's original topic / partition / offset data, so I'm not in
> > > favor of introducing checks and guards in the framework for this.
> > >
> > > Another point that I've been pondering is the one you raised about the
> > > composability of Connect's plugin ecosystem and the special-case handling
> > > we're adding to sink connector plugins to work with certain transformation
> > > plugin types. In my opinion, this really doesn't seem like a good precedent
> > > to set (since there don't seem to be any other such "snowflake"
> > > inter-plugin interactions). The alternative of completely managing this in
> > > the framework (and only exposing the virtual coordinates to the sink tasks)
> > > doesn't seem too appealing either, due to the backward compatibility
> > > concerns around maintaining existing support and functionality, such as the
> > > possibility of implementing exactly-once semantics and the ability for
> > > tasks to rewind consumer offsets arbitrarily (which might require
> > > introducing some form of persistence for the physical <-> virtual
> > > coordinate mapping). Unfortunately, even though this is a long-standing
> > > problem that all of us want to fix, I'm considering moving this KIP into a
> > > dormant / inactive state, since there doesn't seem to be a design that
> > > satisfies all the general principles that the Kafka Connect framework has
> > > striven to uphold.
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Mar 14, 2023 at 3:31 AM Greg Harris <greg.har...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Yash,
> > > >
> > > > > I'm not sure I follow - are you asking about how the tests will be
> > > > > updated post this change or about what upgrades will look like for
> > > > > clusters in production?
> > > >
> > > > Currently the AK tests have a lot of calls to, for example, new
> > > > SinkRecord(String topic, int partition, Schema keySchema, Object key,
> > > > Schema valueSchema, Object value, long kafkaOffset), a constructor without
> > > > the original T/P/O values. I assumed that for backwards compatibility these
> > > > constructors would still be usable in new runtimes.
> > > > I imagine that there are also tests in downstream projects which make use
> > > > of these constructors, whenever a Transform, Predicate, or Task is tested
> > > > without a corresponding Converter. My question was about what values are
> > > > chosen for the original T/P/O methods when these constructors are used
> > > > after an upgrade to the latest connect-api.
> > > >
> > > > > There shouldn't be any difference in behavior here - the framework will
> > > > > add the original T/P/O metadata to the record after the entire
> > > > > transformation chain has been applied and just before sending the record
> > > > > to the task for processing. The KIP doesn't propose that transformations
> > > > > themselves should also be able to retrieve original T/P/O information for
> > > > > a sink record.
> > > >
> > > > The KIP includes this: "Note that while the record's offset can't be
> > > > modified via the standard SinkRecord::newRecord methods that SMTs are
> > > > expected to use, SinkRecord has public constructors that would allow SMTs
> > > > to return records with modified offsets. This is why the proposed changes
> > > > include a new SinkRecord::originalKafkaOffset method as well."
> > > > In order to use the new or old SinkRecord constructors outside of the
> > > > newRecord methods, SMTs will downcast the previous record and may access
> > > > the original T/P/O methods. They may or may not forward this to the next
> > > > SMT, and they may or may not use it in their own computation.
> > > > Since this is acknowledged as a possible implementation, I was just asking:
> > > > when one SMT changes the original T/P/O, what should later SMTs and
> > > > predicates see from the original T/P/O methods?
> > > > If you inject the original T/P/O only before and after the chain, SMTs
> > > > after an SMT which changes the original T/P/O will see whatever the earlier
> > > > SMT emitted. Is this intentional, or should this be avoided?
> > > > Existing SMTs that use the SinkRecord constructor, either directly or via
> > > > subclasses of ConnectRecord, will drop the original T/P/O and fall back to
> > > > the logic from question (1).
> > > >
> > > > > The rejected alternative basically says that we can't do a
> > > > > deterministic mapping from virtual coordinates to physical coordinates
> > > > > without doing a lot of book-keeping.
> > > >
> > > > I suppose there is a possible implementation of metadata book-keeping which
> > > > provides a reasonable system of virtual coordinates; it just ended up
> > > > equivalent to hydrating intermediate topics to compute a consistent record
> > > > ordering. I wasn't convinced by calling it "book-keeping" since I've seen
> > > > that phrase used to dismiss much less complicated state management, and I
> > > > had to see exactly where that solution becomes unreasonable.
> > > >
> > > > Thanks,
> > > > Greg
> > > >
> > > > On Sun, Mar 12, 2023 at 6:30 AM Yash Mayya <yash.ma...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Greg,
> > > > >
> > > > > Thanks for the detailed review!
> > > > >
> > > > > > What is the expected state/behavior for SinkRecords
> > > > > > which do not have original T/P/O information after the
> > > > > > upgrade? Just browsing, it appears that tests make
> > > > > > extensive use of the existing public SinkRecord
> > > > > > constructors for both Transformations and Connectors.
> > > > >
> > > > > I'm not sure I follow - are you asking about how the tests will be
> > > > > updated post this change or about what upgrades will look like for
> > > > > clusters in production? For the latter, we won't have to worry about sink
> > > > > records without original T/P/O information at all once a cluster is fully
> > > > > rolled, and we will make it (hopefully) abundantly clear that connectors
> > > > > need to account for missing original T/P/O getter methods if they expect
> > > > > to be deployed on older Connect runtimes.
> > > > >
> > > > > > What is the expected behavior for Transformation
> > > > > > implementations which do not use the newRecord
> > > > > > methods and instead use public SinkRecord constructors?
> > > > > > The KIP mentions this as a justification for the
> > > > > > originalKafkaOffset method, but if existing implementations
> > > > > > are using the existing constructors, those constructors won't
> > > > > > forward the original T/P/O information to later transforms or
> > > > > > the task.
> > > > >
> > > > > There shouldn't be any difference in behavior here - the framework will
> > > > > add the original T/P/O metadata to the record after the entire
> > > > > transformation chain has been applied and just before sending the record
> > > > > to the task for processing. The KIP doesn't propose that transformations
> > > > > themselves should also be able to retrieve original T/P/O information for
> > > > > a sink record.
> > > > >
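The ordering Yash describes (attach the original T/P/O only after the whole chain) can be sketched as below; the same sketch illustrates Greg's follow-up question, since SMTs in the middle of the chain still see whatever an earlier SMT emitted. Coords and TransformThenStamp are hypothetical names, and UnaryOperator stands in for a transformation's apply method, with null meaning the record was dropped.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical record: current coordinates plus the original T/P/O.
record Coords(String topic, int partition, long offset,
              String origTopic, int origPartition, long origOffset) {
    Coords withTopic(String newTopic) {
        return new Coords(newTopic, partition, offset, origTopic, origPartition, origOffset);
    }
}

final class TransformThenStamp {
    private TransformThenStamp() {}

    // Runs the SMT chain; a null result means an SMT dropped the record.
    // Each SMT sees whatever the previous one emitted, including any tampered
    // original T/P/O. After the chain, the values captured from the consumed
    // record overwrite them, so the task always sees the real originals.
    static Coords deliver(Coords consumed, List<UnaryOperator<Coords>> chain) {
        Coords current = consumed;
        for (UnaryOperator<Coords> smt : chain) {
            current = smt.apply(current);
            if (current == null) {
                return null;
            }
        }
        return new Coords(current.topic(), current.partition(), current.offset(),
                consumed.topic(), consumed.partition(), consumed.offset());
    }
}
```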
> > > > > > This reasoning and the KIP design seems to imply that the
> > > > > > connector is better equipped to solve this problem than the
> > > > > > framework, but the stated reasons are not convincing for me.
> > > > >
> > > > > This was added to the KIP by the original author, but I don't think the
> > > > > intention was to imply that the connector is better equipped to solve
> > > > > this problem than the framework. The intention is to provide complete
> > > > > information to the connector ("physical" and "virtual coordinates"
> > > > > instead of the currently incomplete "virtual coordinates" as you've
> > > > > termed it) so that connectors can use the virtual coordinates for writing
> > > > > data to the sink system and physical coordinates for offset reporting
> > > > > back to the framework. The rejected alternative basically says that we
> > > > > can't do a deterministic mapping from virtual coordinates to physical
> > > > > coordinates without doing a lot of book-keeping.
> > > > >
> > > > > I agree with the rest of your analysis on the tradeoffs between the
> > > > > proposed approach and the seemingly more attractive approach of handling
> > > > > everything purely in the framework and only exposing "virtual
> > > > > coordinates" to the connectors. I think the biggest thorn here is
> > > > > maintaining backward compatibility with the considerable ecosystem of
> > > > > existing connectors, which is something Connect has always been burdened
> > > > > by.
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Wed, Mar 8, 2023 at 6:54 AM Greg Harris <greg.har...@aiven.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Yash,
> > > > > >
> > > > > > I always use this issue as an example of a bug being caused by design
> > > > > > rather than by implementation error, and once it's fixed I'll need to
> > > > > > find something else to talk about :)
> > > > > > So glad to see this get fixed!
> > > > > >
> > > > > > I'll chime in to support some of the earlier discussions that seem to
> > > > > > have been resolved:
> > > > > >
> > > > > > 1. With respect to SinkRecord methods vs an overloaded put(): I agree
> > > > > > with the current design but I justify it a little bit differently than
> > > > > > has already been discussed.
> > > > > > If we were designing this interface on day 1 without backwards
> > > > > > compatibility in mind, which design would make more sense? Or for a
> > > > > > different framing: In the future when old runtimes and connectors are
> > > > > > retired and the old interfaces are removed, which design is going to
> > > > > > look more strange and unmotivated?
> > > > > > Applied to this design decision, I would say that the original T/P/O
> > > > > > are properties of a single SinkRecord and make sense as getters, and it
> > > > > > would be strange to store them in an auxiliary map.
> > > > > >
> > > > > > 2. Following up this change with a compatibility library to make the
> > > > > > interface easier to use is the right choice to make here. This change
> > > > > > should be focused on correctness in allowing developers to fix the
> > > > > > incompatibility, and we can be concerned with coming up with a more
> > > > > > ergonomic solution in the compatibility library.
> > > > > > The API should be focused on generality, correctness, and performance
> > > > > > because those cannot be worked around after the fact. Connector
> > > > > > implementations and/or libraries can be concerned with trading off some
> > > > > > generality and/or performance for ease of use.
> > > > > >
> > > > > > 3. I think that the difference in behavior of the new open/close
> > > > > > methods as compared to the old methods is significant, and requires good
> > > > > > documentation to help connector developers avoid lazy and incorrect
> > > > > > migrations. I am happy to have that addressed in code review after the
> > > > > > KIP is approved.
> > > > > >
> > > > > > I had some questions:
> > > > > >
> > > > > > 4. What is the expected state/behavior for SinkRecords which do not
> > > > > > have original T/P/O information after the upgrade? Just browsing, it
> > > > > > appears that tests make extensive use of the existing public SinkRecord
> > > > > > constructors for both Transformations and Connectors.
> > > > > >
> > > > > > 5. What is the expected behavior for Transformation implementations
> > > > > > which do not use the newRecord methods and instead use public SinkRecord
> > > > > > constructors? The KIP mentions this as a justification for the
> > > > > > originalKafkaOffset method, but if existing implementations are using
> > > > > > the existing constructors, those constructors won't forward the original
> > > > > > T/P/O information to later transforms or the task.
> > > > > >
> > > > > > For the last few points, I want to discuss this rejected alternative:
> > > > > >
> > > > > > > Address the offsets problem entirely within the framework, doing some
> > > > > > > kind of mapping from the transformed topic back to the original topic.
> > > > > > > * This would only work in the cases where there’s no overlap between
> > > > > > > the transformed topic names, but would break for the rest of the
> > > > > > > transformations (e.g. static transformation, topic = “a”).
> > > > > > > * Even if we wanted to limit the support to those cases, it would
> > > > > > > require considerable bookkeeping to add a validation to verify that the
> > > > > > > transformation chain adheres to that expectation (and fail fast if it
> > > > > > > doesn’t).
> > > > > >
> > > > > > 6. This reasoning and the KIP design seems to imply that the connector
> > > > > > is better equipped to solve this problem than the framework, but the
> > > > > > stated reasons are not convincing for me.
> > > > > > * A static transformation still causes an offset collision in the
> > > > > > connector
> > > > > > * The connector is not permitted to see the transformation chain to do
> > > > > > any fail-fast assertions
> > > > > >
> > > > > > Suppose we were to think of the records at the end of the transformation
> > > > > > chain as being in "virtual partitions" with "virtual offsets".
> > > > > > For example, with identity-routing SMTs, the virtual coordinates are
> > > > > > exactly the same as the underlying physical coordinates. For 1-1
> > > > > > renames, each virtual topic would be the renamed topic corresponding to
> > > > > > the underlying topic. For fan-out from one topic to multiple virtual
> > > > > > topics, virtual offsets would use the underlying Kafka offsets with gaps
> > > > > > for records going to other virtual partitions. Virtual topics with
> > > > > > dropped records have similar gaps in the offsets.
> > > > > > Currently, these virtual coordinates are passed into the connector via
> > > > > > SinkTask::put, but SinkTask::open/close/preCommit and
> > > > > > SinkTaskContext::assignment/offsets/pause/resume all use physical
> > > > > > coordinates.
> > > > > > This proposal patches put, open, and close to have both physical and
> > > > > > virtual coordinates, but leaves the other methods with physical
> > > > > > coordinates. After this proposal, connectors would be intentionally made
> > > > > > aware of the distinction between physical and virtual coordinates, and
> > > > > > manage their own bookkeeping for the two systems.
> > > > > >
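The fan-out case above can be modeled in a few lines: route each physical record to a virtual topic, keep its physical partition and offset, and the gaps in each virtual partition fall out naturally. Physical, Virtual and VirtualCoordinates are hypothetical names; the router function stands in for a topic-rewriting SMT chain, and a null return models a dropped record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical types: a consumed record and its post-transform coordinates.
record Physical(String topic, int partition, long offset, String key) {}
record Virtual(String topic, int partition, long offset) {}

final class VirtualCoordinates {
    private VirtualCoordinates() {}

    // Offsets are carried over unchanged, so each virtual partition shows gaps
    // wherever records were routed elsewhere or dropped.
    static List<Virtual> route(List<Physical> records, Function<Physical, String> router) {
        List<Virtual> out = new ArrayList<>();
        for (Physical p : records) {
            String virtualTopic = router.apply(p);
            if (virtualTopic != null) {
                out.add(new Virtual(virtualTopic, p.partition(), p.offset()));
            }
        }
        return out;
    }
}
```

For instance, routing keys "a", "b", "a", "x" at offsets 0 to 3 to "events-" + key (dropping "x") gives virtual partition events-a offsets 0 and 2, and events-b offset 1.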
> > > > > > To avoid that connector logic, we could use virtual coordinates in all
> > > > > > connector calls, never revealing that they are different from the
> > > > > > physical coordinates. There's a whole design shopping list that we'd
> > > > > > need:
> > > > > > * Renumbering mechanism for disambiguating and making virtual offsets
> > > > > > monotonic in the case of topic/partition collisions
> > > > > > * Data structure and strategy for translating virtual offsets back to
> > > > > > physical offsets
> > > > > > * New limits on SinkTaskContext::offsets() calls to prevent rewinding
> > > > > > before the latest commit
> > > > > > * Backwards compatibility and upgrade design
> > > > > >
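To give a sense of the first two shopping-list items, here is a sketch for a single virtual partition that merges records from several physical partitions (as with a static rename to topic = "a"). VirtualOffsetMapper and its types are hypothetical. It assigns dense, monotonic virtual offsets and translates a committed virtual offset back to per-physical-partition commits. It deliberately leaves out what a full design would still need: combining results across virtual partitions (a physical partition's commit would be the minimum over all virtual partitions it feeds), pruning the map after commits, and persisting it for rewinds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical types for illustration only.
record PhysicalTP(String topic, int partition) {}
record PhysicalPos(PhysicalTP tp, long offset) {}

// Renumbers records that collide in one virtual partition and translates a
// virtual commit back to physical offsets.
final class VirtualOffsetMapper {
    private final TreeMap<Long, PhysicalPos> virtualToPhysical = new TreeMap<>();
    private long nextVirtualOffset = 0;

    // Assigns a dense, monotonically increasing virtual offset.
    long assign(PhysicalTP tp, long physicalOffset) {
        long v = nextVirtualOffset++;
        virtualToPhysical.put(v, new PhysicalPos(tp, physicalOffset));
        return v;
    }

    // Given a committed virtual offset (everything below it is done), returns the
    // next physical offset to commit for each physical partition that contributed.
    Map<PhysicalTP, Long> toPhysicalCommits(long committedVirtualOffset) {
        Map<PhysicalTP, Long> commits = new HashMap<>();
        for (PhysicalPos pos : virtualToPhysical.headMap(committedVirtualOffset).values()) {
            commits.merge(pos.tp(), pos.offset() + 1, Long::max);
        }
        return commits;
    }
}
```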
> > > > > > 7. This alternative was very appealing to me, because the strength of a
> > > > > > plugin framework is the composability of different components. Among a
> > > > > > collection of N connectors and M transforms, it should ideally only take
> > > > > > N + M work to understand how the components combine to build the whole.
> > > > > > However, once you start adding special cases to some plugins to support
> > > > > > interactions with others, the whole system can take N * M work to
> > > > > > understand. From a complexity standpoint, it would be very good for the
> > > > > > framework to solve this in a way which was connector-agnostic.
> > > > > > The current design compromises the logical isolation of the plugins
> > > > > > slightly, but they can collapse offsets very memory-efficiently, re-use
> > > > > > the existing raw coordinate functions, and keep everything else
> > > > > > backwards compatible. After deriving all of the above, I think that's a
> > > > > > reasonable tradeoff to make.
> > > > > >
> > > > > > Thanks,
> > > > > > Greg
> > > > > >
> > > > > > On Tue, Feb 21, 2023 at 10:17 AM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Yash,
> > > > > > >
> > > > > > > We'll probably want to make a few tweaks to the Javadocs for the new
> > > > > > > methods (I'm imagining that notes on compatibility with older versions
> > > > > > > will be required), but I believe what's proposed in the KIP is good
> > > > > > > enough to approve with the understanding that it may not exactly match
> > > > > > > what gets implemented/merged.
> > > > > > >
> > > > > > > LGTM, thanks again for the KIP!
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Chris
> > > > > > >
> > > > > > > On Tue, Feb 21, 2023 at 12:18 PM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Chris,
> > > > > > > >
> > > > > > > > > we might try to introduce a framework-level configuration
> > > > > > > > > property to dictate which of the pre-transform and post-transform
> > > > > > > > > topic partitions are used for the fallback call to the single-arg
> > > > > > > > > variant if a task class has not overridden the multi-arg variant
> > > > > > > >
> > > > > > > > Thanks for the explanation and I agree that this will be a tad bit
> > > > > > > > too convoluted. :)
> > > > > > > >
> > > > > > > > Please do let me know if you'd like any further amendments to the KIP!
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yash
> > > > > > > >
> > > > > > > > On Tue, Feb 21, 2023 at 8:42 PM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Yash,
> > > > > > > > >
> > > > > > > > > I think the use case for pre-transform TPO coordinates (and topic
> > > > > > > > > partition writers created/destroyed in open/close) tends to boil down
> > > > > > > > > to exactly-once semantics, where it's desirable to preserve the
> > > > > > > > > guarantees that Kafka provides (every record has a unique TPO trio,
> > > > > > > > > and records are ordered by offset within a topic partition).
> > > > > > > > >
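To make the exactly-once point concrete: one common way to get effectively-once delivery is an idempotent write keyed by the record's coordinates, so a redelivered record overwrites its earlier copy instead of duplicating it. The toy sink below (hypothetical names, an in-memory map instead of a real sink system) only works if that key is unique per record, which the pre-transform TPO guarantees.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical coordinate triple (topic, partition, offset).
record Tpo(String topic, int partition, long offset) {}

// Toy "sink table" that upserts rows keyed by a TPO, so redelivery after a
// rebalance or restart overwrites instead of duplicating.
final class IdempotentSink {
    private final Map<Tpo, String> rows = new LinkedHashMap<>();

    void upsert(Tpo key, String value) {
        rows.put(key, value);
    }

    int size() {
        return rows.size();
    }
}
```

Keyed by pre-transform coordinates, records ("a", 0, 5) and ("b", 0, 5) stay distinct and a redelivery of either is harmless. If a static rename sent both to topic "a", their post-transform coordinates would be identical and one would silently overwrite the other.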
> > > > > > > > > It's my understanding that this approach is utilized in several
> > > > > > > > > connectors out there today, and it might break these connectors to
> > > > > > > > > start using the post-transform topic partitions automatically in
> > > > > > > > > their open/close methods.
> > > > > > > > >
> > > > > > > > > If we want to get really fancy with this and try to obviate or at
> > > > > > > > > least reduce the need for per-connector code changes, we might try to
> > > > > > > > > introduce a framework-level configuration property to dictate which
> > > > > > > > > of the pre-transform and post-transform topic partitions are used for
> > > > > > > > > the fallback call to the single-arg variant if a task class has not
> > > > > > > > > overridden the multi-arg variant. But I think this is going a bit too
> > > > > > > > > far and would prefer to keep things simple(r) for now.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > >
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > > >
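A minimal sketch of the fallback discussed above. It assumes a hypothetical two-argument open(originalPartitions, transformedPartitions) whose default implementation delegates to the existing single-argument open with the pre-transform partitions. Task classes that never override it therefore keep their current behavior. SketchSinkTask, LegacyTask and FallbackDemo are illustrative names. TopicPartition is a stand-in record, not the Kafka class, so the example compiles on its own. This is not the KIP's final API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) { }

// Hypothetical base class mirroring the shape of SinkTask's open() methods.
abstract class SketchSinkTask {
    // Existing-style single-argument variant; a no-op by default.
    public void open(Collection<TopicPartition> partitions) { }

    // Hypothetical multi-argument variant. Unless a task overrides it, it falls back to
    // the single-argument variant with the pre-transform (original) partitions.
    public void open(Collection<TopicPartition> originalPartitions,
                     Collection<TopicPartition> transformedPartitions) {
        open(originalPartitions);
    }
}

// A task written against the single-argument variant only.
class LegacyTask extends SketchSinkTask {
    final List<TopicPartition> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        opened.addAll(partitions);
    }
}

class FallbackDemo {
    public static void main(String[] args) {
        LegacyTask task = new LegacyTask();
        // The runtime would call the multi-argument variant with both views of the assignment.
        task.open(List.of(new TopicPartition("orders", 0)),
                  List.of(new TopicPartition("orders-routed", 0)));
        System.out.println(task.opened); // [TopicPartition[topic=orders, partition=0]]
    }
}
```

Falling back to the pre-transform partitions is the "keep things simple(r)" option: existing connectors keep seeing what they see today. The cost is that they stay unaware of topic-mutating SMTs until they override the new variant.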
> > > > > > > > > On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Chris,
> > > > > > > > > >
> > > > > > > > > > > I was actually envisioning something like `void
> > > > > > > > > > > open(Collection<TopicPartition> originalPartitions,
> > > > > > > > > > > Collection<TopicPartition> transformedPartitions)`
> > > > > > > > > >
> > > > > > > > > > Ah okay, this does make a lot more sense. Sorry, I think I
> > > > > > > > > > misunderstood you earlier. I do agree with you that this seems better
> > > > > > > > > > than splitting it off into two new sets of open / close methods from a
> > > > > > > > > > complexity standpoint.
> > > > > > > > > >
> > > > > > > > > > > Plus, if a connector is intentionally designed to use
> > > > > > > > > > > pre-transformation topic partitions in its open/close
> > > > > > > > > > > methods, wouldn't we just be trading one form of the
> > > > > > > > > > > problem for another by making this switch?
> > > > > > > > > >
> > > > > > > > > > On thinking about this a bit more, I'm not so convinced that we need to
> > > > > > > > > > expose the pre-transform / original topic partitions in the new open /
> > > > > > > > > > close methods. The purpose of the open / close methods is to allow sink
> > > > > > > > > > tasks to allocate and deallocate resources for each topic partition
> > > > > > > > > > assigned to the task and the purpose of topic-mutating SMTs is to
> > > > > > > > > > essentially modify the source topic name from the point of view of the
> > > > > > > > > > sink connector. Why would a sink connector ever need to or want to
> > > > > > > > > > allocate resources for pre-transform topic partitions? Is the argument
> > > > > > > > > > here that since we'll be exposing both the pre-transform and
> > > > > > > > > > post-transform topic partitions per record, we should also expose the
> > > > > > > > > > same info via open / close and allow sink connector implementations to
> > > > > > > > > > disregard topic-mutating SMTs completely if they wanted to?
> > > > > > > > > >
> > > > > > > > > > Either way, I've gone ahead and updated the KIP to reflect all of our
> > > > > > > > > > previous discussion here since it had become quite outdated. I've also
> > > > > > > > > > updated the KIP title from "Sink Connectors: Support topic-mutating
> > > > > > > > > > SMTs for async connectors (preCommit users)" to "Allow sink connectors
> > > > > > > > > > to be used with topic-mutating SMTs" since the improvements to the
> > > > > > > > > > open / close mechanism don't pertain only to asynchronous sink
> > > > > > > > > > connectors. The new KIP URL is:
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yash
> > > > > > > > > >
> > > > > > > > > > On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Yash,
> > > > > > > > > > >
> > > > > > > > > > > I was actually envisioning something like
> > > > > > > > > > > `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`,
> > > > > > > > > > > since we already convert and transform each batch of records that we
> > > > > > > > > > > poll from the sink task's consumer en masse, meaning we could discover
> > > > > > > > > > > several new transformed partitions in between consecutive calls to
> > > > > > > > > > > SinkTask::put.
> > > > > > > > > > >
> > > > > > > > > > > It's also worth noting that we'll probably want to deprecate the
> > > > > > > > > > > existing open/close methods, at which point keeping one non-deprecated
> > > > > > > > > > > variant of each seems more appealing and less complex than keeping two.
> > > > > > > > > > >
> > > > > > > > > > > Honestly though, I think we're both on the same page enough that I
> > > > > > > > > > > wouldn't object to either approach. We've probably reached the
> > > > > > > > > > > saturation point for ROI here and as long as we provide developers a
> > > > > > > > > > > way to get the information they need from the runtime and take care to
> > > > > > > > > > > add Javadocs and update our docs page (possibly including the connector
> > > > > > > > > > > development quickstart), it should be fine.
> > > > > > > > > > >
> > > > > > > > > > > At this point, it might be worth updating the KIP based on recent
> > > > > > > > > > > discussion so that others can see the latest proposal, and we can both
> > > > > > > > > > > take a look and make sure everything looks good enough before opening
> > > > > > > > > > > a vote thread.
> > > > > > > > > > >
> > > > > > > > > > > Finally, I think you make a convincing case for a time-based eviction
> > > > > > > > > > > policy. I wasn't thinking about the fairly common SMT pattern of
> > > > > > > > > > > deriving a topic name from, e.g., a record field or header.
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > >
> > > > > > > > > > > Chris
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > >
> > > > > > > > > > > > > Plus, if a connector is intentionally designed to
> > > > > > > > > > > > > use pre-transformation topic partitions in its
> > > > > > > > > > > > > open/close methods, wouldn't we just be trading
> > > > > > > > > > > > > one form of the problem for another by making this
> > > > > > > > > > > > > switch?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks, this makes sense, and given that the KIP already proposes a
> > > > > > > > > > > > way for sink connector implementations to distinguish between
> > > > > > > > > > > > pre-transform and post-transform topics per record, I think I'm
> > > > > > > > > > > > convinced that going with new `open()` / `close()` methods is the right
> > > > > > > > > > > > approach. However, I still feel like having overloaded methods will
> > > > > > > > > > > > make it a lot less intuitive given that the two sets of methods would
> > > > > > > > > > > > be different in terms of when they're called and what arguments they
> > > > > > > > > > > > are passed (also I'm presuming that the overloaded methods you're
> > > > > > > > > > > > prescribing will only have a single `TopicPartition` rather than a
> > > > > > > > > > > > `Collection<TopicPartition>` as their parameters). I guess my concern
> > > > > > > > > > > > is largely around the fact that it won't be possible to distinguish
> > > > > > > > > > > > between the overloaded methods' use cases just from the method
> > > > > > > > > > > > signatures. I agree that naming is going to be difficult here, but I
> > > > > > > > > > > > think that having two sets of `SinkTask::openXyz` /
> > > > > > > > > > > > `SinkTask::closeXyz` methods will be less complicated to understand
> > > > > > > > > > > > from a connector developer perspective (as compared to overloaded
> > > > > > > > > > > > methods with only differing documentation). Of your suggested options,
> > > > > > > > > > > > I think `openPreTransform` / `openPostTransform` are the most
> > > > > > > > > > > > comprehensible ones.
> > > > > > > > > > > >
> > > > > > > > > > > > > BTW, I wouldn't say that we can't make assumptions
> > > > > > > > > > > > > about the relationships between pre- and post-transformation
> > > > > > > > > > > > > topic partitions.
> > > > > > > > > > > >
> > > > > > > > > > > > I meant that the framework wouldn't be able to deterministically know
> > > > > > > > > > > > when to close a post-transform topic partition given that SMTs could
> > > > > > > > > > > > use per-record data / metadata to manipulate the topic names as and
> > > > > > > > > > > > how required (which supports the suggestion to use an
> > > > > > > > > > > > eviction-policy-based mechanism to call SinkTask::close for
> > > > > > > > > > > > post-transform topic partitions).
> > > > > > > > > > > >
> > > > > > > > > > > > > We might utilize a policy that assumes a deterministic
> > > > > > > > > > > > > mapping from the former to the latter, for example.
> > > > > > > > > > > >
> > > > > > > > > > > > Wouldn't this be making the assumption that SMTs only use the topic
> > > > > > > > > > > > name itself and no other data / metadata while computing the new topic
> > > > > > > > > > > > name? Are you suggesting that since this assumption could work for a
> > > > > > > > > > > > majority of SMTs, it might be more efficient overall in terms of
> > > > > > > > > > > > reducing the number of "false-positive" calls to
> > > > > > > > > > > > `SinkTask::closePostTransform` (and we'll also be able to call
> > > > > > > > > > > > `SinkTask::closePostTransform` immediately after topic partitions are
> > > > > > > > > > > > revoked from the consumer)? I was thinking something more generic along
> > > > > > > > > > > > the lines of a simple time-based eviction policy that wouldn't be
> > > > > > > > > > > > making any assumptions regarding the SMT implementations. Either way, I
> > > > > > > > > > > > do like your earlier suggestion of keeping this logic internal and not
> > > > > > > > > > > > painting ourselves into a corner by promising any particular behavior
> > > > > > > > > > > > in the KIP.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yash
> > > > > > > > > > > >
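One way to picture the simple time-based eviction policy described above is a tracker that records when each post-transform partition was last seen in a batch. It hands back the partitions that have been idle longer than a timeout, and the runtime could then pass those to close. This is a minimal sketch. IdlePartitionTracker and the idle timeout are hypothetical, and so is the explicit nowMs argument, which is used instead of a system clock to keep the behavior deterministic. TopicPartition is a stand-in record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) { }

// Hypothetical tracker for post-transform partitions; it makes no assumptions about SMTs.
class IdlePartitionTracker {
    private final long idleTimeoutMs;
    private final Map<TopicPartition, Long> lastSeenMs = new HashMap<>();

    IdlePartitionTracker(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Records that a post-transform partition appeared in a batch at nowMs.
    // Returns true if it was not tracked yet, i.e. the runtime would need to open it.
    boolean markSeen(TopicPartition tp, long nowMs) {
        return lastSeenMs.put(tp, nowMs) == null;
    }

    // Removes and returns the partitions idle for longer than the timeout;
    // the runtime would pass these to close().
    List<TopicPartition> evictIdle(long nowMs) {
        List<TopicPartition> evicted = new ArrayList<>();
        Iterator<Map.Entry<TopicPartition, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}

class EvictionDemo {
    public static void main(String[] args) {
        IdlePartitionTracker tracker = new IdlePartitionTracker(60_000);
        TopicPartition a = new TopicPartition("customer-a", 0);
        TopicPartition b = new TopicPartition("customer-b", 0);
        tracker.markSeen(a, 0);
        tracker.markSeen(b, 50_000);
        System.out.println(tracker.evictIdle(70_000)); // [TopicPartition[topic=customer-a, partition=0]]
    }
}
```

Because the policy makes no assumptions about the SMTs, it can produce the "false-positive" closes mentioned above. A partition evicted for being idle is reported as new by markSeen the next time it appears, and would simply be reopened.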
> > > > > > > > > > > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think the key difference between adding methods/overloads related
> > > > > > > > > > > > > to SinkTask::open/SinkTask::close and SinkTask::put is that this isn't
> > > > > > > > > > > > > auxiliary information that may or may not be useful to connector
> > > > > > > > > > > > > developers. It's actually critical for them to understand the
> > > > > > > > > > > > > difference between the two concepts here, even if they look very
> > > > > > > > > > > > > similar. And yes, I do believe that switching from pre-transform to
> > > > > > > > > > > > > post-transform topic partitions is too big a change in behavior here.
> > > > > > > > > > > > > Plus, if a connector is intentionally designed to use
> > > > > > > > > > > > > pre-transformation topic partitions in its open/close methods,
> > > > > > > > > > > > > wouldn't we just be trading one form of the problem for another by
> > > > > > > > > > > > > making this switch?
> > > > > > > > > > > > >
> > > > > > > > > > > > > One possible alternative to overloading the existing methods is to
> > > > > > > > > > > > > split SinkTask::open into openOriginal (or possibly openPhysical or
> > > > > > > > > > > > > openPreTransform) and openTransformed (or openLogical or
> > > > > > > > > > > > > openPostTransform), with a similar change for SinkTask::close. The
> > > > > > > > > > > > > default implementation for SinkTask::openOriginal can be to call
> > > > > > > > > > > > > SinkTask::open, and the same can go for SinkTask::close. However, I
> > > > > > > > > > > > > prefer overloading the existing methods since this alternative
> > > > > > > > > > > > > increases complexity and none of the names are very informative.
> > > > > > > > > > > > >
> > > > > > > > > > > > > BTW, I wouldn't say that we can't make assumptions about the
> > > > > > > > > > > > > relationships between pre- and post-transformation topic partitions.
> > > > > > > > > > > > > We might utilize a policy that assumes a deterministic mapping from
> > > > > > > > > > > > > the former to the latter, for example. The distinction I'd draw is
> > > > > > > > > > > > > that the assumptions we make can and probably should favor some cases
> > > > > > > > > > > > > in terms of performance (i.e., reducing the number of unnecessary
> > > > > > > > > > > > > calls to close/open over a given sink task's lifetime), but should not
> > > > > > > > > > > > > lead to guaranteed resource leaks or failure to obey API contract in
> > > > > > > > > > > > > any cases.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Chris
> > > > > > > > > > > > >
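The "deterministic mapping" idea above can be made concrete as bookkeeping of the pre-transform to post-transform mappings the runtime has observed. Once every pre-transform partition that fed a post-transform partition has been revoked, that post-transform partition can be closed right away. This is a minimal sketch under those assumptions, and it is not the policy the KIP specifies. MappingTracker and its methods are hypothetical, and TopicPartition is a stand-in record.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) { }

// Hypothetical bookkeeping of observed pre-transform -> post-transform mappings.
class MappingTracker {
    // For each post-transform partition, the pre-transform partitions that produced it.
    private final Map<TopicPartition, Set<TopicPartition>> sources = new HashMap<>();

    // Called for every record after transformation.
    void observe(TopicPartition original, TopicPartition transformed) {
        sources.computeIfAbsent(transformed, k -> new HashSet<>()).add(original);
    }

    // Called when the consumer revokes pre-transform partitions. Returns the post-transform
    // partitions that no still-assigned partition has produced; the caller would close them.
    List<TopicPartition> onRevoked(Collection<TopicPartition> revoked) {
        List<TopicPartition> toClose = new ArrayList<>();
        Iterator<Map.Entry<TopicPartition, Set<TopicPartition>>> it = sources.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Set<TopicPartition>> entry = it.next();
            entry.getValue().removeAll(revoked);
            if (entry.getValue().isEmpty()) {
                toClose.add(entry.getKey());
                it.remove();
            }
        }
        return toClose;
    }
}

class MappingDemo {
    public static void main(String[] args) {
        MappingTracker tracker = new MappingTracker();
        TopicPartition p0 = new TopicPartition("events", 0);
        TopicPartition p1 = new TopicPartition("events", 1);
        TopicPartition merged = new TopicPartition("events-all", 0);
        tracker.observe(p0, merged);
        tracker.observe(p1, merged);
        System.out.println(tracker.onRevoked(List.of(p0))); // [] because p1 still feeds it
        System.out.println(tracker.onRevoked(List.of(p1))); // [TopicPartition[topic=events-all, partition=0]]
    }
}
```

Every tracked post-transform partition is eventually returned once all of its sources are revoked, so nothing leaks outright. However, for SMTs that route on record fields or headers, the tracked set can keep growing until revocation, which is where a time-based policy could complement it.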
> > > > > > > > > > > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > especially if connectors are intentionally designed
> > > > > > > > > > > > > > > around original topic partitions instead of transformed ones.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Ha, that's a good point and reminds me of Hyrum's Law [1] :)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think we have to provide connector developers with
> > > > > > > > > > > > > > > some way to differentiate between the two, but maybe
> > > > > > > > > > > > > > > there's a way to do this that I haven't thought of yet
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I can't think of a better way to do this either; would invoking the
> > > > > > > > > > > > > > existing `SinkTask::open` and `SinkTask::close` methods with
> > > > > > > > > > > > > > post-transform topic partitions instead of pre-transform topic
> > > > > > > > > > > > > > partitions not be acceptable even in a minor / major AK release? I
> > > > > > > > > > > > > > feel like the proposed approach of adding overloaded `SinkTask::open` /
> > > > > > > > > > > > > > `SinkTask::close` methods to differentiate between pre-transform and
> > > > > > > > > > > > > > post-transform topic partitions has similar pitfalls to the idea of
> > > > > > > > > > > > > > the overloaded `SinkTask::put` method we discarded earlier.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Either way, I'm glad that the general idea of a cache and
> > > > > > > > > > > > > > > eviction policy for SinkTask::close seems reasonable; if
> > > > > > > > > > > > > > > we decide to go this route, it might make sense for the KIP
> > > > > > > > > > > > > > > to include an outline of one or more high-level strategies
> > > > > > > > > > > > > > > we might take, but without promising any particular behavior
> > > > > > > > > > > > > > > beyond occasionally calling SinkTask::close for post-transform
> > > > > > > > > > > > > > > topic partitions. I'm hoping that this logic can stay internal,
> > > > > > > > > > > > > > > and by not painting ourselves into a corner with the KIP, we
> > > > > > > > > > > > > > > give ourselves leeway to tweak it in the future if necessary
> > > > > > > > > > > > > > > without filing another KIP or introducing a pluggable interface.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks, that's a good idea. Given the flexibility of SMTs, the
> > > > > > > > > > > > > > framework can't really make any assumptions around topic partitions
> > > > > > > > > > > > > > post-transformation, nor does it have any way to definitively get any
> > > > > > > > > > > > > > such information from transformations, which is why the idea of a
> > > > > > > > > > > > > > cache with an eviction policy makes perfect sense!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1] - https://www.hyrumslaw.com/
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > So it looks like with the current state of affairs, sink tasks that
> > > > > > > > > > > > > > > > only instantiate writers in the SinkTask::open method (and don't do
> > > > > > > > > > > > > > > > the lazy instantiation in SinkTask::put that you mentioned) might
> > > > > > > > > > > > > > > > fail when used with topic/partition mutating SMTs even if they don't
> > > > > > > > > > > > > > > > do any asynchronous processing?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Yep, exactly 👍
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > What do you think about retaining just the existing methods
> > > > > > > > > > > > > > > > but changing when they're called in the Connect runtime? For
> > > > > > > > > > > > > > > > instance, instead of calling SinkTask::open after partition
> > > > > > > > > > > > > > > > assignment post a consumer group rebalance, we could cache the
> > > > > > > > > > > > > > > > currently "seen" topic partitions (post transformation) and before
> > > > > > > > > > > > > > > > each call to SinkTask::put check whether there are any new "unseen"
> > > > > > > > > > > > > > > > topic partitions, and if so call SinkTask::open (and also update the
> > > > > > > > > > > > > > > > cache of course).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > IMO the issue here is that it's a drastic change in behavior to start
> > > > > > > > > > > > > > > invoking SinkTask::open and SinkTask::close with post-transform topic
> > > > > > > > > > > > > > > partitions instead of pre-transform, especially if connectors are
> > > > > > > > > > > > > > > intentionally designed around original topic partitions instead of
> > > > > > > > > > > > > > > transformed ones. I think we have to provide connector developers
> > > > > > > > > > > > > > > with some way to differentiate between the two, but maybe there's a
> > > > > > > > > > > > > > > way to do this that I haven't thought of yet. Interested to hear your
> > > > > > > > > > > > > > > thoughts.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Either way, I'm glad that the general idea of a cache and eviction
> > > > > > > > > > > > > > > policy for SinkTask::close seems reasonable; if we decide to go this
> > > > > > > > > > > > > > > route, it might make sense for the KIP to include an outline of one
> > > > > > > > > > > > > > > or more high-level strategies we might take, but without promising
> > > > > > > > > > > > > > > any particular behavior beyond occasionally calling SinkTask::close
> > > > > > > > > > > > > > > for post-transform topic partitions. I'm hoping that this logic can
> > > > > > > > > > > > > > > stay internal, and by not painting ourselves into a corner with the
> > > > > > > > > > > > > > > KIP, we give ourselves leeway to tweak it in the future if necessary
> > > > > > > > > > > > > > > without filing another KIP or introducing a pluggable interface.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > >
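The lazy instantiation in SinkTask::put mentioned above can be sketched as follows. The sketch assumes a hypothetical PartitionWriter per topic partition and a simplified Rec type standing in for a transformed SinkRecord; TopicPartition is likewise a stand-in record. Writers created for the partitions passed to open are reused. A record that an SMT routed to a partition that was never opened gets a writer on first use instead of causing a failure. This is an illustration, not any particular connector's code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) { }

// Simplified stand-in for a SinkRecord after transformation: its topic partition and a value.
record Rec(TopicPartition tp, String value) { }

// Hypothetical per-partition writer that just buffers values.
class PartitionWriter {
    final List<String> buffered = new ArrayList<>();

    void write(String value) {
        buffered.add(value);
    }
}

class LazyWriterTask {
    final Map<TopicPartition, PartitionWriter> writers = new HashMap<>();

    // Eager path: create writers for the partitions passed to open().
    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.computeIfAbsent(tp, k -> new PartitionWriter());
        }
    }

    // Lazy path: an SMT may route records to partitions never passed to open(),
    // so create their writers on first use instead of failing.
    void put(Collection<Rec> records) {
        for (Rec r : records) {
            writers.computeIfAbsent(r.tp(), k -> new PartitionWriter()).write(r.value());
        }
    }
}

class LazyWriterDemo {
    public static void main(String[] args) {
        LazyWriterTask task = new LazyWriterTask();
        task.open(List.of(new TopicPartition("orders", 0)));                 // pre-transform partition
        task.put(List.of(new Rec(new TopicPartition("orders-eu", 0), "x"))); // renamed by an SMT
        System.out.println(task.writers.size()); // 2
    }
}
```

In this sketch the writer for the renamed partition is never handed to close. That gap is the SinkTask::close half of the problem discussed in this thread.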
> > > > > > > > > > > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the feedback.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1) That's a fair point; while I did scan everything publicly
> > > > > > > > > > > > > > > > available on GitHub, you're right in that it won't cover all
> > > > > > > > > > > > > > > > possible SMTs that are out there. Thanks for the example use-case as
> > > > > > > > > > > > > > > > well; I've updated the KIP to add the two new proposed methods.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2) So it looks like with the current state of affairs, sink tasks
> > > > > > > > > > > > > > > > that only instantiate writers in the SinkTask::open method (and don't
> > > > > > > > > > > > > > > > do the lazy instantiation in SinkTask::put that you mentioned) might
> > > > > > > > > > > > > > > > fail when used with topic/partition mutating SMTs even if they don't
> > > > > > > > > > > > > > > > do any asynchronous processing? Since they could encounter records in
> > > > > > > > > > > > > > > > SinkTask::put with topics/partitions that they might not have created
> > > > > > > > > > > > > > > > writers for. Thanks for pointing this out; it's definitely another
> > > > > > > > > > > > > > > > incompatibility that needs to be called out and fixed. The overloaded
> > > > > > > > > > > > > > > > method approach is interesting, but comes with the caveat of yet more
> > > > > > > > > > > > > > > > new methods that will need to be implemented by existing connectors
> > > > > > > > > > > > > > > > if they want to make use of this new functionality. What do you think
> > > > > > > > > > > > > > > > about retaining just the existing methods but changing when they're
> > > > > > > > > > > > > > > > called in the Connect runtime? For instance, instead of calling
> > > > > > > > > > > > > > > > SinkTask::open after partition assignment post a consumer group
> > > > > > > > > > > > > > > > rebalance, we could cache the currently "seen" topic partitions (post
> > > > > > > > > > > > > > > > transformation) and before each call to SinkTask::put check whether
> > > > > > > > > > > > > > > > there are any new "unseen" topic partitions, and if so call
> > > > > > > > > > > > > > > > SinkTask::open (and also update the cache of course). I don't think
> > > > > > > > > > > > > > > > this would break the existing contract with sink tasks where
> > > > > > > > > > > > > > > > SinkTask::open is expected to be called for a topic partition before
> > > > > > > > > > > > > > > > any records from the topic partition are sent via SinkTask::put? The
> > > > > > > > > > > > > > > > SinkTask::close case is a lot trickier, however, and would require
> > > > > > > > > > > > > > > > some sort of cache eviction policy that would be deemed appropriate
> > > > > > > > > > > > > > > > as you pointed out too.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > >
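A minimal sketch of the proposal above. The runtime keeps the set of post-transform partitions it has already opened. Before each call to put, it opens the unseen partitions from the transformed batch in a single call, since one polled batch can surface several. SeenPartitionCache, the Opener callback standing in for SinkTask::open, and the TopicPartition stand-in record are all hypothetical. The close side, which needs an eviction policy, is left out.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) { }

// Hypothetical callback standing in for SinkTask::open.
interface Opener {
    void open(Collection<TopicPartition> partitions);
}

class SeenPartitionCache {
    private final Set<TopicPartition> seen = new HashSet<>();

    // Opens the not-yet-seen post-transform partitions of one batch in a single call,
    // before the batch would be handed to put(), and remembers them.
    void beforePut(List<TopicPartition> batchPartitions, Opener opener) {
        Set<TopicPartition> unseen = new LinkedHashSet<>(); // keeps first-seen order, drops duplicates
        for (TopicPartition tp : batchPartitions) {
            if (!seen.contains(tp)) {
                unseen.add(tp);
            }
        }
        if (!unseen.isEmpty()) {
            opener.open(unseen);
            seen.addAll(unseen);
        }
    }
}

class SeenCacheDemo {
    public static void main(String[] args) {
        SeenPartitionCache cache = new SeenPartitionCache();
        Opener printer = partitions -> System.out.println("open " + partitions);
        TopicPartition a = new TopicPartition("a", 0);
        TopicPartition b = new TopicPartition("b", 0);
        cache.beforePut(List.of(a, a, b), printer); // open [TopicPartition[topic=a, partition=0], TopicPartition[topic=b, partition=0]]
        cache.beforePut(List.of(a, b), printer);    // prints nothing: both were already opened
    }
}
```

Because beforePut runs before the batch is handed to put, each partition is opened before any of its records are delivered. This matches the existing contract described above.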
> > > > > > > > > > > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I've had some time to think on this KIP and I think I'm in agreement
> > > > > > > > > > > > > > > > > about not blocking it on an official compatibility library or adding
> > > > > > > > > > > > > > > > > the "ack" API for sink records.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I only have two more thoughts:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 1. Because it is possible to manipulate sink record partitions and
> > > > > > > > > > > > > > > > > offsets with the current API we provide for transformations, I still
> > > > > > > > > > > > > > > > > believe methods should be added to the SinkRecord class to expose the
> > > > > > > > > > > > > > > > > original partition and offset, not just the original topic. The
> > > > > > > > > > > > > > > > > additional cognitive burden from these two methods is going to be
> > > > > > > > > > > > > > > > > minimal anyways; once users understand the difference between the
> > > > > > > > > > > > > > > > > transformed topic name and the original one, it's going to be trivial
> > > > > > > > > > > > > > > > > for them to understand how that same difference applies for
> > > > > > > > > > > > > > > > > partitions and offsets. It's not enough to scan the set of SMTs
> > > > > > > > > > > > > > > > > provided out of the box with Connect, ones developed by Confluent, or
> > > > > > > > > > > > > > > > > even everything available on GitHub, since there may be closed-source
> > > > > > > > > > > > > > > > > projects out there that rely on this ability. One potential use case
> > > > > > > > > > > > > > > > > could be re-routing partitions between Kafka and some other sharded
> > > > > > > > > > > > > > > > > system.
> > > > > > > > > > > > > > > > >
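Point 1 can be illustrated with a small, self-contained Java sketch. It does not use the real Connect classes: `TrackedRecord` is a hypothetical stand-in for a sink record that exposes both its post-transformation topic/partition and its original topic, partition and offset, and `OriginalOffsetTracker` is a hypothetical helper. What it shows is that committable offsets have to be keyed on the original coordinates, so an SMT that re-routes partitions cannot corrupt them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a sink record (NOT the real SinkRecord class):
// it carries the coordinates produced by the SMT chain as well as the
// coordinates the record was originally consumed from.
final class TrackedRecord {
    final String topic;            // after transformations
    final int partition;           // after transformations
    final String originalTopic;    // as consumed from Kafka
    final int originalPartition;   // as consumed from Kafka
    final long originalOffset;     // as consumed from Kafka

    TrackedRecord(String topic, int partition,
                  String originalTopic, int originalPartition, long originalOffset) {
        this.topic = topic;
        this.partition = partition;
        this.originalTopic = originalTopic;
        this.originalPartition = originalPartition;
        this.originalOffset = originalOffset;
    }
}

// Hypothetical helper a sink task could use to decide what to commit.
// Offsets are keyed on the ORIGINAL topic partition, since that is what the
// framework's consumer commits, however SMTs re-routed the record.
// Assumes records from one original partition are written in consumption order.
final class OriginalOffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();

    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Call once the record has been durably written to the external system.
    void markWritten(TrackedRecord record) {
        // Kafka commits the offset of the next record to consume, hence + 1.
        nextOffsets.put(key(record.originalTopic, record.originalPartition),
                record.originalOffset + 1);
    }

    Map<String, Long> committableOffsets() {
        return new HashMap<>(nextOffsets);
    }
}
```

Here `topic` and `partition` would still decide where a record lands in the external system; only the offset bookkeeping relies on the original coordinates.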
> > > > > > > > > > > > > > > > > > 2. We still have to address the SinkTask::open [1] and SinkTask::close [2]
> > > > > > > > > > > > > > > > > > methods. If a connector writes to the external system using the transformed
> > > > > > > > > > > > > > > > > > topic partitions it reads from Kafka, then it's possible for the connector
> > > > > > > > > > > > > > > > > > to lazily instantiate writers for topic partitions as it encounters them
> > > > > > > > > > > > > > > > > > from records provided in SinkTask::put. However, connectors also need a way
> > > > > > > > > > > > > > > > > > to de-allocate those writers (and the resources used by them) over time,
> > > > > > > > > > > > > > > > > > which they can't do as easily. One possible approach here is to overload
> > > > > > > > > > > > > > > > > > SinkTask::open and SinkTask::close with variants that distinguish between
> > > > > > > > > > > > > > > > > > transformed and original topic partitions, and default to invoking the
> > > > > > > > > > > > > > > > > > existing methods with just the original topic partitions. We would then
> > > > > > > > > > > > > > > > > > have several options for how the Connect runtime can invoke these methods,
> > > > > > > > > > > > > > > > > > but in general, an approach that guarantees that tasks are notified of
> > > > > > > > > > > > > > > > > > transformed topic partitions in SinkTask::open before any records for that
> > > > > > > > > > > > > > > > > > partition are given to it in SinkTask::put, and makes a best-effort attempt
> > > > > > > > > > > > > > > > > > to close transformed topic partitions that appear to no longer be in use
> > > > > > > > > > > > > > > > > > based on some eviction policy, would probably be sufficient.
> > > > > > > > > > > > > > > > >
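The lazy-open, evict-later pattern from point 2 might look roughly like the following plain-Java sketch. `PartitionWriter` and `WriterCache` are hypothetical, partition keys are plain strings, and least-recently-used eviction with a fixed cap is only one possible eviction policy; none of this is Connect API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical writer for one transformed topic partition in the external system.
final class PartitionWriter implements AutoCloseable {
    final String transformedPartition;
    boolean closed = false;

    PartitionWriter(String transformedPartition) {
        this.transformedPartition = transformedPartition;
    }

    @Override
    public void close() {
        // A real writer would flush buffered data and release connections here.
        closed = true;
    }
}

// Hypothetical cache: opens writers lazily as transformed partitions show up
// in put(), and closes the least recently used writer once more than
// maxWriters are open.
final class WriterCache {
    final List<String> closedPartitions = new ArrayList<>();
    private final int maxWriters;
    private final LinkedHashMap<String, PartitionWriter> writers;

    WriterCache(int maxWriters) {
        this.maxWriters = maxWriters;
        // accessOrder = true: iteration runs from least to most recently used.
        this.writers = new LinkedHashMap<String, PartitionWriter>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, PartitionWriter> eldest) {
                if (size() <= WriterCache.this.maxWriters) {
                    return false;
                }
                eldest.getValue().close();
                closedPartitions.add(eldest.getKey());
                return true;
            }
        };
    }

    // Would be called from put() for each record's transformed partition.
    PartitionWriter writerFor(String transformedPartition) {
        return writers.computeIfAbsent(transformedPartition, PartitionWriter::new);
    }

    // Would be called from close()/stop() to release whatever is still open.
    void closeAll() {
        for (Map.Entry<String, PartitionWriter> entry : writers.entrySet()) {
            entry.getValue().close();
            closedPartitions.add(entry.getKey());
        }
        writers.clear();
    }

    int openWriters() {
        return writers.size();
    }
}
```

LRU is convenient here because `LinkedHashMap` provides it directly; closing writers that have been idle for some period would be an equally reasonable policy.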
> > > > > > > > > > > > > > > > > > [1] -
> > > > > > > > > > > > > > > > > > https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > > > > > > > > > > > > > > > > > [2] -
> > > > > > > > > > > > > > > > > > https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks a lot for your inputs!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > would provide a simple, clean interface for developers to determine
> > > > > > > > > > > > > > > > > > > > which features are supported by the version of the Connect runtime
> > > > > > > > > > > > > > > > > > > > that their plugin has been deployed onto
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I do like the idea of having such a public compatibility library - I think
> > > > > > > > > > > > > > > > > > > it would remove a lot of restrictions from framework development if it were
> > > > > > > > > > > > > > > > > > > to be widely adopted.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > we might consider adding an API to "ack" sink records
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I agree that this does seem like a more intuitive and clean API, but I'm
> > > > > > > > > > > > > > > > > > > concerned about the backward compatibility headache we'd be imposing on all
> > > > > > > > > > > > > > > > > > > existing sink connectors. Connector developers will have to maintain two
> > > > > > > > > > > > > > > > > > > separate ways of doing offset management if they want to use the new API
> > > > > > > > > > > > > > > > > > > but continue supporting older versions of Kafka Connect.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > For now, I've reverted the KIP to the previous iteration which proposed the
> > > > > > > > > > > > > > > > > > > addition of a new `SinkRecord` method to obtain the original Kafka topic
> > > > > > > > > > > > > > > > > > > pre-transformation. One thing to note is that I've removed the method for
> > > > > > > > > > > > > > > > > > > obtaining the original Kafka partition after a cursory search showed that
> > > > > > > > > > > > > > > > > > > use cases for partition modifying SMTs are primarily on the source
> > > > > > > > > > > > > > > > > > > connector side.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I have more comments I'd like to make on this KIP when I have time (sorry
> > > > > > > > > > > > > > > > > > > > for the delay, Yash, and thanks for your patience!), but I did want to
> > > > > > > > > > > > > > > > > > > > chime in and say that I'm also not sure about overloading SinkTask::put. I
> > > > > > > > > > > > > > > > > > > > share the concerns about creating an intuitive, simple API that Yash has
> > > > > > > > > > > > > > > > > > > > raised. In addition, this approach doesn't seem very sustainable--what do
> > > > > > > > > > > > > > > > > > > > we do if we encounter another case in the future that would warrant a
> > > > > > > > > > > > > > > > > > > > similar solution? We probably don't want to create three, four, etc.
> > > > > > > > > > > > > > > > > > > > overloaded variants of the method, each of which would have to be
> > > > > > > > > > > > > > > > > > > > implemented by connector developers who want to both leverage the latest
> > > > > > > > > > > > > > > > > > > > and greatest connector APIs and maintain compatibility with Connect
> > > > > > > > > > > > > > > > > > > > clusters running older versions.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I haven't been able to flesh this out into a design worth publishing in its
> > > > > > > > > > > > > > > > > > > > own KIP yet, but one alternative I've pitched to a few people with
> > > > > > > > > > > > > > > > > > > > generally positive interest has been to develop an official compatibility
> > > > > > > > > > > > > > > > > > > > library for Connect developers. This library would be released as its own
> > > > > > > > > > > > > > > > > > > > Maven artifact (separate from connect-api, connect-runtime, etc.) and would
> > > > > > > > > > > > > > > > > > > > provide a simple, clean interface for developers to determine which
> > > > > > > > > > > > > > > > > > > > features are supported by the version of the Connect runtime that their
> > > > > > > > > > > > > > > > > > > > plugin has been deployed onto. Under the hood, this library might use
> > > > > > > > > > > > > > > > > > > > reflection to determine whether classes, methods, etc. are available, but
> > > > > > > > > > > > > > > > > > > > the developer wouldn't have to do anything more than check (for example)
> > > > > > > > > > > > > > > > > > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point
> > > > > > > > > > > > > > > > > > > > in the lifetime of their connector/task whether that feature is provided by
> > > > > > > > > > > > > > > > > > > > the runtime.
> > > > > > > > > > > > > > > > > > >
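A rough sketch of the reflection-based check described above follows. The `Features` enum and its `enabled()` method come from the hypothetical example in the paragraph and are not an existing library; the only real names are `org.apache.kafka.connect.sink.SinkTaskContext` and its `errantRecordReporter()` method, added by KIP-610. The probe asks the class loader whether the class and method exist and treats any failure as "not supported".

```java
// Hypothetical feature-detection helper in the spirit of the proposed
// compatibility library. Each constant names a class and a public no-arg
// method whose presence signals that the Connect runtime provides the feature.
enum Features {
    // SinkTaskContext#errantRecordReporter() was introduced by KIP-610.
    SINK_TASK_ERRANT_RECORD_REPORTER(
            "org.apache.kafka.connect.sink.SinkTaskContext", "errantRecordReporter");

    private final String className;
    private final String methodName;

    Features(String className, String methodName) {
        this.className = className;
        this.methodName = methodName;
    }

    boolean enabled() {
        return isMethodAvailable(className, methodName);
    }

    // Reflection-based probe: any lookup failure is treated as "not supported".
    static boolean isMethodAvailable(String className, String methodName) {
        try {
            Class<?> clazz = Class.forName(className, false, Features.class.getClassLoader());
            clazz.getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }
}
```

A connector could then guard its call to `context.errantRecordReporter()` with `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()`. A real library would likely cache each result rather than repeat the reflective lookup.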
> > > > > > > > > > > > > > > > > > > > One other high-level comment: this doesn't address every case, but we might
> > > > > > > > > > > > > > > > > > > > consider adding an API to "ack" sink records. This could use the
> > > > > > > > > > > > > > > > > > > > SubmittedRecords class [1] (with some slight tweaks) under the hood to
> > > > > > > > > > > > > > > > > > > > track the latest-acked offset for each topic partition. This way, connector
> > > > > > > > > > > > > > > > > > > > developers won't be responsible for tracking offsets at all in their sink
> > > > > > > > > > > > > > > > > > > > tasks (eliminating issues with the accuracy of post-transformation T/P/O
> > > > > > > > > > > > > > > > > > > > sink record information), and they'll only have to notify the Connect
> > > > > > > > > > > > > > > > > > > > framework when a record has been successfully dispatched to the external
> > > > > > > > > > > > > > > > > > > > system. This provides a cleaner, friendlier API, and also enables more
> > > > > > > > > > > > > > > > > > > > fine-grained metrics like the ones proposed in KIP-767 [2].
> > > > > > > > > > > > > > > > > > >
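To make the "ack" idea more concrete, here is a minimal Java sketch of turning per-record acks into committable offsets. It is inspired by, not copied from, the SubmittedRecords class mentioned above; `AckTracker`, `Submitted`, and the string partition keys are hypothetical. Records are tracked per original topic partition in submission order, and the committable offset is one past the last record in an unbroken run of acked records at the head of the queue, so the sketch never assumes that consecutive offsets differ by one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-record ack tracker. The framework would call submit() for
// every record handed to the task; the task calls ack() on the returned handle
// once that record has been dispatched to the external system.
final class AckTracker {

    static final class Submitted {
        final String partition; // original topic partition, e.g. "orders-0"
        final long offset;      // original Kafka offset
        private volatile boolean acked = false;

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    // Records per partition in submission order. Order matters, not offset
    // arithmetic, so gaps left by compaction or transaction markers are harmless.
    private final Map<String, Deque<Submitted>> pending = new HashMap<>();
    // Latest committable offset per partition (last acked prefix record + 1).
    private final Map<String, Long> committable = new HashMap<>();

    synchronized Submitted submit(String partition, long offset) {
        Submitted record = new Submitted(partition, offset);
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(record);
        return record;
    }

    // Drops the acked prefix of each partition's queue and returns a snapshot
    // of the offsets that are safe to commit.
    synchronized Map<String, Long> committableOffsets() {
        for (Map.Entry<String, Deque<Submitted>> entry : pending.entrySet()) {
            Deque<Submitted> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                Submitted done = queue.pollFirst();
                committable.put(entry.getKey(), done.offset + 1);
            }
        }
        return new HashMap<>(committable);
    }
}
```

Note that one slow, unacked record holds back the committable offset for its whole partition; that is inherent to committing only an unbroken acked prefix.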
> > > > > > > > > > > > > > > > > > > > [1] -
> > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > > > > > > > > > > > > > > > > > [2] -
> > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > It's been a while for this one but the more I think about it, the more I
> > > > > > > > > > > > > > > > > > > > > feel like the current approach with a new overloaded `SinkTask::put` method
> > > > > > > > > > > > > > > > > > > > > might not be optimal. We're trying to fix a pretty corner case bug here
> > > > > > > > > > > > > > > > > > > > > (usage of topic mutating SMTs with sink connectors that do their own offset
> > > > > > > > > > > > > > > > > > > > > tracking) and I'm not sure that warrants a change to such a central
> > > > > > > > > > > > > > > > > > > > > interface method. The new `SinkTask::put` method just seems somewhat odd
> > > > > > > > > > > > > > > > > > > > > and it may not be very understandable for a new reader - I don't think this
> > > > > > > > > > > > > > > > > > > > > should be the case for a public interface method. Furthermore, even with
> > > > > > > > > > > > > > > > > > > > > elaborate documentation in place, I'm not sure if it'll be very obvious to
> > > > > > > > > > > > > > > > > > > > > most people what the purpose of having these two `put` methods is and how
> > > > > > > > > > > > > > > > > > > > > they should be used by sink task implementations. What do you think?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks a lot for your valuable feedback so far! I've updated the KIP based
> > > > > > > > > > > > > > > > > > > > > > on our discussion above. Could you please take another look?
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >> > Hi Randall,
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > Thanks for elaborating. I think these are all very good points and I see
> > > > > > > > > > > > > > > > > > > > > >> > why the overloaded `SinkTask::put` method is a cleaner solution overall.
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > >> > I think this should be
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > instead because the sink records themselves have the updated topic
> > > > > > > > > > > > > > > > > > > > > >> > partitions (i.e. after all transformations have been applied) and the KIP
> > > > > > > > > > > > > > > > > > > > > >> > is proposing a way for the tasks to be able to access the original topic
> > > > > > > > > > > > > > > > > > > > > >> > partition (i.e. before transformations have been applied).
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >> Sounds good.
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > > Of course, if the developer does not need separate methods, they can
> > > > > > > > > > > > > > > > > > > > > >> > > easily have the older `put` method simply delegate to the newer method.
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > If the developer does not need separate methods (i.e. they don't need to
> > > > > > > > > > > > > > > > > > > > > >> > use this new addition), they can simply continue implementing just the
> > > > > > > > > > > > > > > > > > > > > >> > older `put` method right?
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> Correct. We should update the JavaDoc of both methods to make this clear,
> > > > > > > > > > > > > > > > > > > > > >> and in general how the two methods should be used and should be
> > > > > > > > > > > > > > > > > > > > > >> implemented. That can be part of the PR, and the KIP doesn't need this
> > > > > > > > > > > > > > > > > > > > > >> wording.
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating the older
> > > > > > > > > > > > > > > > > > > > > >> > > method, once the Connect runtime versions without this change are old
> > > > > > > > > > > > > > > > > > > > > >> > > enough.
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > I'm not sure we'd ever want to deprecate the older method. Most common sink
> > > > > > > > > > > > > > > > > > > > > >> > connector implementations do not do their own offset tracking with
> > > > > > > > > > > > > > > > > > > > > >> > asynchronous processing and will probably never have a need for the
> > > > > > > > > > > > > > > > > > > > > >> > additional parameter `Map<SinkRecord, TopicPartition>
> > > > > > > > > > > > > > > > > > > > > >> > originalTopicPartitions` in the proposed new `put` method. These connectors
> > > > > > > > > > > > > > > > > > > > > >> > can continue implementing only the existing `SinkTask::put` method which
> > > > > > > > > > > > > > > > > > > > > >> > will be called by the default implementation of the newer overloaded `put`
> > > > > > > > > > > > > > > > > > > > > >> > method.
> > > > > > > > > > > > > > > > > > > > >> >
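The default delegation described here can be sketched with simplified stand-in types. `DemoRecord`, `DemoTopicPartition`, `DemoSinkTask` and `SimpleDemoTask` are hypothetical substitutes for `SinkRecord`, `TopicPartition` and `SinkTask`, and the two-argument `put` mirrors the overload proposed at this point in the thread rather than any released API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopicPartition.
final class DemoTopicPartition {
    final String topic;
    final int partition;

    DemoTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical stand-in for SinkRecord.
final class DemoRecord {
    final String topic;     // topic after transformations
    final int partition;    // partition after transformations
    final Object value;

    DemoRecord(String topic, int partition, Object value) {
        this.topic = topic;
        this.partition = partition;
        this.value = value;
    }
}

// Sketch of the proposed shape: the new overload's default implementation
// ignores the extra map and delegates to the existing single-argument put().
abstract class DemoSinkTask {
    public abstract void put(Collection<DemoRecord> records);

    public void put(Collection<DemoRecord> records,
                    Map<DemoRecord, DemoTopicPartition> originalTopicPartitions) {
        put(records);
    }
}

// A task that does no offset tracking of its own only implements put(records).
final class SimpleDemoTask extends DemoSinkTask {
    final List<DemoRecord> received = new ArrayList<>();

    @Override
    public void put(Collection<DemoRecord> records) {
        received.addAll(records);
    }
}
```

A task that tracks offsets itself would instead override the two-argument overload and look up each record's original partition in `originalTopicPartitions`.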
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >> +1
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > > the pre-commit methods use the same `Map<TopicPartition,
> > > > > > > > > > > > > > > > > > > > > >> > > OffsetAndMetadata> currentOffsets` data structure I'm suggesting be used.
> > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > The data structure you're suggesting be used is a `Map<SinkRecord,
> > > > > > > > > > > > > > > > > > > > > >> > TopicPartition>` which will map `SinkRecord` objects to the original topic
> > > > > > > > > > > > > > > > > > > > > >> > partition of the corresponding `ConsumerRecord` right? To clarify, this is
> > > > > > > > > > > > > > > > > > > > > >> > a new data structure that will need to be managed in the `WorkerSinkTask`.
> > > > > > > > > > > > > > > > > > > > >> >
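A minimal sketch of the runtime-side bookkeeping discussed above, i.e. the `Map<SinkRecord, TopicPartition>` that `WorkerSinkTask` would have to manage. It uses simplified stand-in types (not the real Kafka Connect classes) so that it compiles on its own; `TransformedBatch` and the `UnaryOperator` standing in for the SMT chain are hypothetical. Following Randall's suggestion elsewhere in the thread, an entry is only recorded when the SMTs actually changed the topic or partition, so an absent entry means "unchanged".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simplified stand-ins, NOT the real Kafka Connect classes.
record TopicPartition(String topic, int partition) {}

record SinkRecord(String topic, int kafkaPartition, long kafkaOffset, Object value) {
    TopicPartition topicPartition() {
        return new TopicPartition(topic, kafkaPartition);
    }
}

// Hypothetical helper showing the extra state the runtime would own.
final class TransformedBatch {
    final List<SinkRecord> records = new ArrayList<>();
    // Keyed by identity: two distinct records may still be equal by value.
    final Map<SinkRecord, TopicPartition> originalTopicPartitions = new IdentityHashMap<>();

    static TransformedBatch transform(Collection<SinkRecord> consumed,
                                      UnaryOperator<SinkRecord> smtChain) {
        TransformedBatch batch = new TransformedBatch();
        for (SinkRecord original : consumed) {
            SinkRecord transformed = smtChain.apply(original);
            if (transformed == null) {
                continue; // the SMT chain filtered this record out
            }
            batch.records.add(transformed);
            // Only remember the original coordinates when an SMT changed them.
            if (!transformed.topicPartition().equals(original.topicPartition())) {
                batch.originalTopicPartitions.put(transformed, original.topicPartition());
            }
        }
        return batch;
    }
}
```

Using identity rather than `equals` for the map keys is a design choice in this sketch. It avoids two value-equal records overwriting each other's entry, at the cost of requiring the task to pass back the exact record instances.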
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> Ah, you're right. Thanks for the correction.
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> Best regards,
> > > > > > > > > > > > > > > > > > > > > >> Randall
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > > > > > > > > > >> > Yash
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > > Hi, Yash.
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > > I'm not sure I quite understand why it would be "easier" for
> > > > > > > > > > > > > > > > > > > > > >> > > > connector developers to account for implementing two different
> > > > > > > > > > > > > > > > > > > > > >> > > > overloaded `put` methods (assuming that they want to use this new
> > > > > > > > > > > > > > > > > > > > > >> > > > feature) versus using a try-catch block around `SinkRecord` access
> > > > > > > > > > > > > > > > > > > > > >> > > > methods?
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > Using a try-catch around an API method that *might* be there is a
> > > > > > > > > > > > > > > > > > > > > >> > > very unusual thing for most developers. Unfortunately, we've had
> > > > > > > > > > > > > > > > > > > > > >> > > to resort to this atypical approach with Connect in places where
> > > > > > > > > > > > > > > > > > > > > >> > > there was no good alternative. We seem to be relying upon this
> > > > > > > > > > > > > > > > > > > > > >> > > pattern because it's easier for us, not because it offers a better
> > > > > > > > > > > > > > > > > > > > > >> > > experience for Connector developers. IMO, if there's a practical
> > > > > > > > > > > > > > > > > > > > > >> > > alternative that uses normal development practices and techniques,
> > > > > > > > > > > > > > > > > > > > > >> > > then we should use that alternative. IIUC, there is at least one
> > > > > > > > > > > > > > > > > > > > > >> > > practical alternative for this KIP that would not require
> > > > > > > > > > > > > > > > > > > > > >> > > developers to use the unusual try-catch to handle the case where
> > > > > > > > > > > > > > > > > > > > > >> > > methods are not found.
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > I also think having two `put` methods is easier when the Connector
> > > > > > > > > > > > > > > > > > > > > >> > > has to do different things for different Connect runtimes. One of
> > > > > > > > > > > > > > > > > > > > > >> > > those methods is called by newer Connect runtimes with the new
> > > > > > > > > > > > > > > > > > > > > >> > > behavior, and the other method is called by an older Connect
> > > > > > > > > > > > > > > > > > > > > >> > > runtime. Of course, if the developer does not need separate
> > > > > > > > > > > > > > > > > > > > > >> > > methods, they can easily have the older `put` method simply
> > > > > > > > > > > > > > > > > > > > > >> > > delegate to the newer method.
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating the
> > > > > > > > > > > > > > > > > > > > > >> > > older method, once the Connect runtime versions without this
> > > > > > > > > > > > > > > > > > > > > >> > > change are old enough.
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > > I think the advantage of going with the proposed approach in the
> > > > > > > > > > > > > > > > > > > > > >> > > > KIP is that it wouldn't require extra book-keeping (the
> > > > > > > > > > > > > > > > > > > > > >> > > > `Map<SinkRecord, TopicPartition>` in `WorkerSinkTask` in your
> > > > > > > > > > > > > > > > > > > > > >> > > > proposed approach)
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > The connector does have to do some of this bookkeeping in how it
> > > > > > > > > > > > > > > > > > > > > >> > > tracks the topic partition offsets used in `preCommit`, and the
> > > > > > > > > > > > > > > > > > > > > >> > > pre-commit methods use the same
> > > > > > > > > > > > > > > > > > > > > >> > > `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data
> > > > > > > > > > > > > > > > > > > > > >> > > structure I'm suggesting be used.
> > > > > > > > > > > > > > > > > > > > > >> > >
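The connector-side bookkeeping Randall describes could look roughly like this: the task keys its committable offsets by each record's original topic partition (falling back to the record's own topic partition when the map has no entry) and returns them from `preCommit`. This is a sketch under stated assumptions: the types are simplified stand-ins for the Kafka Connect classes, `OffsetTrackingTask` is hypothetical, and `put` is assumed to write synchronously, so every record it has seen is treated as safe to commit.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins, NOT the real Kafka Connect / Kafka client classes.
record TopicPartition(String topic, int partition) {}
record OffsetAndMetadata(long offset) {}
record SinkRecord(String topic, int kafkaPartition, long kafkaOffset, Object value) {
    TopicPartition topicPartition() {
        return new TopicPartition(topic, kafkaPartition);
    }
}

// Hypothetical task; assumes put() writes each record synchronously.
final class OffsetTrackingTask {
    private final Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();

    void put(Collection<SinkRecord> records,
             Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        for (SinkRecord record : records) {
            // ... write the record to the external system here ...
            // No entry means the SMTs left the topic and partition unchanged.
            TopicPartition tp = originalTopicPartitions.getOrDefault(record, record.topicPartition());
            // Kafka commits the offset of the *next* record to consume, hence + 1.
            OffsetAndMetadata next = new OffsetAndMetadata(record.kafkaOffset() + 1);
            committable.merge(tp, next, (old, cur) -> cur.offset() > old.offset() ? cur : old);
        }
    }

    Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Only report partitions that appear in currentOffsets, i.e. the
        // (original) topic partitions the runtime is tracking.
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        for (TopicPartition tp : currentOffsets.keySet()) {
            OffsetAndMetadata offset = committable.get(tp);
            if (offset != null) {
                result.put(tp, offset);
            }
        }
        return result;
    }
}
```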
> > > > > > > > > > > > > > > > > > > > > >> > > I hope that helps.
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > Best regards,
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > Randall
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > > > > > >> > > > Hi Randall,
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > Thanks for reviewing the KIP!
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > That latter logic can get quite ugly.
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > I'm not sure I quite understand why it would be "easier" for
> > > > > > > > > > > > > > > > > > > > > >> > > > connector developers to account for implementing two different
> > > > > > > > > > > > > > > > > > > > > >> > > > overloaded `put` methods (assuming that they want to use this new
> > > > > > > > > > > > > > > > > > > > > >> > > > feature) versus using a try-catch block around `SinkRecord` access
> > > > > > > > > > > > > > > > > > > > > >> > > > methods? In both cases, a connector developer would need to write
> > > > > > > > > > > > > > > > > > > > > >> > > > additional code in order to ensure that their connector continues
> > > > > > > > > > > > > > > > > > > > > >> > > > working with older Connect runtimes. Furthermore, we would probably
> > > > > > > > > > > > > > > > > > > > > >> > > > need to carefully document what the implementation of the older
> > > > > > > > > > > > > > > > > > > > > >> > > > `put` method should look like for connectors that want to use this
> > > > > > > > > > > > > > > > > > > > > >> > > > new feature. I think the advantage of going with the proposed
> > > > > > > > > > > > > > > > > > > > > >> > > > approach in the KIP is that it wouldn't require extra book-keeping
> > > > > > > > > > > > > > > > > > > > > >> > > > (the `Map<SinkRecord, TopicPartition>` in `WorkerSinkTask` in your
> > > > > > > > > > > > > > > > > > > > > >> > > > proposed approach), and also that the try-catch based logic is
> > > > > > > > > > > > > > > > > > > > > >> > > > already an established pattern, used in
> > > > > > > > > > > > > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > > > > > > > > > > > > > > > > > > >> > > > and other KIPs which added methods to source/sink connector/task
> > > > > > > > > > > > > > > > > > > > > >> > > > contexts.
> > > > > > > > > > > > > > > > > > > > > >> > > >
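For comparison, the try-catch pattern Yash refers to (KIP-610 documents it for `errantRecordReporter()`) would look roughly like this when applied to new `SinkRecord` accessors. The accessor names `originalTopic()` and `originalKafkaPartition()` are assumptions modeled on this KIP's proposal, and `OriginalPartitionLookup` is a hypothetical helper. Because a self-contained example cannot load an older runtime, the stand-in `SinkRecord` throws `NoSuchMethodError` itself to simulate one; on a real older runtime the JVM raises that error at the call site when the method does not exist.

```java
// Simplified stand-ins, NOT the real Kafka Connect classes.
record TopicPartition(String topic, int partition) {}

// `simulateOlderRuntime` mimics a runtime whose SinkRecord predates the new
// accessors; a real JVM would throw NoSuchMethodError at the call site instead.
record SinkRecord(String topic, int kafkaPartition,
                  String origTopic, int origPartition, boolean simulateOlderRuntime) {
    String originalTopic() {
        if (simulateOlderRuntime) throw new NoSuchMethodError("SinkRecord.originalTopic()");
        return origTopic;
    }

    int originalKafkaPartition() {
        if (simulateOlderRuntime) throw new NoSuchMethodError("SinkRecord.originalKafkaPartition()");
        return origPartition;
    }
}

// Hypothetical helper a connector might write to cope with both runtimes.
final class OriginalPartitionLookup {
    // Remember the outcome so the error is only thrown (and caught) once.
    private boolean newAccessorsMissing = false;

    TopicPartition originalTopicPartition(SinkRecord record) {
        if (!newAccessorsMissing) {
            try {
                return new TopicPartition(record.originalTopic(), record.originalKafkaPartition());
            } catch (NoSuchMethodError e) {
                newAccessorsMissing = true; // older runtime detected; fall through
            }
        }
        // Older runtime: only the (possibly SMT-modified) topic partition is known.
        return new TopicPartition(record.topic(), record.kafkaPartition());
    }
}
```

Note that on an older runtime the fallback can only see the post-SMT topic partition, which is exactly the gap this KIP is trying to close; the try-catch merely keeps the connector running.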
> > > > > > > > > > > > > > > > > > > > > >> > > > Let me know if you still feel that having a new overloaded `put`
> > > > > > > > > > > > > > > > > > > > > >> > > > method is a cleaner solution and I'd be happy to reconsider!
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > >> > > > Yash
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > The KIP includes this rejected alternative:
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > 4. Update SinkTask.put in any way to pass the new information
> > > > > > > > > > > > > > > > > > > > > >> > > > > > outside SinkRecord (e.g. a Map or a derived class)
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > >    - Much more disruptive change without considerable pros
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > One advantage of doing this is that sink connector
> > > > > > > > > > > > > > > > > > > > > >> > > > > implementations can more easily implement two different
> > > > > > > > > > > > > > > > > > > > > >> > > > > "put(...)" methods to handle running in a variety of runtimes,
> > > > > > > > > > > > > > > > > > > > > >> > > > > without having to use try-catch logic around the newer SinkRecord
> > > > > > > > > > > > > > > > > > > > > >> > > > > access methods. That latter logic can get quite ugly.
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > For example, the existing `put` method has this signature:
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > public abstract void put(Collection<SinkRecord> records);
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > If we added an overloaded method that passed in a map of the old
> > > > > > > > > > > > > > > > > > > > > >> > > > > topic+partition for each record (and defined the absence of an
> > > > > > > > > > > > > > > > > > > > > >> > > > > entry as having an unchanged topic and partition):
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records,
> > > > > > > > > > > > > > > > > > > > > >> > > > >                 Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > > > > > > > > > > >> > > > >     put(records);
> > > > > > > > > > > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > then a `SinkTask` implementation that wants to use this new
> > > > > > > > > > > > > > > > > > > > > >> > > > > feature could simply implement both methods:
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records) {
> > > > > > > > > > > > > > > > > > > > > >> > > > >     // Running in an older runtime, so no tracking of SMT-modified
> > > > > > > > > > > > > > > > > > > > > >> > > > >     // topic names or partitions
> > > > > > > > > > > > > > > > > > > > > >> > > > >     put(records, Map.of());
> > > > > > > > > > > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records,
> > > > > > > > > > > > > > > > > > > > > >> > > > >                 Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > > > > > > > > > > >> > > > >     // real logic here
> > > > > > > > > > > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > This seems a lot easier than having to use try-catch logic, yet
> > > > > > > > > > > > > > > > > > > > > >> > > > > still allows sink connectors to utilize the new functionality and
> > > > > > > > > > > > > > > > > > > > > >> > > > > still work with older Connect runtimes.
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > WDYT?
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > Randall
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > > > > > > > > > > > > >> > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > I would like to (re)start a new discussion thread on KIP-793
> > > > > > > > > > > > > > > > > > > > > >> > > > > > (Kafka Connect), which proposes some additions to the public
> > > > > > > > > > > > > > > > > > > > > >> > > > > > SinkRecord interface in order to support topic-mutating SMTs for
> > > > > > > > > > > > > > > > > > > > > >> > > > > > sink connectors that do their own offset tracking.
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
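To make the underlying problem concrete: a topic-mutating SMT changes the topic a record appears to come from, so a connector that commits offsets by `record.topic()` would commit against a topic it never consumed. The sketch below, again with hypothetical stand-in types rather than the real classes, shows the idea behind the proposal: the record carries its original coordinates, and a simplified `newRecord(...)` copies them unchanged when an SMT rewrites the topic. `PrefixTopic` is a hypothetical SMT, similar in spirit to a router that prepends a prefix.

```java
// Simplified stand-ins, NOT the real Kafka Connect classes.
record TopicPartition(String topic, int partition) {}

record SinkRecord(String topic, int kafkaPartition, long kafkaOffset, Object value,
                  String originalTopic, int originalKafkaPartition, long originalKafkaOffset) {

    // As handed to the runtime by the consumer: original == current.
    static SinkRecord fromConsumer(String topic, int partition, long offset, Object value) {
        return new SinkRecord(topic, partition, offset, value, topic, partition, offset);
    }

    // SMTs may change topic/partition/value, but the original coordinates
    // are carried over unchanged.
    SinkRecord newRecord(String newTopic, int newPartition, Object newValue) {
        return new SinkRecord(newTopic, newPartition, kafkaOffset, newValue,
                originalTopic, originalKafkaPartition, originalKafkaOffset);
    }

    TopicPartition originalTopicPartition() {
        return new TopicPartition(originalTopic, originalKafkaPartition);
    }
}

// Hypothetical topic-renaming SMT.
final class PrefixTopic {
    private final String prefix;

    PrefixTopic(String prefix) {
        this.prefix = prefix;
    }

    SinkRecord apply(SinkRecord record) {
        return record.newRecord(prefix + record.topic(), record.kafkaPartition(), record.value());
    }
}
```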
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Links:
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > KIP:
> > > > > > > > > > > > > > > > > > > > > >> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Older discussion thread:
> > > > > > > > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > > > > > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Yash
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
