Hi Randall,

Thanks for reviewing the KIP!

> That latter logic can get quite ugly.

I'm not sure I understand why it would be easier for connector developers
to implement two overloaded `put` methods (assuming they want to use this
new feature) than to wrap calls to the new `SinkRecord` access methods in
a try-catch block. In both cases, a connector developer would need to
write additional code to ensure that their connector continues working
with older Connect runtimes.
Furthermore, we would probably need to carefully document what the
implementation of the older `put` method should look like for connectors
that want to use this new feature. I think the approach proposed in the
KIP has two advantages: it doesn't require extra book-keeping (the
`Map<SinkRecord, TopicPartition>` in `WorkerSinkTask` in your proposed
approach), and the try-catch based logic is an already established
pattern through
https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
and other KIPs which added methods to source/sink connector/task contexts.
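
To make the comparison concrete, here is a rough sketch of the try-catch
pattern I have in mind. It is self-contained and does not use the real
Connect classes: `RecordView` is a hypothetical stand-in for `SinkRecord`,
the accessor name `originalTopic()` is an assumption for illustration, and
the "older runtime" is simulated by throwing `NoSuchMethodError`, which is
what the JVM raises when a connector calls a method that the runtime's
version of the class doesn't have.

```java
public class OriginalTopicFallback {

    // Hypothetical stand-in for SinkRecord; not the real Connect API.
    public interface RecordView {
        String topic();          // topic after any SMTs have been applied
        String originalTopic();  // assumed name of the new accessor
    }

    // Prefer the pre-transformation topic; fall back to topic() when the
    // accessor is missing because the connector runs on an older runtime.
    public static String resolveOriginalTopic(RecordView record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError e) {
            // Older runtime: no tracking of SMT-modified topic names.
            return record.topic();
        }
    }

    // Simulates a record handed out by a runtime that has the new accessor.
    public static RecordView newerRuntimeRecord(String topic, String originalTopic) {
        return new RecordView() {
            @Override public String topic() { return topic; }
            @Override public String originalTopic() { return originalTopic; }
        };
    }

    // Simulates a record from a runtime that predates the new accessor.
    public static RecordView olderRuntimeRecord(String topic) {
        return new RecordView() {
            @Override public String topic() { return topic; }
            @Override public String originalTopic() {
                throw new NoSuchMethodError("originalTopic");
            }
        };
    }

    public static void main(String[] args) {
        // SMT renamed "orders" to "orders-routed"; the new accessor recovers it.
        System.out.println(resolveOriginalTopic(newerRuntimeRecord("orders-routed", "orders")));
        // Older runtime: the fallback returns whatever topic() reports.
        System.out.println(resolveOriginalTopic(olderRuntimeRecord("orders")));
    }
}
```

In a real connector the fallback would live in one small helper like
`resolveOriginalTopic`, so the try-catch doesn't have to be repeated at
every call site.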

Let me know if you still feel that having a new overloaded put method is a
cleaner solution and I'd be happy to reconsider!

Thanks,
Yash

On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:

> Hi, Yash. Thanks for picking up this KIP and discussion.
>
> The KIP includes this rejected alternative:
>
> > 4. Update SinkTask.put in any way to pass the new information outside
> > SinkRecord (e.g. a Map or a derived class)
> >
> >    -
> >
> >    Much more disruptive change without considerable pros
> >
> >
> One advantage about doing this is that sink connector implementations can
> more easily implement two different "put(...)" methods to handle running in
> a variety of runtimes, without having to use try-catch logic around the
> newer SinkRecord access methods. That latter logic can get quite ugly.
>
> For example, the existing `put` method has this signature:
>
> public abstract void put(Collection<SinkRecord> records);
>
> If we added an overloaded method that passed in a map of the old
> topic+partition for each record (and defined the absence of an entry as
> having an unchanged topic and partition):
>
> public void put(Collection<SinkRecord> records, Map<SinkRecord,
> TopicPartition> updatedTopicPartitions) {
> put(records);
> }
>
> then a `SinkTask` implementation that wants to use this new feature could
> simply implement both methods:
>
> public void put(Collection<SinkRecord> records) {
> // Running in an older runtime, so no tracking of SMT-modified topic names
> // or partitions
> put(records, Map.of());
> }
>
> public void put(Collection<SinkRecord> records, Map<SinkRecord,
> TopicPartition> updatedTopicPartitions) {
> // real logic here
> }
>
> This seems a lot easier than having to use try-catch logic, yet still
> allows sink connectors to utilize the new functionality and still work with
> older Connect runtimes.
>
> WDYT?
>
> Randall
>
>
> On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi all,
> >
> > I would like to (re)start a new discussion thread on KIP-793 (Kafka
> > Connect) which proposes some additions to the public SinkRecord interface
> > in order to support topic mutating SMTs for sink connectors that do their
> > own offset tracking.
> >
> > Links:
> >
> > KIP:
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> >
> > Older discussion thread:
> > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> >
> > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> >
> >
> > Thanks,
> > Yash
> >
>
