[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records

2023-07-18 Thread via GitHub


C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1267245540


##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
 @Override
 public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
 Long timestamp, Iterable<Header> headers) {
-return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
   A separate ticket/PR sounds great, thanks! Feel free to ping me on the PR if 
you implement the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records

2023-07-18 Thread via GitHub


C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266917499


##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
 @Override
 public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
 Long timestamp, Iterable<Header> headers) {
-return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
   There's a bug in the [third example](https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L54-L55).
   
   The `newRecord` method accepts an `Iterable<Header> headers` parameter, but 
when constructing the `InternalSinkRecord` that it returns, it passes 
`headers()` instead of `headers` for the new record's headers, so the headers 
that were supplied to the `newRecord` method are ignored.
   
   It'd be great if we could fix that bug (and possibly add testing coverage 
for it). And IMO this is a decent reason to abandon using methods to 
differentiate between fields and parameters, though as long as there aren't any 
bugs in this PR I could live with it if you feel strongly that it's still worth 
making that distinction.
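
The shape of that bug can be reproduced in isolation. The following is a minimal sketch, not the actual `InternalSinkRecord` code: `HeadersBugDemo`, `DemoRecord`, `newRecordBuggy` and `newRecordFixed` are hypothetical names, and headers are simplified to a list of strings. It only illustrates how an accessor and a parameter that share a name differ by a pair of parentheses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeadersBugDemo {

    // Hypothetical, simplified stand-in for a record type; not a Kafka Connect class.
    static final class DemoRecord {
        private final String topic;
        private final List<String> headers;

        DemoRecord(String topic, List<String> headers) {
            this.topic = topic;
            this.headers = new ArrayList<>(headers);
        }

        String topic() {
            return topic;
        }

        List<String> headers() {
            return headers;
        }

        // Buggy variant: headers() is the accessor on this record,
        // so the headers argument is silently ignored.
        DemoRecord newRecordBuggy(String topic, List<String> headers) {
            return new DemoRecord(topic, headers());
        }

        // Fixed variant: the parameter is passed through to the new record.
        DemoRecord newRecordFixed(String topic, List<String> headers) {
            return new DemoRecord(topic, headers);
        }
    }

    public static void main(String[] args) {
        DemoRecord original = new DemoRecord("orders", Arrays.asList("old-header"));
        List<String> replacement = Arrays.asList("new-header");

        System.out.println(original.newRecordBuggy("orders", replacement).headers()); // prints [old-header]
        System.out.println(original.newRecordFixed("orders", replacement).headers()); // prints [new-header]
    }
}
```

A test along these lines, asserting that the returned record carries the supplied headers, would likely have caught the mistake.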






[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records

2023-07-18 Thread via GitHub


C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266903930


##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
 return timestampType;
 }
 
+/**
+ * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+ * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+ * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+ * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+ * transformations that mutate the topic name.
+ * 
+ * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+ * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+ * with a try-catch block, since calling this method will result in a {@link NoSuchMethodException} or
+ * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+ * For example:
+ * {@code
+ * String originalTopic;
+ * try {
+ * originalTopic = record.originalTopic();
+ * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+ * originalTopic = record.topic();

Review Comment:
    LGTM, thanks!
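
For readers who want to try the guard without a Connect runtime, here is a self-contained sketch. Everything in it is a hypothetical stand-in: `RecordView`, `OldRuntimeRecord`, `NewRuntimeRecord` and `originalTopicOrFallback` are not Kafka classes or methods, and the old runtime is simulated by throwing `NoSuchMethodError` by hand. On a real older runtime the JVM raises that linkage error itself when the call site is resolved, which is why the guard catches the `Error` type rather than the reflection-related `NoSuchMethodException`.

```java
public class OriginalTopicGuardDemo {

    // Hypothetical minimal view of a sink record; stands in for the real SinkRecord.
    interface RecordView {
        String topic();

        String originalTopic();
    }

    // Simulates a record on a runtime that lacks originalTopic().
    static final class OldRuntimeRecord implements RecordView {
        @Override
        public String topic() {
            return "renamed-topic";
        }

        @Override
        public String originalTopic() {
            throw new NoSuchMethodError("originalTopic");
        }
    }

    // Simulates a record on a runtime that provides originalTopic().
    static final class NewRuntimeRecord implements RecordView {
        @Override
        public String topic() {
            return "renamed-topic";
        }

        @Override
        public String originalTopic() {
            return "source-topic";
        }
    }

    // The guard from the Javadoc example, wrapped in a helper so callers use it in one place.
    static String originalTopicOrFallback(RecordView record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: fall back to the (possibly transformed) topic.
            return record.topic();
        }
    }

    public static void main(String[] args) {
        System.out.println(originalTopicOrFallback(new NewRuntimeRecord())); // prints source-topic
        System.out.println(originalTopicOrFallback(new OldRuntimeRecord())); // prints renamed-topic
    }
}
```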






[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records

2023-07-17 Thread via GitHub


C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1265675099


##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
 return timestampType;
 }
 
+/**
+ * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+ * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+ * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+ * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+ * transformations that mutate the topic name.

Review Comment:
   On top of the `Transformation` link nit, we can simplify some of the 
language here:
   ```suggestion
* Get the original topic for this sink record, before any
* {@link Transformation transformations} were applied.
* In order to be compatible with transformations that mutate topic names, this method should be used
* by sink tasks instead of {@link #topic()} for any internal offset tracking purposes (for instance, reporting offsets to the Connect runtime via
* {@link SinkTask#preCommit(Map)}).
   ```
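
To make the offset-tracking point concrete, here is a rough sketch of a task keying its tracked offsets on the pre-transform coordinates. It does not use the Connect API: `OriginalOffsetTrackingDemo`, `Rec` and `offsetsToCommit` are hypothetical, and the topic-partition key is simplified to a string. The only Kafka convention it relies on is that the offset to commit is the last processed offset plus one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OriginalOffsetTrackingDemo {

    // Hypothetical stand-in for a sink record; the fields mirror the accessors discussed in this PR.
    static final class Rec {
        final String topic;            // possibly rewritten by a transformation
        final String originalTopic;    // topic of the consumed Kafka record
        final int originalPartition;
        final long originalOffset;

        Rec(String topic, String originalTopic, int originalPartition, long originalOffset) {
            this.topic = topic;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
            this.originalOffset = originalOffset;
        }
    }

    // Builds the offsets to report, keyed by the original topic and partition
    // rather than by the transformed topic.
    static Map<String, Long> offsetsToCommit(List<Rec> records) {
        Map<String, Long> offsets = new HashMap<>();
        for (Rec r : records) {
            String key = r.originalTopic + "-" + r.originalPartition;
            // Commit position is the next offset to read: last processed offset + 1.
            offsets.merge(key, r.originalOffset + 1, Long::max);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("renamed", "orders", 0, 41L),
                new Rec("renamed", "orders", 0, 42L),
                new Rec("renamed", "orders", 1, 7L));
        System.out.println(offsetsToCommit(batch));
    }
}
```

Keying on `topic` instead would report offsets for `renamed`, a topic the consumer was never assigned, which is the incompatibility the Javadoc warns about.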



##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -20,18 +20,25 @@
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields. This information
- * should be used by the {@link SinkTask} to coordinate kafkaOffset commits.
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the original Kafka record's
+ * topic, partition and offset (before any {@link Transformation}s have been applied) in addition to the standard fields.
+ * This information should be used by the {@link SinkTask} to coordinate offset commits.

Review Comment:
   Nit: formatting/readability
   ```suggestion
* topic, partition and offset (before any {@link Transformation transformations}
* have been applied) in addition to the standard fields. This information should be used by the {@link SinkTask} to coordinate
* offset commits.
   ```



##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
 return timestampType;
 }
 
+/**
+ * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+ * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by

Review Comment:
   Nit: formatting/readability
   ```suggestion
* {@link org.apache.kafka.connect.transforms.Transformation transformations} were applied. This should be used by
   ```
   
   (This applies to a bunch of places in this PR; only noting it here to reduce noise.)



##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
 return timestampType;
 }
 
+/**
+ * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+ * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+ * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+ * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+ * transformations that mutate the topic name.
+ * 
+ * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+ * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+ * with a try-catch block, since calling this method will result in a {@link NoSuchMethodException} or
+ * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+ * For example:
+ * {@code
+ * String originalTopic;
+ * try {
+ * originalTopic = record.originalTopic();
+ * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+ * originalTopic = record.topic();
+ * }
+ * }
+ * 
+ * 
+ * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate