Thanks for the proposed KIP, Diego. I plan to review this in more
detail in the coming weeks.

Best regards,

Randall

On Wed, Nov 3, 2021 at 5:42 PM Chris Egerton
<chr...@confluent.io.invalid> wrote:
>
> Hi Diego,
>
> This is a long time coming and I'm glad to see someone's finally gotten
> around to filling in this feature gap for Connect.
>
> It looks like this KIP does not take the SinkTask::open and SinkTask::close
> methods into account (
> https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> /
> https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)).
> Is this intentional? If so, it'd be nice to see a rationale for leaving
> this out in the rejected alternatives; if not, I think we may want to
> add this type of support to the KIP so that we can solve the mutating
> SMT/asynchronous sink connector problem once and for all, instead of
> narrowing but not closing the existing feature gap. We may want to take the
> current effort to add support for cooperative consumer groups (
> https://issues.apache.org/jira/browse/KAFKA-12487 /
> https://github.com/apache/kafka/pull/10563) into account if we opt to add
> support for open/close, since the current behavior of Connect (which
> involves invoking SinkTask::close for every topic partition every time a
> consumer rebalance occurs, then invoking SinkTask::open for all
> still-assigned partitions) may be easier to reason about, but is likely to
> change soon (although holding off on that work is a valid option if this
> KIP is given priority).
>
> It also looks like we're only exposing the original topic partition to
> connector developers. I agree with the rationale for not exposing more of
> the original consumer record for the most part, but what about the record's
> offset? Although it's not possible to override the Kafka offset for a sink
> record via the standard SinkRecord::newRecord methods (
> https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long)
> /
> https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long,java.lang.Iterable)),
> there are still public constructors available for the SinkRecord class that
> can be leveraged by SMTs to return new SinkRecord instances that don't have
> the same Kafka offset as the one that they've mutated. Do you think it may
> be worth the additional maintenance burden and API complexity to
> accommodate this case, with something like a SinkRecord::originalKafkaOffset
> method?
>
> I'm also wondering about how exactly this method will be implemented. Will
> we automatically create a new SinkRecord instance at the end of the
> transformation chain in order to provide the correct topic partition (and
> possibly offset)? If so, this should be called out since it means that
> transformations that return custom subclasses of SinkRecord will no longer
> be able to do so (or rather, they will still be able to, but these custom
> subclasses will never be visible to sink tasks).
>
> Finally, a small nit: do you think it'd make sense to separate out the
> newly-proposed SinkRecord::originalTopicPartition method into separate
> SinkRecord::originalTopic and SinkRecord::originalKafkaPartition methods, to
> stay in line with the convention that's been loosely set by the existing,
> separate SinkRecord::topic and SinkRecord::kafkaPartition methods?
>
> I'm personally looking forward to leveraging this improvement in the
> BigQuery sink connector I help maintain because we recently added a new
> write mode that uses asynchronous writes and SinkTask::preCommit, but
> we also encourage users to use SMTs to redirect records to different
> datasets/tables in BigQuery, which is currently impossible in that write
> mode. Thanks for taking this on!
>
> Cheers,
>
> Chris
>
> On Wed, Nov 3, 2021 at 6:17 PM Diego Erdody <erd...@gmail.com> wrote:
>
> > Hello,
> >
> > I'd like to propose a small KIP to add a new field to SinkRecord in order
> > to add support for topic-mutating SMTs (e.g. RegexRouter) to asynchronous
> > Sink Connectors (the ones that override preCommit for internal offset
> > tracking, like S3
> > <https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L274>).
> >
> > Links:
> >
> > - KIP-793: Sink Connectors: Support topic-mutating SMTs for async
> > connectors (preCommit users)
> > <https://cwiki.apache.org/confluence/x/fpFnCw>
> > - PR #11464 <https://github.com/apache/kafka/pull/11464>
> >
> > Thanks,
> >
> > Diego
> >
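
For anyone reading this thread without the KIP open, the gap being discussed can be sketched roughly as follows. This is a simplified model and not the Connect API: the class TopicMutationSketch, the Rec type, and the methods route, offsetsToCommit, and committable are all hypothetical names made up for illustration. It also assumes that the framework discards task-reported offsets for any topic partition the consumer is not assigned, which is the behavior the thread is working around.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified model only: none of these types are Kafka Connect classes.
final class TopicMutationSketch {

    // Stand-in for a sink record: the (possibly mutated) coordinates plus
    // the coordinates the record was actually consumed from.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        final String originalTopic;
        final int originalPartition;

        Rec(String topic, int partition, long offset,
            String originalTopic, int originalPartition) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }
    }

    // Hypothetical map key for a topic partition, e.g. "orders-0".
    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // RegexRouter-like transformation: renames the topic, keeps the origin.
    static Rec route(Rec r, String prefix) {
        return new Rec(prefix + r.topic, r.partition, r.offset,
                r.originalTopic, r.originalPartition);
    }

    // What an async task might report from a preCommit-style hook: the next
    // offset to consume for each partition it has finished writing.
    static Map<String, Long> offsetsToCommit(Iterable<Rec> written, boolean useOriginal) {
        Map<String, Long> offsets = new HashMap<>();
        for (Rec r : written) {
            String k = useOriginal
                    ? key(r.originalTopic, r.originalPartition)
                    : key(r.topic, r.partition);
            offsets.merge(k, r.offset + 1, Long::max);
        }
        return offsets;
    }

    // Assumed framework behavior: offsets for unassigned partitions are dropped.
    static Map<String, Long> committable(Map<String, Long> reported, Set<String> assigned) {
        Map<String, Long> result = new HashMap<>(reported);
        result.keySet().retainAll(assigned);
        return result;
    }
}
```

In this model, a record consumed from orders-0 at offset 41 and routed to archive_orders makes offsetsToCommit(written, false) report archive_orders-0, which committable drops when the assignment is {orders-0}, so nothing is committed. With useOriginal set to true, the task reports orders-0 with offset 42 and the commit goes through.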
