C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1265675099


##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.

Review Comment:
   On top of the `Transformation` link nit, we can simplify some of the 
language here:
   ```suggestion
        * Get the original topic for this sink record, before any
        * {@link Transformation transformations} were applied.
        * In order to be compatible with transformations that mutate topic 
names, this method should be used
        * by sink tasks instead of {@link #topic()} for any internal offset 
tracking purposes (for instance, reporting offsets to the Connect runtime via
        * {@link SinkTask#preCommit(Map)}).
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -20,18 +20,25 @@
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and 
includes the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields. 
This information
- * should be used by the {@link SinkTask} to coordinate kafkaOffset commits.
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and 
includes the original Kafka record's
+ * topic, partition and offset (before any {@link Transformation}s have been 
applied) in addition to the standard fields.
+ * This information should be used by the {@link SinkTask} to coordinate 
offset commits.

Review Comment:
   Nit: formatting/readability
   ```suggestion
    * topic, partition and offset (before any {@link Transformation 
transformations}
    * have been applied) in addition to the standard fields. This information 
should be used by the {@link SinkTask} to coordinate
    * offset commits.
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by

Review Comment:
   Nit: formatting/readability
   ```suggestion
        * {@link org.apache.kafka.connect.transforms.Transformation 
transformations} were applied. This should be used by
   ```
   
   (this applies to a bunch of places in this PR; only noting it here to reduce 
noise)



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.

Review Comment:
   Nit: language tweak, clarification on unsupported Connect runtimes
   ```suggestion
        * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate topic
        * names when deployed to older Connect runtimes that do not support 
this method.
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any 
transformations were applied

Review Comment:
   Nit: can simplify language (don't love "corresponding" if we can find 
something less wordy)
   ```suggestion
        * @return the topic for this record before any transformations were 
applied
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
   I'm torn. It doesn't seem especially safe to provide this as an example of 
how to handle older Connect runtimes. But it also seems a little ham-fisted to 
recommend that developers throw an exception with a message like "This 
connector is incompatible with this version of Kafka Connect".
   
   What do you think about adding a warning log message to the `catch` body 
stating that, with the current version of the Connect runtime, this connector 
will be incompatible with TPO-mutating SMTs?
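
   A possible shape for that suggestion, sketched as self-contained code. This is only an illustration: `OldRuntimeRecord` is a stub standing in for a `SinkRecord` on a pre-3.6 runtime (where the call site raises `NoSuchMethodError`), and the log wording is hypothetical, not proposed final text.

   ```java
   import java.util.logging.Logger;

   public class CompatSketch {
       private static final Logger log = Logger.getLogger(CompatSketch.class.getName());

       // Stub simulating a SinkRecord on an older Connect runtime, where
       // originalTopic() does not exist and calling it raises NoSuchMethodError.
       static class OldRuntimeRecord {
           String topic() { return "transformed-topic"; }
           String originalTopic() { throw new NoSuchMethodError("originalTopic"); }
       }

       static String safeOriginalTopic(OldRuntimeRecord record) {
           try {
               return record.originalTopic();
           } catch (NoSuchMethodError | NoClassDefFoundError e) {
               // Per the review suggestion: warn rather than fail, so operators know
               // the limitation on this runtime version.
               log.warning("originalTopic() is unavailable on this version of the Connect "
                       + "runtime; this connector's offset tracking will be incompatible "
                       + "with SMTs that mutate the topic/partition/offset");
               return record.topic();
           }
       }

       public static void main(String[] args) {
           System.out.println(safeOriginalTopic(new OldRuntimeRecord()));
       }
   }
   ```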



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any 
transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to 
the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that 
are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before 
any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the 
offset of the Kafka record before any

Review Comment:
   I think the remarks for `originalTopic` can all be applied here as well; not 
copying them over to reduce noise.



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any 
transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to 
the topic partition of the Kafka record

Review Comment:
   I think the remarks for `originalTopic` can all be applied here as well; not 
copying them over to reduce noise.



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer 
kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema 
keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), 
originalKafkaOffset());

Review Comment:
   Nit: for consistency's sake, can we change this to use fields directly 
instead of access method?
   ```suggestion
           return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset, timestamp, timestampType, headers,
                   originalTopic, originalKafkaPartition, originalKafkaOffset);
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, 
that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;

Review Comment:
   Nit: consistent formatting
   ```suggestion
                   Objects.equals(originalKafkaPartition, 
that.originalKafkaPartition) &&
                   originalKafkaOffset == that.originalKafkaOffset;
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java:
##########
@@ -118,11 +122,16 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> 
currentOffsets) {
      * The default implementation simply invokes {@link #flush(Map)} and is 
thus able to assume all {@code currentOffsets}
      * are safe to commit.
      *
-     * @param currentOffsets the current offset state as of the last call to 
{@link #put(Collection)}},
-     *                       provided for convenience but could also be 
determined by tracking all offsets included in the {@link SinkRecord}s
-     *                       passed to {@link #put}.
+     * @param currentOffsets the current offset state as of the last call to 
{@link #put(Collection)}, provided for
+     *                       convenience but could also be determined by 
tracking all offsets included in the
+     *                       {@link SinkRecord}s passed to {@link #put}. Note 
that the topic, partition and offset
+     *                       here correspond to the original Kafka topic 
partition and offset, before any {@link Transformation}s
+     *                       have been applied. These can be tracked by the 
task through the {@link SinkRecord#originalTopic()},
+     *                       {@link SinkRecord#originalKafkaPartition()} and 
{@link SinkRecord#originalKafkaOffset()} methods.
      *
-     * @return an empty map if Connect-managed offset commit is not desired, 
otherwise a map of offsets by topic-partition that are safe to commit.
+     * @return an empty map if Connect-managed offset commit is not desired, 
otherwise a map of offsets by topic-partition that are
+     *         safe to commit. Note that the returned topic-partition to 
offsets map should also use the original Kafka

Review Comment:
   Nit:
   ```suggestion
        *         safe to commit. Note that the returned topic-partition to 
offsets map should use the original Kafka
   ```



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -44,9 +51,18 @@ public SinkRecord(String topic, int partition, Schema 
keySchema, Object key, Sch
 
     public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset,
                       Long timestamp, TimestampType timestampType, 
Iterable<Header> headers) {
+        this(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp, timestampType, headers, topic, partition, kafkaOffset);
+    }
+
+    public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset,

Review Comment:
   Can we add a Javadoc to this constructor stating that it's intended for use 
by the Connect runtime only, and that developers should not use it directly in 
plugin code?
   
   (I'm qualifying things that way because it may still be useful for 
developers to use this constructor when writing unit tests for their 
connector/transformation/etc.)



##########
connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java:
##########
@@ -125,4 +125,19 @@ public void shouldModifyRecordHeader() {
         Header header = record.headers().lastWithName("intHeader");
         assertEquals(100, (int) Values.convertToInteger(header.schema(), 
header.value()));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {

Review Comment:
   Can we update the declaration for `record` in the `beforeEach` method to 
explicitly use the constructor that takes in the original TPO? Right now we're 
implicitly relying on that behavior with this test.
   
   If we want to be paranoid (we don't have to but it's up to you), we can also 
add a test case for the older constructors that verifies that they provide the 
possibly-transformed TPO as the original TPO as well.



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, 
that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;

Review Comment:
   Thanks for the detailed analysis!
   
   Anticipating some possible use cases that may be impacted by this, I've come 
up with:
   
   1. Equality testing in unit tests
   2. Storing sink records in data structures (e.g., elements in a `HashSet` or 
keys in a `HashMap`) which contain elements that are unique according to their 
`equals` method
   
   With 1, this change is unlikely to be controversial.
   
   With 2, things are a little trickier, but ultimately I think it's best to 
proceed with this change. There's enough granularity in the existing `equals` 
method that I can't anticipate any realistic cases where that method would 
previously return `true` but would now return `false`. For example, if a 
connector is tracking records that were redelivered to it via `SinkTask::put` after 
throwing an exception in `SinkTask::preCommit`, this change won't affect that 
case since the re-delivered records would have the same original TPO.
   
   Also, FWIW, there's precedent here with when we added headers to Connect in 
[KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect);
 there was no discussion on [the PR](https://github.com/apache/kafka/pull/4319) 
regarding the impact that changes to these methods may have on compatibility.
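
   Case 2 can be sketched with self-contained code. This uses a stub class (not the real `SinkRecord`) whose `equals`/`hashCode` mimic the widened comparison; two records that agree on the delivered offset but differ in original topic were previously "equal" under the offset-plus-timestampType rule and are now distinct:

   ```java
   import java.util.HashSet;
   import java.util.Objects;
   import java.util.Set;

   public class EqualsSketch {
       // Stub illustrating the widened equality; fields are a simplification
       // of the real SinkRecord's state.
       static class StubRecord {
           final long kafkaOffset;
           final String originalTopic;

           StubRecord(long kafkaOffset, String originalTopic) {
               this.kafkaOffset = kafkaOffset;
               this.originalTopic = originalTopic;
           }

           @Override
           public boolean equals(Object o) {
               if (!(o instanceof StubRecord)) return false;
               StubRecord that = (StubRecord) o;
               // New, wider rule: delivered offset AND original topic must match.
               return kafkaOffset == that.kafkaOffset
                       && Objects.equals(originalTopic, that.originalTopic);
           }

           @Override
           public int hashCode() {
               return Objects.hash(kafkaOffset, originalTopic);
           }
       }

       public static void main(String[] args) {
           Set<StubRecord> seen = new HashSet<>();
           seen.add(new StubRecord(42L, "orders"));
           // Distinct under the new rule; the old offset-only rule would dedup it.
           seen.add(new StubRecord(42L, "orders-routed"));
           System.out.println(seen.size()); // prints 2
       }
   }
   ```

   Re-delivered records carry the same original TPO, so they still hash and compare as equal, which is why the redelivery-tracking case above is unaffected.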



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, 
that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;
     }
 
     @Override
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + Long.hashCode(kafkaOffset);
         result = 31 * result + timestampType.hashCode();
+        result = 31 * result + originalTopic.hashCode();
+        result = 31 * result + originalKafkaPartition.hashCode();
+        result = 31 * result + Long.hashCode(originalKafkaOffset);

Review Comment:
   LGTM 👍





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
