Hi Chris,
Thanks for pointing that out, I hadn't realized that the SubmittedRecords
class has almost exactly the same semantics needed for handling offset
commits in the per-sink record ack API case. However, I agree that it isn't
worth the tradeoff and we've already discussed the backward
Hi Yash,
Thanks for your continued work on this tricky feature. I have no further
comments or suggestions on the KIP and am ready to vote in favor of it.
That said, I did want to quickly respond to this comment:
> On a side note, this also means that the per sink record ack API
> that was
Hi Chris,
Firstly, thanks for sharing your detailed thoughts on this thorny issue!
Point taken on Kafka Connect being a brownfield project and I guess we
might just need to trade off elegant / "clean" interfaces for fixing this
gap in functionality. Also, thanks for calling out all the existing
Hi Yash,
I've been following the discussion and have some thoughts. Ultimately I'm
still in favor of this KIP and would hate to see it go dormant, though we
may end up settling for a less-invasive option.
On the topic of abstraction and inter-plugin interactions:
First, there already are
Hi Greg,
Thanks for the response and sorry for the late reply.
> Currently the AK tests have a lot of calls to, for example, new
> SinkRecord(String topic, int partition, Schema keySchema,
> Object key, Schema valueSchema, Object value, long kafkaOffset)
> , a constructor without the original
Yash,
> I'm not sure I follow - are you asking about how the tests will be updated
> post this change or about what upgrades will look like for clusters in
> production?
Currently the AK tests have a lot of calls to, for example, new
SinkRecord(String topic, int partition, Schema keySchema, Object
Hi Greg,
Thanks for the detailed review!
> What is the expected state/behavior for SinkRecords
> which do not have original T/P/O information after the
> upgrade? Just browsing, it appears that tests make
> extensive use of the existing public SinkRecord
> constructors for both Transformations
Hi Yash,
I always use this issue as an example of a bug being caused by design
rather than by implementation error, and once it's fixed I'll need to find
something else to talk about :)
So glad to see this get fixed!
I'll chime in to support some of the earlier discussions that seem to have
been
Hi Yash,
We'll probably want to make a few tweaks to the Javadocs for the new
methods (I'm imagining that notes on compatibility with older versions will
be required), but I believe what's proposed in the KIP is good enough to
approve with the understanding that it may not exactly match what gets
Hi Chris,
> we might try to introduce a framework-level configuration
> property to dictate which of the pre-transform and post-transform
> topic partitions are used for the fallback call to the single-arg
> variant if a task class has not overridden the multi-arg variant
Thanks for the
Hi Yash,
I think the use case for pre-transform TPO coordinates (and topic partition
writers created/destroyed in close/open) tends to boil down to exactly-once
semantics, where it's desirable to preserve the guarantees that Kafka
provides (every record has a unique TPO trio, and records are
Hi Chris,
> I was actually envisioning something like `void
> open(Collection<TopicPartition> originalPartitions,
> Collection<TopicPartition> transformedPartitions)`
Ah okay, this does make a lot more sense. Sorry, I think I misunderstood
you earlier. I do agree with you that this seems better than splitting it
off into two
Hi Yash,
I was actually envisioning something like `void
open(Collection<TopicPartition> originalPartitions,
Collection<TopicPartition> transformedPartitions)`,
since we already convert and transform each batch of records that we poll
from the sink task's consumer en masse, meaning we could discover several
new transformed
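As a rough sketch of what the two-arg `open` overload discussed above could look like (names and the fallback choice are illustrative only, not the final API; `TP` is a simplified stand-in for Kafka's `TopicPartition` class):

```java
import java.util.Collection;
import java.util.List;

// Simplified stand-in for org.apache.kafka.common.TopicPartition.
record TP(String topic, int partition) {}

abstract class SinkTaskSketch {
    // Existing single-arg variant that connectors already implement.
    public abstract void open(Collection<TP> partitions);

    // Hypothetical overload: the runtime hands the task both the
    // pre-transform (original) partitions and the post-transform ones
    // discovered for the polled batch. The default delegates to the
    // single-arg variant (here with the originals; which side the
    // fallback should use was itself part of the discussion), so
    // existing tasks keep working unchanged.
    public void open(Collection<TP> originalPartitions,
                     Collection<TP> transformedPartitions) {
        open(originalPartitions);
    }
}

// A task written against the old API, untouched by the new overload.
class LegacyTask extends SinkTaskSketch {
    Collection<TP> opened;
    @Override public void open(Collection<TP> partitions) { opened = partitions; }
}
```

The key property of this shape is that tasks which never override the two-arg variant observe exactly the old behavior.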
Hi Chris,
> Plus, if a connector is intentionally designed to
> use pre-transformation topic partitions in its
> open/close methods, wouldn't we just be trading
> one form of the problem for another by making this
> switch?
Thanks, this makes sense, and given that the KIP already proposes a way
Hi Yash,
I think the key difference between adding methods/overloads related to
SinkTask::open/SinkTask::close and SinkTask::put is that this isn't
auxiliary information that may or may not be useful to connector
developers. It's actually critical for them to understand the difference
between the
Hi Chris,
> especially if connectors are intentionally designed around
> original topic partitions instead of transformed ones.
Ha, that's a good point and reminds me of Hyrum's Law [1] :)
> I think we have to provide connector developers with some
> way to differentiate between the two, but
Hi Yash,
> So it looks like with the current state of affairs, sink tasks that only
> instantiate writers in the SinkTask::open method (and don't do the lazy
> instantiation in SinkTask::put that you mentioned) might fail when used
> with topic/partition mutating SMTs even if they don't do any
Hi Chris,
Thanks for the feedback.
1) That's a fair point; while I did scan everything publicly available on
GitHub, you're right in that it won't cover all possible SMTs that are out
there. Thanks for the example use-case as well, I've updated the KIP to add
the two new proposed methods.
2) So
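The new accessor methods mentioned above expose the pre-transform coordinates on the record itself. A sketch of how a connector doing its own offset tracking would use them (`SinkRecordSketch` is a simplified stand-in for `SinkRecord`, not the real class; the accessor names follow the KIP's proposal):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for SinkRecord: keeps the pre-transform coordinates
// alongside the (possibly SMT-mutated) topic.
class SinkRecordSketch {
    final String topic;           // may have been rewritten by an SMT
    final String originalTopic;   // topic as consumed from Kafka
    final int originalPartition;
    final long originalOffset;

    SinkRecordSketch(String topic, String originalTopic,
                     int originalPartition, long originalOffset) {
        this.topic = topic;
        this.originalTopic = originalTopic;
        this.originalPartition = originalPartition;
        this.originalOffset = originalOffset;
    }

    String originalTopic() { return originalTopic; }
    int originalKafkaPartition() { return originalPartition; }
    long originalKafkaOffset() { return originalOffset; }
}

class OffsetTracker {
    // A task tracking its own offsets keys them by the *original*
    // coordinates, which are what the framework's consumer can actually
    // commit. Value is the next offset to commit (last processed + 1).
    static Map<String, Long> trackedOffsets(List<SinkRecordSketch> batch) {
        Map<String, Long> offsets = new HashMap<>();
        for (SinkRecordSketch r : batch) {
            offsets.put(r.originalTopic() + "-" + r.originalKafkaPartition(),
                        r.originalKafkaOffset() + 1);
        }
        return offsets;
    }
}
```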
Hi Yash,
I've had some time to think on this KIP and I think I'm in agreement about
not blocking it on an official compatibility library or adding the "ack"
API for sink records.
I only have two more thoughts:
1. Because it is possible to manipulate sink record partitions and offsets
with the
Hi Chris,
Thanks a lot for your inputs!
> would provide a simple, clean interface for developers to determine
> which features are supported by the version of the Connect runtime
> that their plugin has been deployed onto
I do like the idea of having such a public compatibility library - I
Hi all,
I have more comments I'd like to make on this KIP when I have time (sorry
for the delay, Yash, and thanks for your patience!), but I did want to
chime in and say that I'm also not sure about overloading SinkTask::put. I
share the concerns about creating an intuitive, simple API that Yash
Hi Randall,
It's been a while for this one but the more I think about it, the more I
feel like the current approach with a new overloaded `SinkTask::put` method
might not be optimal. We're trying to fix a pretty corner case bug here
(usage of topic mutating SMTs with sink connectors that do their
Hi Randall,
Thanks a lot for your valuable feedback so far! I've updated the KIP based
on our discussion above. Could you please take another look?
Thanks,
Yash
On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch wrote:
> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya wrote:
>
> > Hi Randall,
> >
> >
On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya wrote:
> Hi Randall,
>
> Thanks for elaborating. I think these are all very good points and I see
> why the overloaded `SinkTask::put` method is a cleaner solution overall.
>
> public void put(Collection<SinkRecord> records, Map<TopicPartition, TopicPartition>
Hi Randall,
Thanks for elaborating. I think these are all very good points and I see
why the overloaded `SinkTask::put` method is a cleaner solution overall.
> public void put(Collection<SinkRecord> records, Map<TopicPartition, TopicPartition> updatedTopicPartitions)
I think this should be
`public void put(Collection<SinkRecord> records, Map
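The overloaded-`put` approach being weighed here can be sketched as follows (the exact type of the map parameter was still under debate in this exchange; `Map<TopicPart, TopicPart>` below is just one of the shapes mentioned, and `Rec`/`TopicPart` are stand-ins for `SinkRecord`/`TopicPartition`):

```java
import java.util.Collection;
import java.util.Map;

// Simplified stand-ins for Kafka Connect types.
record TopicPart(String topic, int partition) {}
record Rec(String value) {}

abstract class PutSketch {
    // Existing method every sink task implements.
    public abstract void put(Collection<Rec> records);

    // New variant: additionally hands the task the mapping from
    // original to SMT-updated topic partitions. The default ignores
    // the extra argument and delegates, so connectors compiled against
    // the old API run unchanged on a new runtime.
    public void put(Collection<Rec> records,
                    Map<TopicPart, TopicPart> updatedTopicPartitions) {
        put(records);
    }
}
```

A task that wants the new information overrides the two-arg variant; everyone else silently falls through to the single-arg one.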
Hi, Yash.
> I'm not sure I quite understand why it would be "easier" for connector
> developers to account for implementing two different overloaded `put`
> methods (assuming that they want to use this new feature) versus using a
> try-catch block around `SinkRecord` access methods?
Using a
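The try-catch probe Yash refers to works because a connector compiled against a newer `connect-api` jar still loads on an older worker; the missing accessor only surfaces as a `NoSuchMethodError` when called. A sketch of the pattern (the `RecordView` interface here stands in for the two runtime generations so the behavior can be shown in one process; it is not a real Connect type):

```java
// Stand-in for a SinkRecord as seen by the connector. On an old worker
// the new accessor effectively does not exist at runtime.
interface RecordView {
    String originalTopic();
}

class CompatProbe {
    // Probe once whether the runtime actually provides the new method,
    // then cache the answer rather than catching per record.
    static boolean supportsOriginalTopic(RecordView record) {
        try {
            record.originalTopic();
            return true;
        } catch (NoSuchMethodError e) {
            // Old runtime: the method is absent from the loaded class.
            return false;
        }
    }
}
```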
Hi Randall,
Thanks for reviewing the KIP!
> That latter logic can get quite ugly.
I'm not sure I quite understand why it would be "easier" for connector
developers to account for implementing two different overloaded `put`
methods (assuming that they want to use this new feature) versus using a
Hi, Yash. Thanks for picking up this KIP and discussion.
The KIP includes this rejected alternative:
> 4. Update SinkTask.put in any way to pass the new information outside
> SinkRecord (e.g. a Map or a derived class)
>    - Much more disruptive change without considerable pros
One
Hi all,
I would like to (re)start a new discussion thread on KIP-793 (Kafka
Connect) which proposes some additions to the public SinkRecord interface
in order to support topic mutating SMTs for sink connectors that do their
own offset tracking.
Links:
KIP:
Apologies for the poor formatting on the quoted bits in the previous email.
On 2021/11/03 22:17:06 Diego Erdody wrote:
> Hello,
>
> I'd like to propose a small KIP to add a new field to SinkRecord in order
> to add support for topic-mutating SMTs (e.g. RegexRouter) to asynchronous
> Sink
Hi Diego,
Thanks for writing this KIP. Are you still planning to work on this? If
not, I'd be happy to try and take this to completion!
Hi Chris,
Thanks for your valuable inputs as always!
> It looks like this KIP does not take the SinkTask::open and
> SinkTask::close methods into account
> I
Thanks for the proposed KIP, Diego. I plan to review this in more
detail in the coming weeks.
Best regards,
Randall
On Wed, Nov 3, 2021 at 5:42 PM Chris Egerton
wrote:
>
> Hi Diego,
>
> This is a long time coming and I'm glad to see someone's finally gotten
> around to filling in this feature
Hi Diego,
This is a long time coming and I'm glad to see someone's finally gotten
around to filling in this feature gap for Connect.
It looks like this KIP does not take the SinkTask::open and SinkTask::close
methods into account (
Hello,
I'd like to propose a small KIP to add a new field to SinkRecord in order
to add support for topic-mutating SMTs (e.g. RegexRouter) to asynchronous
Sink Connectors (the ones that override preCommit for internal offset
tracking, like S3
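A tiny simulation of the gap this KIP targets: when an SMT such as RegexRouter rewrites the topic, a connector that reports offsets keyed by the record's post-transform topic hands the framework offsets for partitions its consumer was never assigned, so they cannot be committed (the types and the assignment check below are simplified stand-ins, not framework internals):

```java
import java.util.Map;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition.
record Tp(String topic, int partition) {}

class PreCommitSketch {
    // Roughly the framework-side constraint: only offsets for
    // consumer-assigned partitions can be committed.
    static boolean committable(Map<Tp, Long> offsets, Set<Tp> assigned) {
        return assigned.containsAll(offsets.keySet());
    }
}
```

With the consumer assigned `events-0` and a RegexRouter renaming the topic to `archive`, offsets keyed by the transformed topic fail the check, while offsets keyed by the original topic pass.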