gharris1727 commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1265731323
########## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java: ##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility, so that they can still be deployed on older Connect runtimes, should guard the call to this
+     * method with a try-catch block, since calling it will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility, so that they can still be deployed on older Connect runtimes, should guard the call to this
+     * method with a try-catch block, since calling it will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * Integer originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the offset of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaOffset()}, in order to be
+     * compatible with transformations that mutate the offset value.
+     * <p>
+     * This method was added in Apache Kafka 3.6.
+     * Sink connectors that use this method but want to maintain backward
+     * compatibility, so that they can still be deployed on older Connect runtimes, should guard the call to this
+     * method with a try-catch block, since calling it will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * long originalKafkaOffset;
+     * try {
+     *     originalKafkaOffset = record.originalKafkaOffset();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {

Review Comment:
   I think the NoClassDefFoundError was specific to the error reporter, since that method returned a type which didn't always exist. Here, where the return types are plain Java types, I think NoSuchMethodError is the only possible error. Can you verify this?

########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java: ##########
@@ -34,27 +31,12 @@ public class InternalSinkRecord extends SinkRecord {
 
     public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
         super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
-                record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
-                record.timestampType(), record.headers());
+                record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+                record.timestampType(), record.headers(), originalRecord.topic(), originalRecord.partition(),
+                originalRecord.offset());
         this.originalRecord = originalRecord;
     }
 
-    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
-                                 int partition, Schema keySchema, Object key, Schema valueSchema,
-                                 Object value, long kafkaOffset, Long timestamp,
-                                 TimestampType timestampType, Iterable<Header> headers) {
-        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
-        this.originalRecord = originalRecord;
-    }
-
-    @Override
-    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,

Review Comment:
   This override was added due to the following comment: https://github.com/apache/kafka/pull/8720/files#r431367467
   
   But I don't understand the necessity of having the override. Is there a reason you needed to remove it? Without understanding the need to include or exclude it, I'd err on the side of keeping what was already there.
   
   As far as I can tell, the InternalSinkRecord is only ever passed to the connector. I am not aware of any use-cases for calling newRecord in the connector, but it is of course possible.

########## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java: ##########
@@ -79,17 +195,21 @@ public boolean equals(Object o) {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, that.originalKafkaPartition)
+                && originalKafkaOffset == that.originalKafkaOffset;

Review Comment:
   The connector can receive two SinkRecords which pass this equality check if they have the same T/P/O and the same contents, headers, etc. Right now this can happen either when an SMT causes a T/P/O collision or when a record is re-delivered to the connector. After this change, only a re-delivery can cause this equality check to fire.
   
   I don't think it would be reasonable for someone to rely on this to detect T/P/O collisions, because it is inherently unreliable: it also checks equality of the contents of the record. The semantics of only being equal after a re-delivery are much better. We don't seem to rely on this equals method in the runtime, so this should only affect connector implementations.
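An aside on the NoClassDefFoundError question above: the distinction hinges on how the missing method is reached. A compiled call site against a method that is absent at run time fails with the unchecked linkage error `NoSuchMethodError`, while a reflective lookup via `Class#getMethod` throws the checked `NoSuchMethodException`. The sketch below (the `MethodProbe` class and `hasMethod` helper are hypothetical names, and `String` merely stands in for `SinkRecord` so the snippet is self-contained) shows the reflective probe a connector could use as an alternative to the try-catch guard:

```java
import java.lang.reflect.Method;

public class MethodProbe {
    // Hypothetical helper: returns true if cls declares a public method with
    // the given name and parameter types. Reflection throws the checked
    // NoSuchMethodException, unlike a direct call, which fails with the
    // unchecked NoSuchMethodError at link time.
    static boolean hasMethod(Class<?> cls, String name, Class<?>... paramTypes) {
        try {
            Method m = cls.getMethod(name, paramTypes);
            return m != null;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // String#isEmpty() exists, so the probe succeeds.
        System.out.println(hasMethod(String.class, "isEmpty"));
        // "originalTopic" does not exist on String, standing in for an
        // accessor missing from an older runtime's classpath.
        System.out.println(hasMethod(String.class, "originalTopic"));
    }
}
```

A connector could run such a probe once at start-up and cache the result, rather than catching `NoSuchMethodError` on every record.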
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org