[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records
C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1267245540

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
A separate ticket/PR sounds great, thanks! Feel free to ping me on the PR if you implement the fix.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records
C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266917499

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
There's a bug in the [third example](https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L54-L55).

The `newRecord` method accepts an `Iterable<Header> headers` parameter, but when constructing the `InternalSinkRecord` that's used as the return value, it passes in `headers()` instead of `headers` as the parameter for the new record's headers, which causes the headers that were supplied to the `newRecord` method to be ignored.

It'd be great if we could fix that bug (and possibly add testing coverage for it). And IMO this is a decent reason to abandon using methods to differentiate between fields and parameters, though as long as there aren't any bugs in this PR I could live with it if you feel strongly that it's still worth making that distinction.
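The accessor-vs-parameter shadowing bug described above can be reduced to a minimal sketch. The classes below are hypothetical stand-ins (not the real Kafka `SinkRecord`/`InternalSinkRecord`), shown only to illustrate why calling the `headers()` accessor inside `newRecord` silently discards the caller's argument:

```java
import java.util.Arrays;
import java.util.List;

// Minimal stand-in for a record type whose copy method takes replacement headers.
class Record {
    private final List<String> headers;

    Record(List<String> headers) {
        this.headers = headers;
    }

    List<String> headers() {
        return headers;
    }

    // Buggy variant: calls the accessor headers(), which returns this record's
    // existing headers, so the supplied parameter is silently ignored.
    Record newRecordBuggy(List<String> headers) {
        return new Record(headers()); // bug: should be the parameter "headers"
    }

    // Fixed variant: passes the method parameter through.
    Record newRecordFixed(List<String> headers) {
        return new Record(headers);
    }
}

public class HeadersBugDemo {
    public static void main(String[] args) {
        Record original = new Record(Arrays.asList("h1"));
        List<String> replacement = Arrays.asList("h2");
        System.out.println(original.newRecordBuggy(replacement).headers()); // [h1] -- replacement lost
        System.out.println(original.newRecordFixed(replacement).headers()); // [h2]
    }
}
```

This is also why the reviewer suggests dropping the convention of using accessor methods to distinguish fields from parameters: a one-character difference (`headers()` vs. `headers`) compiles cleanly in both forms.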
[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records
C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266903930

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     *
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * {@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();

Review Comment:
LGTM, thanks!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records
C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1265675099

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.

Review Comment:
On top of the `Transformation` link nit, we can simplify some of the language here:
```suggestion
     * Get the original topic for this sink record, before any
     * {@link Transformation transformations} were applied.
     * In order to be compatible with transformations that mutate topic names, this method should be used
     * by sink tasks instead of {@link #topic()} for any internal offset tracking purposes (for instance,
     * reporting offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
```

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -20,18 +20,25 @@
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+import java.util.Objects;

 /**
- * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields. This information
- * should be used by the {@link SinkTask} to coordinate kafkaOffset commits.
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the original Kafka record's
+ * topic, partition and offset (before any {@link Transformation}s have been applied) in addition to the standard fields.
+ * This information should be used by the {@link SinkTask} to coordinate offset commits.

Review Comment:
Nit: formatting/readability
```suggestion
 * topic, partition and offset (before any {@link Transformation transformations}
 * have been applied) in addition to the standard fields. This information should be used by the
 * {@link SinkTask} to coordinate offset commits.
```

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by

Review Comment:
Nit: formatting/readability
```suggestion
     * {@link org.apache.kafka.connect.transforms.Transformation transformations} were applied. This should be used by
```
(this applies to a bunch of places in this PR; only noting it here to reduce noise)

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:

@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }
+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     *
+     * This method was added in Apache Kafka 3.6.
+     * Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * {@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     *
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate
+     * the topic name.
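The compatibility guard quoted in the javadoc above can be sketched end to end. `LegacySinkRecord` below is a hypothetical stub (not the real `org.apache.kafka.connect.sink.SinkRecord`) that throws `NoSuchMethodError` to simulate what the JVM itself raises when a connector compiled against Kafka 3.6 calls `originalTopic()` on an older runtime:

```java
// Hypothetical stand-in for a SinkRecord on a pre-3.6 Connect runtime.
// On a real old runtime the method is simply absent and the JVM raises
// NoSuchMethodError at the call site; we throw it explicitly to simulate that.
class LegacySinkRecord {
    String topic() {
        return "transformed-topic";
    }

    String originalTopic() {
        throw new NoSuchMethodError("SinkRecord.originalTopic()Ljava/lang/String;");
    }
}

public class OriginalTopicCompat {
    // Guarded lookup: prefer the pre-transform topic, fall back to topic()
    // when running on a runtime that predates the new accessor.
    static String originalTopicOrFallback(LegacySinkRecord record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return record.topic();
        }
    }

    public static void main(String[] args) {
        // The stub "runtime" lacks originalTopic(), so the fallback path runs.
        System.out.println(originalTopicOrFallback(new LegacySinkRecord())); // transformed-topic
    }
}
```

Note the catch clause targets `NoSuchMethodError` (an `Error` thrown by the JVM for a missing method at link time), not the reflection-only checked exception `NoSuchMethodException`; catching the latter would never fire here.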