Thanks for the proposed KIP, Diego. I plan to review this in more detail in the coming weeks.
Best regards,
Randall

On Wed, Nov 3, 2021 at 5:42 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
>
> Hi Diego,
>
> This is a long time coming and I'm glad to see someone's finally gotten
> around to filling in this feature gap for Connect.
>
> It looks like this KIP does not take the SinkTask::open and SinkTask::close
> methods into account
> (https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection) /
> https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)).
> Is this intentional? If so, it'd be nice to see a rationale for leaving
> this out in the rejected alternatives section; if not, I think we may want
> to add this type of support to the KIP so that we can solve the mutating
> SMT/asynchronous sink connector problem once and for all, instead of
> narrowing but not closing the existing feature gap. We may want to take the
> current effort to add support for cooperative consumer groups
> (https://issues.apache.org/jira/browse/KAFKA-12487 /
> https://github.com/apache/kafka/pull/10563) into account if we opt to add
> support for open/close: the current behavior of Connect (which involves
> invoking SinkTask::close for every topic partition every time a consumer
> rebalance occurs, then invoking SinkTask::open for all still-assigned
> partitions) may be easier to reason about, but is likely going to change
> soon (although holding off on that work if this KIP is given priority is
> definitely a valid option).
>
> It also looks like we're only exposing the original topic partition to
> connector developers. I agree with the rationale for not exposing more of
> the original consumer record for the most part, but what about the record's
> offset?
> Although it's not possible to override the Kafka offset for a sink
> record via the standard SinkRecord::newRecord methods
> (https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long) /
> https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long,java.lang.Iterable)),
> there are still public constructors available for the SinkRecord class that
> can be leveraged by SMTs to return new SinkRecord instances that don't have
> the same Kafka offset as the one that they've mutated. Do you think it may
> be worth the additional maintenance burden and API complexity to
> accommodate this case, with something like a SinkRecord::originalKafkaOffset
> method?
>
> I'm also wondering about how exactly this method will be implemented. Will
> we automatically create a new SinkRecord instance at the end of the
> transformation chain in order to provide the correct topic partition (and
> possibly offset)? If so, this should be called out, since it means that
> transformations that return custom subclasses of SinkRecord will no longer
> be able to do so (or rather, they will still be able to, but these custom
> subclasses will never be visible to sink tasks).
>
> Finally, a small nit: do you think it'd make sense to split the
> newly-proposed SinkRecord::originalTopicPartition method into separate
> SinkRecord::originalTopic and SinkRecord::originalKafkaPartition methods,
> to stay in line with the convention that's been loosely set by the
> existing, separate SinkRecord::topic and SinkRecord::kafkaPartition
> methods?
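[Editor's note: the "re-wrapping erases custom subclasses" concern above can be illustrated with a small, self-contained sketch. `Rec` and `CustomRec` are hypothetical stand-ins for SinkRecord and a connector-defined subclass, and the `originalTopic` field models the *proposed* accessor; none of this is the actual Connect API.]

```java
// Simplified model of the concern: if the framework builds a fresh record at
// the end of the SMT chain in order to attach the original coordinates, any
// custom record subclass returned by a transformation is erased before the
// sink task sees it. "Rec"/"CustomRec" are hypothetical, not Connect classes.
public class RewrapSketch {
    static class Rec {
        final String topic;
        final String originalTopic; // models the proposed pre-transformation accessor
        Rec(String topic, String originalTopic) {
            this.topic = topic;
            this.originalTopic = originalTopic;
        }
    }

    // A transformation's return type: a subclass carrying extra state.
    static class CustomRec extends Rec {
        final String extra;
        CustomRec(String topic, String originalTopic, String extra) {
            super(topic, originalTopic);
            this.extra = extra;
        }
    }

    // What the framework might do after the chain runs: build a plain Rec so
    // that originalTopic reflects the pre-transformation topic.
    static Rec rewrap(Rec transformed, String originalTopic) {
        return new Rec(transformed.topic, originalTopic);
    }

    public static void main(String[] args) {
        Rec fromSmt = new CustomRec("events", null, "subclass-only state");
        Rec seenByTask = rewrap(fromSmt, "raw-events");
        System.out.println(seenByTask.originalTopic);        // raw-events
        System.out.println(seenByTask instanceof CustomRec); // false: subclass erased
    }
}
```

The task gets the correct original topic, but the `instanceof` check shows the subclass (and its extra state) never reaches it, which is exactly why Chris asks for this behavior to be called out in the KIP.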
> I'm personally looking forward to leveraging this improvement in the
> BigQuery sink connector I help maintain: we recently added a new write
> mode that uses asynchronous writes and SinkTask::preCommit, but encourage
> users to use SMTs to redirect records to different datasets/tables in
> BigQuery, which is currently impossible in that write mode. Thanks for
> taking this on!
>
> Cheers,
>
> Chris
>
> On Wed, Nov 3, 2021 at 6:17 PM Diego Erdody <erd...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'd like to propose a small KIP to add a new field to SinkRecord in
> > order to add support for topic-mutating SMTs (e.g. RegexRouter) in
> > asynchronous sink connectors (the ones that override preCommit for
> > internal offset tracking, like S3
> > (https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L274)).
> >
> > Links:
> >
> > - KIP-793: Sink Connectors: Support topic-mutating SMTs for async
> >   connectors (preCommit users)
> >   <https://cwiki.apache.org/confluence/x/fpFnCw>
> > - PR #11464 <https://github.com/apache/kafka/pull/11464>
> >
> > Thanks,
> >
> > Diego
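[Editor's note: for readers unfamiliar with the feature gap this thread discusses, here is a self-contained sketch of why topic-mutating SMTs break preCommit-based sinks. `TP` stands in for `org.apache.kafka.common.TopicPartition`, and everything else is a simplified model of the behavior, not real connector code.]

```java
// Sketch of the gap: a RegexRouter-style SMT rewrites the topic, so an async
// sink that keys its flushed offsets by the (mutated) topic it sees ends up
// reporting offsets for a TopicPartition the consumer never owned.
// TP/Rec are hypothetical stand-ins, not actual Kafka classes.
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class PreCommitGapSketch {
    record TP(String topic, int partition) {}
    record Rec(String topic, int partition, long offset) {}

    // RegexRouter-style transform: strips a "raw-" prefix from the topic.
    static Rec route(Rec r) {
        return new Rec(Pattern.compile("^raw-").matcher(r.topic()).replaceFirst(""),
                       r.partition(), r.offset());
    }

    public static void main(String[] args) {
        Rec consumed = new Rec("raw-events", 0, 42L);
        Rec seenByTask = route(consumed);

        // Offsets the sink has durably flushed, keyed by what the task sees:
        Map<TP, Long> flushed = new HashMap<>();
        flushed.put(new TP(seenByTask.topic(), seenByTask.partition()),
                    seenByTask.offset() + 1);

        // preCommit would hand back an offset for "events", but the consumer
        // only ever owned "raw-events", so that offset cannot be correctly
        // committed. Exposing the original coordinates (as the KIP proposes)
        // lets the sink key this map by the consumer's real partition.
        System.out.println(flushed.containsKey(new TP("raw-events", 0))); // false
        System.out.println(flushed.containsKey(new TP("events", 0)));     // true
    }
}
```

With the KIP's proposed original-coordinate accessors, the sink would build the `flushed` map from the pre-transformation topic and partition instead, so the returned offsets always match partitions the consumer actually owns.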