yashmayya commented on code in PR #14024: URL: https://github.com/apache/kafka/pull/14024#discussion_r1266406455
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to remain deployable on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
   > But it also seems a little ham-fisted to recommend that developers throw an exception with a message like "This connector is incompatible with this version of Kafka Connect".

   Yeah, I really wanted to avoid this, because connectors don't have a clean way to find out whether they're configured with topic/partition/offset-mutating transformations, and it doesn't make sense to be completely incompatible with older runtimes. That's why I'd included the following note after the code block:
   ```
   Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate topic names when deployed to older Connect runtimes that do not support this method.
   ```
   I think adding a warning log is a good additional step.

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
   I figured this was intentional to differentiate between the use of instance fields and method arguments, and chose to stick with the convention used here.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java:
##########
@@ -34,27 +31,12 @@ public class InternalSinkRecord extends SinkRecord {
     public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
         super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
-              record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
-              record.timestampType(), record.headers());
+              record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+              record.timestampType(), record.headers(), originalRecord.topic(), originalRecord.partition(),
+              originalRecord.offset());
         this.originalRecord = originalRecord;
     }

-    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
-                                 int partition, Schema keySchema, Object key, Schema valueSchema,
-                                 Object value, long kafkaOffset, Long timestamp,
-                                 TimestampType timestampType, Iterable<Header> headers) {
-        super(topic, partition, keySchema, key, valueSchema,
-              value, kafkaOffset, timestamp, timestampType, headers);
-        this.originalRecord = originalRecord;
-    }
-
-    @Override
-    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,

Review Comment:
   > Is there a reason you needed to remove this override

   Hm, no. I removed it because I came to the same conclusion you did about its usefulness.

   > Without understanding the need to include or exclude it, i'd err on keeping what was already there.

   Fair point, I've added it back with the necessary modifications (and a small unit test similar to the one for `SinkRecord`). This bit is also relevant, and while it still doesn't seem like a realistic case, I agree that it's probably better not to change this for now: https://github.com/apache/kafka/blob/0c6b1a4e9a94cc8d7461b78e5bb0b583dc0307d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L77-L80

##########
connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java:
##########
@@ -125,4 +125,19 @@ public void shouldModifyRecordHeader() {
         Header header = record.headers().lastWithName("intHeader");
         assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {

Review Comment:
   Sounds good, no harm in having the additional test as well 👍

##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to remain deployable on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to remain deployable on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * Integer originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the offset of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaOffset()}, in order to be
+     * compatible with transformations that mutate the offset value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to remain deployable on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * long originalKafkaOffset;
+     * try {
+     *     originalKafkaOffset = record.originalKafkaOffset();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {

Review Comment:
   Ah, good point. I actually used `SourceTaskContext::transactionContext` as the reference here, but that has the same issue with type existence.

   > Here, where the return types are native Java types, I think the NoSuchMethodError is the only error possible here. Can you verify this?

   Yup, this is correct and I've updated the Javadocs for these methods, thanks!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
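Editor's note: the backward-compatibility guard discussed in this thread can also be implemented as a one-time reflective capability check, so a connector can log a single warning at startup instead of catching `NoSuchMethodError` on every record. Below is a minimal, hypothetical sketch; the helper name is illustrative and not part of the PR, and `java.lang.String` stands in for `SinkRecord`, which is not on the classpath in this standalone example.

```java
public class RuntimeCapabilityCheck {
    // Hypothetical helper (not from the PR): probe once whether a class
    // exposes a given zero-argument public method. A sink task could call
    // this with SinkRecord.class and "originalTopic" at startup to detect
    // whether the runtime is Kafka 3.6+ and log one warning if it is not.
    public static boolean hasZeroArgMethod(Class<?> clazz, String methodName) {
        try {
            clazz.getMethod(methodName);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // String stands in for SinkRecord in this standalone sketch.
        System.out.println(hasZeroArgMethod(String.class, "length"));        // prints true
        System.out.println(hasZeroArgMethod(String.class, "originalTopic")); // prints false
    }
}
```

The result can be cached in a field so the per-record path stays free of exception handling; the try-catch around `record.originalTopic()` shown in the Javadoc above remains the simplest option when no startup hook is convenient.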