yashmayya commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1266406455
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
Review Comment:
> But it also seems a little ham-fisted to recommend that developers throw an exception with a message like "This connector is incompatible with this version of Kafka Connect".

Yeah, I really wanted to avoid this because connectors don't have a clean way to find out whether they're configured with T/P/O (topic/partition/offset) mutating transformations, and it doesn't make sense for them to be completely incompatible with older runtimes. That's why I'd included the following note after the code block:
```
Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate topic names when deployed to older Connect runtimes that do not support this method.
```
I think adding a warning log is a good additional step.
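For illustration, here's a minimal sketch of what that combined guard-plus-warning pattern could look like in a sink task. The `ExampleSinkTask` class, the `trackingTopic` helper, and the SLF4J logger are my own scaffolding, not code from this PR:

```java
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleSinkTask {
    private static final Logger log = LoggerFactory.getLogger(ExampleSinkTask.class);

    // Resolve the pre-transformation topic for offset tracking, falling back
    // (with a warning) to the possibly SMT-mutated topic on pre-3.6 runtimes.
    static String trackingTopic(SinkRecord record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError e) {
            log.warn("This Connect runtime predates Kafka 3.6; offset tracking will be "
                    + "incorrect if a transformation mutates topic names.");
            return record.topic();
        }
    }
}
```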
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), originalKafkaOffset());
Review Comment:
I figured this was intentional, to differentiate between the use of instance fields and method arguments, and chose to stick with the convention used here.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java:
##########
@@ -34,27 +31,12 @@ public class InternalSinkRecord extends SinkRecord {
     public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
         super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
-                record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
-                record.timestampType(), record.headers());
+                record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+                record.timestampType(), record.headers(), originalRecord.topic(), originalRecord.partition(),
+                originalRecord.offset());
         this.originalRecord = originalRecord;
     }
-
-    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
-                                 int partition, Schema keySchema, Object key, Schema valueSchema,
-                                 Object value, long kafkaOffset, Long timestamp,
-                                 TimestampType timestampType, Iterable<Header> headers) {
-        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
-        this.originalRecord = originalRecord;
-    }
-
-    @Override
-    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
Review Comment:
> Is there a reason you needed to remove this override

Hm no, I removed it because I came to the same conclusion you did about its usefulness.

> Without understanding the need to include or exclude it, I'd err on keeping what was already there.

Fair point, I've added it back with the necessary modifications (and a small unit test similar to the one for `SinkRecord`).

This bit is also relevant, and while it still doesn't seem like a realistic potential case, I agree that it's probably better not to change this for now:
https://github.com/apache/kafka/blob/0c6b1a4e9a94cc8d7461b78e5bb0b583dc0307d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L77-L80
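For reference, a rough sketch of what the restored override could look like, assuming it simply re-wraps the result so the original consumer record is preserved across transformation chains (the actual modifications in the PR may differ):

```java
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
                            Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
    // Re-wrap so transformation chains keep returning an InternalSinkRecord
    // that still carries the original pre-transformation consumer record.
    return new InternalSinkRecord(originalRecord,
            super.newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers));
}
```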
##########
connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java:
##########
@@ -125,4 +125,19 @@ public void shouldModifyRecordHeader() {
         Header header = record.headers().lastWithName("intHeader");
         assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {
Review Comment:
Sounds good, no harm in having the additional test as well 👍
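Since the diff above truncates the test body, here's a sketch of the kind of assertion such a test might make; the constructor arguments and the `newRecord` call are illustrative, not the PR's actual test:

```java
@Test
public void shouldRetainOriginalTopicPartition() {
    SinkRecord record = new SinkRecord("topic", 0, null, null, null, null, 42L);
    // Simulate an SMT that reroutes the record to a different topic partition.
    SinkRecord transformed = record.newRecord("transformed-topic", 1, null, null, null, null, record.timestamp());
    assertEquals("topic", transformed.originalTopic());
    assertEquals(Integer.valueOf(0), transformed.originalKafkaPartition());
}
```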
##########
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##########
@@ -57,6 +73,105 @@ public TimestampType timestampType() {
         return timestampType;
     }

+    /**
+     * Get the original topic for this sink record, corresponding to the topic of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #topic()}, in order to be compatible with
+     * transformations that mutate the topic name.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * name when deployed to older Connect runtimes.
+     *
+     * @return the topic corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, corresponding to the topic partition of the Kafka record
+     * before any {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should
+     * be used by sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaPartition()}, in order to be compatible
+     * with transformations that mutate the topic partition value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * Integer originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be incompatible with SMTs that mutate the topic
+     * partition when deployed to older Connect runtimes.
+     *
+     * @return the topic partition corresponding to the Kafka record before any transformations were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, corresponding to the offset of the Kafka record before any
+     * {@link org.apache.kafka.connect.transforms.Transformation Transformation}s were applied. This should be used by
+     * sink tasks for any internal offset tracking purposes (that are reported to the framework via
+     * {@link SinkTask#preCommit(Map)} for instance) rather than {@link #kafkaOffset()}, in order to be
+     * compatible with transformations that mutate the offset value.
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a {@link NoSuchMethodError} or
+     * {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * long originalKafkaOffset;
+     * try {
+     *     originalKafkaOffset = record.originalKafkaOffset();
+     * } catch (NoSuchMethodError | NoClassDefFoundError e) {
Review Comment:
Ah, good point. I actually used `SourceTaskContext::transactionContext` as the reference here, but that has the same issue with type existence.

> Here, where the return types are native Java types, I think the NoSuchMethodError is the only error possible here. Can you verify this?

Yup, this is correct and I've updated the Javadocs for these methods, thanks!
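So the updated example for the primitive-returning methods presumably ends up along these lines (my paraphrase of the fix, not the exact Javadoc text):

```java
// originalKafkaOffset() returns a primitive long, a type that exists on every
// runtime, so NoSuchMethodError is the only failure mode on pre-3.6 runtimes.
long originalKafkaOffset;
try {
    originalKafkaOffset = record.originalKafkaOffset();
} catch (NoSuchMethodError e) {
    originalKafkaOffset = record.kafkaOffset();
}
```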
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]