yashmayya commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266406455


##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
   > But it also seems a little ham-fisted to recommend that developers throw 
an exception with a message like "This connector is incompatible with this 
version of Kafka Connect".
   
   Yeah, I really wanted to avoid this because connectors don't have a clean
   way to find out whether they're configured with transformations that mutate
   the topic, partition, or offset (T/P/O), and it doesn't make sense to be
   completely incompatible with older runtimes. That's why I'd included the
   following note after the code block:
   ```
   Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate topic
   names when deployed to older Connect runtimes that do not support this 
method.
   ```
   
   I think adding a warning log is a good additional step.
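
A minimal sketch of what the guarded call plus a one-time warning could look like. It is an illustration under stated assumptions, not the code in this PR: `RecordView`, `OriginalTopicResolver`, `legacyRecord`, and `modernRecord` are hypothetical names, `RecordView` stands in for `SinkRecord` so the example runs without Kafka on the classpath, and the older runtime is simulated by throwing `NoSuchMethodError` explicitly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

final class OriginalTopicResolver {
    // Hypothetical stand-in for SinkRecord (assumption: only these two accessors matter here).
    interface RecordView {
        String topic();
        String originalTopic();
    }

    private static final Logger LOG = Logger.getLogger(OriginalTopicResolver.class.getName());

    // Ensures the warning is logged once per resolver rather than once per record.
    private final AtomicBoolean warned = new AtomicBoolean(false);

    String resolve(RecordView record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError e) {
            if (warned.compareAndSet(false, true)) {
                LOG.warning("originalTopic() is not available on this Connect runtime; "
                        + "falling back to topic(). Offset tracking may be incorrect "
                        + "if a transformation mutates the topic name.");
            }
            return record.topic();
        }
    }

    boolean hasWarned() {
        return warned.get();
    }

    // Simulates a record on an older runtime by throwing the linkage error directly.
    static RecordView legacyRecord(String topic) {
        return new RecordView() {
            public String topic() { return topic; }
            public String originalTopic() { throw new NoSuchMethodError("originalTopic"); }
        };
    }

    // Simulates a record on a runtime that supports originalTopic().
    static RecordView modernRecord(String topic, String originalTopic) {
        return new RecordView() {
            public String topic() { return topic; }
            public String originalTopic() { return originalTopic; }
        };
    }

    public static void main(String[] args) {
        OriginalTopicResolver resolver = new OriginalTopicResolver();
        System.out.println(resolver.resolve(modernRecord("orders-renamed", "orders")));
        System.out.println(resolver.resolve(legacyRecord("orders-renamed")));
    }
}
```

Logging once keeps the fallback visible to operators without flooding the task log on every record.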



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer 
kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema 
keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), 
originalKafkaOffset());

Review Comment:
   I figured this was intentional to differentiate between the use of instance 
fields and method arguments and chose to stick with the convention used here.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java:
##########
@@ -34,27 +31,12 @@ public class InternalSinkRecord extends SinkRecord {
 
     public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, 
SinkRecord record) {
         super(record.topic(), record.kafkaPartition(), record.keySchema(), 
record.key(),
-            record.valueSchema(), record.value(), record.kafkaOffset(), 
record.timestamp(),
-            record.timestampType(), record.headers());
+                record.valueSchema(), record.value(), record.kafkaOffset(), 
record.timestamp(),
+                record.timestampType(), record.headers(), 
originalRecord.topic(), originalRecord.partition(),
+                originalRecord.offset());
         this.originalRecord = originalRecord;
     }
 
-    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> 
originalRecord, String topic,
-                                 int partition, Schema keySchema, Object key, 
Schema valueSchema,
-                                 Object value, long kafkaOffset, Long 
timestamp,
-                                 TimestampType timestampType, Iterable<Header> 
headers) {
-        super(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp, timestampType, headers);
-        this.originalRecord = originalRecord;
-    }
-
-    @Override
-    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema 
keySchema, Object key,

Review Comment:
   > Is there a reason you needed to remove this override
   
   Hm no, I removed it because I came to the same conclusion you did about its 
usefulness.
   
   > Without understanding the need to include or exclude it, i'd err on 
keeping what was already there.
   
   Fair point, I've added it back with the necessary modifications (and a small 
unit test similar to the one for `SinkRecord`).
   
   This bit is also relevant, and while it still doesn't seem like a realistic
   case, I agree that it's probably better not to change this for now:
   
https://github.com/apache/kafka/blob/0c6b1a4e9a94cc8d7461b78e5bb0b583dc0307d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L77-L80



##########
connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java:
##########
@@ -125,4 +125,19 @@ public void shouldModifyRecordHeader() {
         Header header = record.headers().lastWithName("intHeader");
         assertEquals(100, (int) Values.convertToInteger(header.schema(), 
header.value()));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {

Review Comment:
   Sounds good, no harm in having the additional test as well 👍 
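
For illustration, the property that a test with this name would check can be sketched with a stand-in class. `MiniSinkRecord` is hypothetical and much smaller than `SinkRecord`; the only thing it is meant to mirror from the diff above is that `newRecord` forwards the original topic, partition, and offset unchanged while the visible topic and partition may be rewritten.

```java
final class MiniSinkRecord {
    private final String topic;
    private final Integer kafkaPartition;
    private final long kafkaOffset;
    private final String originalTopic;
    private final Integer originalKafkaPartition;
    private final long originalKafkaOffset;

    // A freshly consumed record: the "original" coordinates equal the current ones.
    MiniSinkRecord(String topic, Integer kafkaPartition, long kafkaOffset) {
        this(topic, kafkaPartition, kafkaOffset, topic, kafkaPartition, kafkaOffset);
    }

    private MiniSinkRecord(String topic, Integer kafkaPartition, long kafkaOffset,
                           String originalTopic, Integer originalKafkaPartition,
                           long originalKafkaOffset) {
        this.topic = topic;
        this.kafkaPartition = kafkaPartition;
        this.kafkaOffset = kafkaOffset;
        this.originalTopic = originalTopic;
        this.originalKafkaPartition = originalKafkaPartition;
        this.originalKafkaOffset = originalKafkaOffset;
    }

    // What a transformation would call: topic and partition may change,
    // while the original coordinates are carried over untouched.
    MiniSinkRecord newRecord(String topic, Integer kafkaPartition) {
        return new MiniSinkRecord(topic, kafkaPartition, kafkaOffset,
                originalTopic, originalKafkaPartition, originalKafkaOffset);
    }

    String topic() { return topic; }
    Integer kafkaPartition() { return kafkaPartition; }
    long kafkaOffset() { return kafkaOffset; }
    String originalTopic() { return originalTopic; }
    Integer originalKafkaPartition() { return originalKafkaPartition; }
    long originalKafkaOffset() { return originalKafkaOffset; }

    public static void main(String[] args) {
        MiniSinkRecord consumed = new MiniSinkRecord("orders", 3, 42L);
        MiniSinkRecord routed = consumed.newRecord("orders-eu", 0);
        System.out.println(routed.topic() + " (originally " + routed.originalTopic()
                + "-" + routed.originalKafkaPartition() + "@" + routed.originalKafkaOffset() + ")");
    }
}
```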



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any 
transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to 
the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that 
are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * Integer originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before 
any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the 
offset of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#kafkaOffset()}, in order to be
+     * compatible with transformations that mutate the offset value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * long originalKafkaOffset;
+     * try {
+     *     originalKafkaOffset = record.originalKafkaOffset();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {

Review Comment:
   Ah good point, I actually used `SourceTaskContext::transactionContext` as 
the reference here but that has the same issue with type existence.
   
   > Here, where the return types are native Java types, I think the 
NoSuchMethodError is the only error possible here. Can you verify this?
   
   Yup, this is correct and I've updated the Javadocs for these methods, thanks!
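
For background, `NoSuchMethodError` is a linkage error raised when already-compiled code calls a method that the class on the runtime classpath lacks, whereas the checked `NoSuchMethodException` is thrown only by reflective lookups such as `Class#getMethod`. The sketch below shows the reflective variant, which a connector could use to probe for a method once instead of catching the error on each call. `MethodProbe` and `hasPublicMethod` are hypothetical names, and `String` stands in for `SinkRecord` so the example runs on a plain JDK.

```java
final class MethodProbe {
    // Returns true if the class exposes a public method with this name and parameter types.
    static boolean hasPublicMethod(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            type.getMethod(name, parameterTypes);
            return true;
        } catch (NoSuchMethodException e) {
            // Reflection reports a missing method with the checked exception,
            // not with NoSuchMethodError.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicMethod(String.class, "isEmpty"));       // prints "true"
        System.out.println(hasPublicMethod(String.class, "originalTopic")); // prints "false"
    }
}
```

Whether a one-time probe or a try-catch around the direct call is preferable is a design choice for the connector; the Javadoc in this PR recommends the try-catch form.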



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

