gharris1727 commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1265731323


##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility so that they can be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility so that they can be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * Integer originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the offset of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaOffset()}, in order to be
+     * compatible with transformations that mutate the offset value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility so that they can be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * long originalKafkaOffset;
+     * try {
+     *     originalKafkaOffset = record.originalKafkaOffset();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {

Review Comment:
   I think the NoClassDefFoundError was specific to the error reporter, since that method returned a type which didn't always exist. Here, where the return types are standard Java types, I think NoSuchMethodError is the only possible error. Can you verify this?
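   
   For what it's worth, if that reasoning holds, connector code would only need to guard against NoSuchMethodError. A minimal sketch of such a guard (a hypothetical helper class, not code from this PR, written against the proposed 3.6 API):
   
       import org.apache.kafka.connect.sink.SinkRecord;
   
       public class OriginalTopicResolver {
           // Resolve the pre-transformation topic, falling back to the (possibly
           // transformed) topic on Connect runtimes older than 3.6.
           public static String resolveTopic(SinkRecord record) {
               try {
                   // originalTopic() returns a plain String, so on an older runtime the missing
                   // method should surface as a NoSuchMethodError at the call site.
                   return record.originalTopic();
               } catch (NoSuchMethodError e) {
                   // Catching NoClassDefFoundError as well would only be a conservative extra.
                   return record.topic();
               }
           }
       }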



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java:
##########
@@ -34,27 +31,12 @@ public class InternalSinkRecord extends SinkRecord {
 
     public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
         super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
-            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
-            record.timestampType(), record.headers());
+                record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+                record.timestampType(), record.headers(), originalRecord.topic(), originalRecord.partition(),
+                originalRecord.offset());
         this.originalRecord = originalRecord;
     }
 
-    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
-                                 int partition, Schema keySchema, Object key, Schema valueSchema,
-                                 Object value, long kafkaOffset, Long timestamp,
-                                 TimestampType timestampType, Iterable<Header> headers) {
-        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
-        this.originalRecord = originalRecord;
-    }
-
-    @Override
-    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,

Review Comment:
   This override was added due to the following comment: https://github.com/apache/kafka/pull/8720/files#r431367467
   
   But I don't understand the necessity of having the override. Is there a reason you needed to remove it? Without understanding the need to include or exclude it, I'd err on keeping what was already there.
   As far as I can tell, the InternalSinkRecord is only passed to the connector. I am not aware of any use cases for calling newRecord in the connector, but it is of course possible.
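   
   If we do end up keeping it, here is a rough sketch of how the override could be retained on top of the remaining two-argument constructor (my sketch, not the removed code; it assumes the method sits inside InternalSinkRecord, where Schema and Header are already imported and originalRecord is the existing field):
   
       @Override
       public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
                                   Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
           // Wrap the transformed record so the InternalSinkRecord type, and with it the
           // reference to the raw consumer record, survives a newRecord call.
           return new InternalSinkRecord(originalRecord,
                   super.newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers));
       }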
   
   



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, that.originalKafkaPartition) &&
+                originalKafkaOffset == that.originalKafkaOffset;

Review Comment:
   The connector can receive two SinkRecords that compare equal if they have the same T/P/O and the same contents, headers, etc. Right now this can happen either when an SMT causes a T/P/O collision or when a record is re-delivered to the connector. After this change, only a re-delivery can cause this equality check to fire.
   
   I don't think it would be reasonable for someone to rely on this to detect T/P/O collisions, because it is inherently unreliable due to checking equality of the contents of the record. I think the semantics of only being equal after a re-delivery are much better.
   
   We don't seem to rely on this equals method in the runtime, so this should only affect connector implementations.
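   
   To make the behavioral change concrete, here is a hypothetical connector-side fragment (not from this PR or the runtime) that relies on equals() through List.contains():
   
       import java.util.ArrayList;
       import java.util.Collection;
       import java.util.List;
   
       import org.apache.kafka.connect.sink.SinkRecord;
   
       class RedeliveryTracker {
           private final List<SinkRecord> seen = new ArrayList<>();
   
           void put(Collection<SinkRecord> records) {
               for (SinkRecord record : records) {
                   // Before this change, contains() could also match when an SMT made two distinct
                   // source records collide on topic/partition/offset (with identical contents).
                   // With the original coordinates included in equals(), only a re-delivery of the
                   // same source record matches here.
                   if (seen.contains(record)) {
                       System.out.println("Re-delivered record at original offset " + record.originalKafkaOffset());
                   } else {
                       seen.add(record);
                   }
               }
           }
       }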


