C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1265675099
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.

Review Comment:
On top of the `Transformation` link nit, we can simplify some of the language here:
```suggestion
     * Get the original topic for this sink record, before any
     * {@link Transformation transformations} were applied.
     * In order to be compatible with transformations that mutate topic names, this method should be used
     * by sink tasks instead of {@link #topic()} for any internal offset tracking purposes (for instance, reporting offsets to the Connect runtime via
     * {@link SinkTask#preCommit(Map)}).
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -20,18 +20,25 @@
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+import java.util.Objects;

 /**
- * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields. This information
- * should be used by the {@link SinkTask} to coordinate kafkaOffset commits.
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the original Kafka record's
+ * topic, partition and offset (before any {@link Transformation}s have been applied) in addition to the standard fields.
+ * This information should be used by the {@link SinkTask} to coordinate offset commits.

Review Comment:
Nit: formatting/readability
```suggestion
 * topic, partition and offset (before any {@link Transformation transformations}
 * have been applied) in addition to the standard fields. This information should be used by the {@link SinkTask} to coordinate
 * offset commits.
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by

Review Comment:
Nit: formatting/readability
```suggestion
     * {@link org.apache.kafka.connect.transforms.Transformation transformations} were applied. This should be used by
```
(this applies to a bunch of places in this PR; only noting it here to reduce noise)

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.

Review Comment:
Nit: language tweak, clarification on unsupported Connect runtimes
```suggestion
     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate topic
     * names when deployed to older Connect runtimes that do not support this method.
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied

Review Comment:
Nit: can simplify language (don't love "corresponding" if we can find something less wordy)
```suggestion
     * @return the topic for this record before any transformations were applied
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
I'm torn. It doesn't seem especially safe to provide this as an example of how to handle older Connect runtimes. But it also seems a little ham-fisted to recommend that developers throw an exception with a message like "This connector is incompatible with this version of Kafka Connect".

What do you think about adding a warning log message to the `catch` body stating that, with the current version of the Connect runtime, this connector will be incompatible with TPO-mutating SMTs?

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * Integer originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the offset of the Kafka record before any

Review Comment:
I think the remarks for `originalTopic` can all be applied here as well; not copying them over to reduce noise.

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
    }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to the topic partition of the Kafka record

Review Comment:
I think the remarks for `originalTopic` can all be applied here as well; not copying them over to reduce noise.
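The compatibility guard quoted in the hunks above, extended with the warning log C0urante suggests adding to the `catch` body, might look like the following self-contained sketch. `LegacyRecord`, `CompatGuard`, and `safeOriginalTopic` are illustrative stand-ins, not the real Connect API; the stub throws `NoSuchMethodError` directly to simulate what the JVM raises when a connector compiled against Kafka 3.6 runs on an older runtime.

```java
// Stand-in simulating a pre-3.6 runtime where SinkRecord.originalTopic()
// does not exist. Names are illustrative only.
class LegacyRecord {
    String topic() { return "events"; }

    String originalTopic() {
        // Simulate the linkage failure seen on older Connect runtimes.
        throw new NoSuchMethodError("originalTopic");
    }
}

public class CompatGuard {
    static String safeOriginalTopic(LegacyRecord record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // The warning log proposed in the review: on this runtime the
            // connector cannot observe pre-transformation topic names.
            System.err.println("WARN: SinkRecord.originalTopic() unavailable on this "
                    + "Connect runtime; this connector is incompatible with topic-mutating SMTs");
            return record.topic();
        }
    }

    public static void main(String[] args) {
        // Falls back to the possibly-transformed topic and prints "events".
        System.out.println(safeOriginalTopic(new LegacyRecord()));
    }
}
```

The same shape applies to `originalKafkaPartition()` and `originalKafkaOffset()` guards.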
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche

     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
Nit: for consistency's sake, can we change this to use the fields directly instead of the accessor methods?
```suggestion
        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers,
                originalTopic, originalKafkaPartition, originalKafkaOffset);
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {

         SinkRecord that = (SinkRecord) o;

-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;

Review Comment:
Nit: consistent formatting
```suggestion
                Objects.equals(originalKafkaPartition, that.originalKafkaPartition) &&
                originalKafkaOffset == that.originalKafkaOffset;
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java:
##########
@@ -118,11 +122,16 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
      * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets}
      * are safe to commit.
      *
-     * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}},
-     * provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
-     * passed to {@link #put}.
+     * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}, provided for
+     *                       convenience but could also be determined by tracking all offsets included in the
+     *                       {@link SinkRecord}s passed to {@link #put}. Note that the topic, partition and offset
+     *                       here correspond to the original Kafka topic partition and offset, before any {@link Transformation}s
+     *                       have been applied. These can be tracked by the task through the {@link SinkRecord#originalTopic()},
+     *                       {@link SinkRecord#originalKafkaPartition()} and {@link SinkRecord#originalKafkaOffset()} methods.
      *
-     * @return an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit.
+     * @return an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are
+     *         safe to commit. Note that the returned topic-partition to offsets map should also use the original Kafka

Review Comment:
Nit:
```suggestion
     *         safe to commit. Note that the returned topic-partition to offsets map should use the original Kafka
```

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -44,9 +51,18 @@ public SinkRecord(String topic, int partition, Schema keySchema, Object key, Sch
     public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                       Long timestamp, TimestampType timestampType, Iterable<Header> headers) {
+        this(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers, topic, partition, kafkaOffset);
+    }
+
+    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,

Review Comment:
Can we add a Javadoc to this constructor stating that it's intended for use by the Connect runtime only, and that developers should not use it directly in plugin code? (I'm qualifying things that way because it may still be useful for developers to use this constructor when writing unit tests for their connector/transformation/etc.)

##########
connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java:
##########
@@ -125,4 +125,19 @@ public void shouldModifyRecordHeader() {
         Header header = record.headers().lastWithName("intHeader");
         assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {

Review Comment:
Can we update the declaration for `record` in the `beforeEach` method to explicitly use the constructor that takes in the original TPO? Right now we're implicitly relying on that behavior with this test.

If we want to be paranoid (we don't have to but it's up to you), we can also add a test case for the older constructors that verifies that they provide the possibly-transformed TPO as the original TPO as well.
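The constructor-defaulting behavior the last comment asks to pin down with a test (older constructors treating the given, possibly-transformed TPO as the original TPO) can be sketched with a stand-in class. `StubSinkRecord` below is illustrative only, not the real `SinkRecord`; it mirrors the `this(...)` delegation shown in the `@@ -44,9 +51,18 @@` hunk.

```java
// Illustrative stand-in showing how an older-style constructor can delegate
// to the newer one, defaulting the original TPO to the given values.
class StubSinkRecord {
    final String topic, originalTopic;
    final int partition, originalPartition;
    final long offset, originalOffset;

    // Older-style constructor: original TPO defaults to the (possibly transformed) TPO,
    // matching the delegation in the quoted hunk.
    StubSinkRecord(String topic, int partition, long offset) {
        this(topic, partition, offset, topic, partition, offset);
    }

    // Newer-style constructor, intended for the runtime (and for unit tests).
    StubSinkRecord(String topic, int partition, long offset,
                   String originalTopic, int originalPartition, long originalOffset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.originalTopic = originalTopic;
        this.originalPartition = originalPartition;
        this.originalOffset = originalOffset;
    }
}

public class ConstructorDefaultSketch {
    public static void main(String[] args) {
        // With the older constructor, the original TPO mirrors the given TPO --
        // exactly what the suggested "paranoid" test case would assert.
        StubSinkRecord legacy = new StubSinkRecord("renamed-topic", 1, 7L);
        System.out.println(legacy.originalTopic + " " + legacy.originalPartition + " " + legacy.originalOffset);
    }
}
```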
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {

         SinkRecord that = (SinkRecord) o;

-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;

Review Comment:
Thanks for the detailed analysis! Anticipating some possible use cases that may be impacted by this, I've come up with:
1. Equality testing in unit tests
2. Storing sink records in data structures (e.g., elements in a `HashSet` or keys in a `HashMap`) which contain elements that are unique according to their `equals` method

With 1, this change is unlikely to be controversial.

With 2, things are a little trickier, but ultimately I think it's best to proceed with this change. There's enough granularity in the existing `equals` method that I can't anticipate any realistic cases where that method would previously return `true` but would now return `false`. For example, if a connector is tracking records that were redelivered to it via `SinkTask::put` after throwing an exception in `SinkTask::preCommit`, this change won't affect that case since the redelivered records would have the same original TPO.

Also, FWIW, there's precedent here from when we added headers to Connect in [KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect); there was no discussion on [the PR](https://github.com/apache/kafka/pull/4319) regarding the impact that changes to these methods may have on compatibility.
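The second use case above (hash-based collections) can be illustrated with a stand-in class whose `equals`/`hashCode` cover both the current and the original TPO, mirroring the change under discussion. `TrackedRecord` and its fields are assumptions for the sketch, not the real `SinkRecord`:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Stand-in with equals/hashCode over both the current and original topic,
// mirroring the SinkRecord change being reviewed.
class TrackedRecord {
    final String topic, originalTopic;
    final long offset;

    TrackedRecord(String topic, String originalTopic, long offset) {
        this.topic = topic;
        this.originalTopic = originalTopic;
        this.offset = offset;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TrackedRecord)) return false;
        TrackedRecord that = (TrackedRecord) o;
        return offset == that.offset
                && Objects.equals(topic, that.topic)
                && Objects.equals(originalTopic, that.originalTopic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, originalTopic, offset);
    }
}

public class EqualityImpactSketch {
    public static void main(String[] args) {
        Set<TrackedRecord> seen = new HashSet<>();
        // A record redelivered after a preCommit failure carries the same
        // original TPO, so it still deduplicates, as the comment argues.
        seen.add(new TrackedRecord("events-routed", "events", 42L));
        seen.add(new TrackedRecord("events-routed", "events", 42L));
        System.out.println(seen.size());
    }
}
```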
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {

         SinkRecord that = (SinkRecord) o;

-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;
     }

     @Override
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + Long.hashCode(kafkaOffset);
         result = 31 * result + timestampType.hashCode();
+        result = 31 * result + originalTopic.hashCode();
+        result = 31 * result + originalKafkaPartition.hashCode();
+        result = 31 * result + Long.hashCode(originalKafkaOffset);

Review Comment:
LGTM 👍

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org