C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266903930


##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodException} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
   👍 LGTM, thanks!



##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic 
of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation 
Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported 
to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link 
#topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodException} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to 
Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
   👍 LGTM, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to