This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4daeb2714c2 KAFKA-13431 (KIP-793): Expose the original pre-transform 
topic partition and offset in sink records (#14024)
4daeb2714c2 is described below

commit 4daeb2714c29e4a34698292dcad5a64beed72e74
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Jul 21 17:06:01 2023 +0100

    KAFKA-13431 (KIP-793): Expose the original pre-transform topic partition 
and offset in sink records (#14024)
    
    Reviewers: Greg Harris <[email protected]>, Chris Egerton 
<[email protected]>
---
 checkstyle/import-control.xml                      |   1 +
 .../org/apache/kafka/connect/sink/SinkRecord.java  | 144 +++++++++++++++++++--
 .../org/apache/kafka/connect/sink/SinkTask.java    |  37 ++++--
 .../apache/kafka/connect/sink/SinkRecordTest.java  |  36 +++++-
 .../kafka/connect/runtime/InternalSinkRecord.java  |  10 +-
 .../connect/runtime/InternalSinkRecordTest.java    |  21 ++-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |  43 +++++-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   2 +-
 8 files changed, 263 insertions(+), 31 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 61e5070d0c1..9ae0cb12449 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -448,6 +448,7 @@
     <subpackage name="sink">
       <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.apache.kafka.connect.connector" />
+      <allow pkg="org.apache.kafka.connect.transforms" />
       <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
 
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
index 3e2f783d3cc..33e5d84389a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -20,11 +20,16 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and 
includes the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields. 
This information
- * should be used by the {@link SinkTask} to coordinate kafkaOffset commits.
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and 
includes the original Kafka record's
+ * topic, partition and offset (before any {@link Transformation 
transformations} have been applied)
+ * in addition to the standard fields. This information should be used by the 
{@link SinkTask} to coordinate
+ * offset commits.
  * <p>
  * It also includes the {@link TimestampType}, which may be {@link 
TimestampType#NO_TIMESTAMP_TYPE}, and the relevant
  * timestamp, which may be {@code null}.
@@ -32,6 +37,9 @@ import org.apache.kafka.connect.header.Header;
 public class SinkRecord extends ConnectRecord<SinkRecord> {
     private final long kafkaOffset;
     private final TimestampType timestampType;
+    private final String originalTopic;
+    private final Integer originalKafkaPartition;
+    private final long originalKafkaOffset;
 
     public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset) {
         this(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, null, TimestampType.NO_TIMESTAMP_TYPE);
@@ -44,9 +52,22 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
 
     public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset,
                       Long timestamp, TimestampType timestampType, 
Iterable<Header> headers) {
+        this(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp, timestampType, headers, topic, partition, kafkaOffset);
+    }
+
+    /**
+     * This constructor is intended for use by the Connect runtime only and 
plugins (sink connectors or transformations)
+     * should not use this directly outside testing code.
+     */
+    public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset,
+                      Long timestamp, TimestampType timestampType, 
Iterable<Header> headers, String originalTopic,
+                      Integer originalKafkaPartition, long 
originalKafkaOffset) {
         super(topic, partition, keySchema, key, valueSchema, value, timestamp, 
headers);
         this.kafkaOffset = kafkaOffset;
         this.timestampType = timestampType;
+        this.originalTopic = originalTopic;
+        this.originalKafkaPartition = originalKafkaPartition;
+        this.originalKafkaOffset = originalKafkaOffset;
     }
 
     public long kafkaOffset() {
@@ -57,6 +78,105 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
         return timestampType;
     }
 
+    /**
+     * Get the original topic for this sink record, before any {@link 
Transformation transformations} were applied.
+     * In order to be compatible with transformations that mutate topic names, 
this method should be used
+     * by sink tasks instead of {@link #topic()} for any internal offset 
tracking purposes (for instance, reporting
+     * offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} when the sink
+     * connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalTopic;
+     * try {
+     *     originalTopic = record.originalTopic();
+     * } catch (NoSuchMethodError e) {
+     *     log.warn("This connector is not compatible with SMTs that mutate 
topic names, topic partitions or offset values on this version of Kafka 
Connect");
+     *     originalTopic = record.topic();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate topic
+     * names when deployed to older Connect runtimes that do not support this 
method.
+     *
+     * @return the topic for this record before any transformations were 
applied
+     *
+     * @since 3.6
+     */
+    public String originalTopic() {
+        return originalTopic;
+    }
+
+    /**
+     * Get the original topic partition for this sink record, before any 
{@link Transformation transformations} were applied.
+     * In order to be compatible with transformations that mutate topic 
partitions, this method should be used
+     * by sink tasks instead of {@link #kafkaPartition()} for any internal 
offset tracking purposes (for instance, reporting
+     * offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} when the sink
+     * connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalKafkaPartition;
+     * try {
+     *     originalKafkaPartition = record.originalKafkaPartition();
+     * } catch (NoSuchMethodError e) {
+     *     log.warn("This connector is not compatible with SMTs that mutate 
topic names, topic partitions or offset values on this version of Kafka 
Connect");
+     *     originalKafkaPartition = record.kafkaPartition();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate topic
+     * partitions when deployed to older Connect runtimes that do not support 
this method.
+     *
+     * @return the topic partition for this record before any transformations 
were applied
+     *
+     * @since 3.6
+     */
+    public Integer originalKafkaPartition() {
+        return originalKafkaPartition;
+    }
+
+    /**
+     * Get the original offset for this sink record, before any {@link 
Transformation transformations} were applied.
+     * In order to be compatible with transformations that mutate offset 
values, this method should be used
+     * by sink tasks instead of {@link #kafkaOffset()} for any internal offset 
tracking purposes (for instance, reporting
+     * offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
+     * <p>
+     * This method was added in Apache Kafka 3.6. Sink connectors that use 
this method but want to maintain backward
+     * compatibility in order to be able to be deployed on older Connect 
runtimes should guard the call to this method
+     * with a try-catch block, since calling this method will result in a 
{@link NoSuchMethodError} when the sink
+     * connector is deployed to Connect runtimes older than Kafka 3.6.
+     * For example:
+     * <pre>{@code
+     * String originalKafkaOffset;
+     * try {
+     *     originalKafkaOffset = record.originalKafkaOffset();
+     * } catch (NoSuchMethodError e) {
+     *     log.warn("This connector is not compatible with SMTs that mutate 
topic names, topic partitions or offset values on this version of Kafka 
Connect");
+     *     originalKafkaOffset = record.kafkaOffset();
+     * }
+     * }
+     * </pre>
+     * <p>
+     * Note that sink connectors that do their own offset tracking will be 
incompatible with SMTs that mutate offset
+     * values when deployed to older Connect runtimes that do not support this 
method.
+     *
+     * @return the offset for this record before any transformations were 
applied
+     *
+     * @since 3.6
+     */
+    public long originalKafkaOffset() {
+        return originalKafkaOffset;
+    }
+
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema 
keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
         return newRecord(topic, kafkaPartition, keySchema, key, valueSchema, 
value, timestamp, headers().duplicate());
@@ -65,7 +185,8 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema 
keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset, timestamp, timestampType, headers,
+                originalTopic, originalKafkaPartition, originalKafkaOffset);
     }
 
     @Override
@@ -79,10 +200,11 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return timestampType == that.timestampType;
+        return kafkaOffset == that.kafkaOffset &&
+                timestampType == that.timestampType &&
+                Objects.equals(originalTopic, that.originalTopic) &&
+                Objects.equals(originalKafkaPartition, 
that.originalKafkaPartition) &&
+                originalKafkaOffset == that.originalKafkaOffset;
     }
 
     @Override
@@ -90,6 +212,9 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
         int result = super.hashCode();
         result = 31 * result + Long.hashCode(kafkaOffset);
         result = 31 * result + timestampType.hashCode();
+        result = 31 * result + originalTopic.hashCode();
+        result = 31 * result + originalKafkaPartition.hashCode();
+        result = 31 * result + Long.hashCode(originalKafkaOffset);
         return result;
     }
 
@@ -98,6 +223,9 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
         return "SinkRecord{" +
                 "kafkaOffset=" + kafkaOffset +
                 ", timestampType=" + timestampType +
+                ", originalTopic=" + originalTopic +
+                ", originalKafkaPartition=" + originalKafkaPartition +
+                ", originalKafkaOffset=" + originalKafkaOffset +
                 "} " + super.toString();
     }
 }
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 9cf349f1eb7..f4e25979f90 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.sink;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.transforms.Transformation;
 
 import java.util.Collection;
 import java.util.Map;
@@ -105,9 +106,13 @@ public abstract class SinkTask implements Task {
     /**
      * Flush all records that have been {@link #put(Collection)} for the 
specified topic-partitions.
      *
-     * @param currentOffsets the current offset state as of the last call to 
{@link #put(Collection)}},
-     *                       provided for convenience but could also be 
determined by tracking all offsets included in the {@link SinkRecord}s
-     *                       passed to {@link #put}.
+     * @param currentOffsets the current offset state as of the last call to 
{@link #put(Collection)}, provided for
+     *                       convenience but could also be determined by 
tracking all offsets included in the
+     *                       {@link SinkRecord}s passed to {@link #put}. Note 
that the topic, partition and offset
+     *                       here correspond to the original Kafka topic 
partition and offset, before any
+     *                       {@link Transformation transformations} have been 
applied. These can be tracked by the task
+     *                       through the {@link SinkRecord#originalTopic()}, 
{@link SinkRecord#originalKafkaPartition()}
+     *                       and {@link SinkRecord#originalKafkaOffset()} 
methods.
      */
     public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
     }
@@ -118,11 +123,17 @@ public abstract class SinkTask implements Task {
      * The default implementation simply invokes {@link #flush(Map)} and is 
thus able to assume all {@code currentOffsets}
      * are safe to commit.
      *
-     * @param currentOffsets the current offset state as of the last call to 
{@link #put(Collection)}},
-     *                       provided for convenience but could also be 
determined by tracking all offsets included in the {@link SinkRecord}s
-     *                       passed to {@link #put}.
+     * @param currentOffsets the current offset state as of the last call to 
{@link #put(Collection)}, provided for
+     *                       convenience but could also be determined by 
tracking all offsets included in the
+     *                       {@link SinkRecord}s passed to {@link #put}. Note 
that the topic, partition and offset
+     *                       here correspond to the original Kafka topic 
partition and offset, before any
+     *                       {@link Transformation transformations} have been 
applied. These can be tracked by the task
+     *                       through the {@link SinkRecord#originalTopic()}, 
{@link SinkRecord#originalKafkaPartition()}
+     *                       and {@link SinkRecord#originalKafkaOffset()} 
methods.
      *
-     * @return an empty map if Connect-managed offset commit is not desired, 
otherwise a map of offsets by topic-partition that are safe to commit.
+     * @return an empty map if Connect-managed offset commit is not desired, 
otherwise a map of offsets by topic-partition that are
+     *         safe to commit. Note that the returned topic-partition to 
offsets map should use the original Kafka
+     *         topic partitions and offsets instead of the transformed values.
      */
     public Map<TopicPartition, OffsetAndMetadata> 
preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
         flush(currentOffsets);
@@ -132,7 +143,11 @@ public abstract class SinkTask implements Task {
     /**
      * The SinkTask uses this method to create writers for newly assigned 
partitions in case of partition
      * rebalance. This method will be called after partition re-assignment 
completes and before the SinkTask starts
-     * fetching data. Note that any errors raised from this method will cause 
the task to stop.
+     * fetching data. Any errors raised from this method will cause the task 
to stop.
+     * <p>
+     * Note that the topic partitions here correspond to the original Kafka 
topic partitions, before any
+     * {@link Transformation transformations} have been applied.
+     *
      * @param partitions The list of partitions that are now assigned to the 
task (may include
      *                   partitions previously assigned to the task)
      */
@@ -151,8 +166,12 @@ public abstract class SinkTask implements Task {
      * The SinkTask uses this method to close writers for partitions that are 
no
      * longer assigned to the SinkTask. This method will be called before a 
rebalance operation starts
      * and after the SinkTask stops fetching data. After being closed, Connect 
will not write
-     * any records to the task until a new set of partitions has been opened. 
Note that any errors raised
+     * any records to the task until a new set of partitions has been opened. 
Any errors raised
      * from this method will cause the task to stop.
+     * <p>
+     * Note that the topic partitions here correspond to the original Kafka 
topic partitions, before any
+     * {@link Transformation transformations} have been applied.
+     *
      * @param partitions The list of partitions that should be closed
      */
     public void close(Collection<TopicPartition> partitions) {
diff --git 
a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java 
b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
index 02a6b2a78f2..329b28ee8b6 100644
--- 
a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
+++ 
b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
@@ -46,7 +46,7 @@ public class SinkRecordTest {
     @BeforeEach
     public void beforeEach() {
         record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, 
Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, KAFKA_OFFSET,
-                                KAFKA_TIMESTAMP, TS_TYPE, null);
+                                KAFKA_TIMESTAMP, TS_TYPE, null, TOPIC_NAME, 
PARTITION_NUMBER, KAFKA_OFFSET);
     }
 
     @Test
@@ -125,4 +125,36 @@ public class SinkRecordTest {
         Header header = record.headers().lastWithName("intHeader");
         assertEquals(100, (int) Values.convertToInteger(header.schema(), 
header.value()));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {
+        SinkRecord transformed = record.newRecord("transformed-topic", 
PARTITION_NUMBER + 1, Schema.STRING_SCHEMA, "key",
+                Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+        assertEquals(TOPIC_NAME, transformed.originalTopic());
+        assertEquals(PARTITION_NUMBER, transformed.originalKafkaPartition());
+
+        SinkRecord transformed2 = transformed.newRecord("transformed-topic-2", 
PARTITION_NUMBER + 2, Schema.STRING_SCHEMA, "key",
+                Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+        assertEquals(TOPIC_NAME, transformed2.originalTopic());
+        assertEquals(PARTITION_NUMBER, transformed2.originalKafkaPartition());
+    }
+
+    @Test
+    public void shouldRetainOriginalTopicPartitionWithOlderConstructor() {
+        SinkRecord record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, 
Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA,
+                false, KAFKA_OFFSET, KAFKA_TIMESTAMP, TS_TYPE, null);
+        SinkRecord transformed = record.newRecord("transformed-topic", 
PARTITION_NUMBER + 1, Schema.STRING_SCHEMA, "key",
+                Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+        assertEquals(TOPIC_NAME, transformed.originalTopic());
+        assertEquals(PARTITION_NUMBER, transformed.originalKafkaPartition());
+
+        SinkRecord transformed2 = transformed.newRecord("transformed-topic-2", 
PARTITION_NUMBER + 2, Schema.STRING_SCHEMA, "key",
+                Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+        assertEquals(TOPIC_NAME, transformed2.originalTopic());
+        assertEquals(PARTITION_NUMBER, transformed2.originalKafkaPartition());
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
index 4faabcd6a3f..c35c3d3de4d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
@@ -33,8 +33,9 @@ public class InternalSinkRecord extends SinkRecord {
 
     public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, 
SinkRecord record) {
         super(record.topic(), record.kafkaPartition(), record.keySchema(), 
record.key(),
-            record.valueSchema(), record.value(), record.kafkaOffset(), 
record.timestamp(),
-            record.timestampType(), record.headers());
+                record.valueSchema(), record.value(), record.kafkaOffset(), 
record.timestamp(),
+                record.timestampType(), record.headers(), 
originalRecord.topic(), originalRecord.partition(),
+                originalRecord.offset());
         this.originalRecord = originalRecord;
     }
 
@@ -42,7 +43,8 @@ public class InternalSinkRecord extends SinkRecord {
                                  int partition, Schema keySchema, Object key, 
Schema valueSchema,
                                  Object value, long kafkaOffset, Long 
timestamp,
                                  TimestampType timestampType, Iterable<Header> 
headers) {
-        super(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp, timestampType, headers);
+        super(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp, timestampType, headers,
+                originalRecord.topic(), originalRecord.partition(), 
originalRecord.offset());
         this.originalRecord = originalRecord;
     }
 
@@ -51,7 +53,7 @@ public class InternalSinkRecord extends SinkRecord {
                                 Schema valueSchema, Object value, Long 
timestamp,
                                 Iterable<Header> headers) {
         return new InternalSinkRecord(originalRecord, topic, kafkaPartition, 
keySchema, key,
-            valueSchema, value, kafkaOffset(), timestamp, timestampType(), 
headers);
+                valueSchema, value, kafkaOffset(), timestamp, timestampType(), 
headers);
     }
 
     @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
index 38798044913..173a05f0caa 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
@@ -29,16 +29,33 @@ import static org.mockito.Mockito.mock;
 
 public class InternalSinkRecordTest {
 
+    private static final String TOPIC = "test-topic";
+
     @Test
     public void testNewRecordHeaders() {
-        SinkRecord sinkRecord = new SinkRecord("test-topic", 0, null, null, 
null, null, 10);
+        SinkRecord sinkRecord = new SinkRecord(TOPIC, 0, null, null, null, 
null, 10);
         ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>("test-topic", 0, 10, null, null);
         InternalSinkRecord internalSinkRecord = new 
InternalSinkRecord(consumerRecord, sinkRecord);
         assertTrue(internalSinkRecord.headers().isEmpty());
         assertTrue(sinkRecord.headers().isEmpty());
 
-        SinkRecord newRecord = internalSinkRecord.newRecord("test-topic", 0, 
null, null, null,
+        SinkRecord newRecord = internalSinkRecord.newRecord(TOPIC, 0, null, 
null, null,
                 null, null, Collections.singletonList(mock(Header.class)));
         assertEquals(1, newRecord.headers().size());
     }
+
+    @Test
+    public void shouldRetainOriginalTopicPartition() {
+        String transformedTopic = "transformed-test-topic";
+        SinkRecord sinkRecord = new SinkRecord(transformedTopic, 0, null, 
null, null, null, 10);
+        ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(TOPIC, 0, 10, null, null);
+        InternalSinkRecord internalSinkRecord = new 
InternalSinkRecord(consumerRecord, sinkRecord);
+
+        assertEquals(TOPIC, internalSinkRecord.originalTopic());
+        assertEquals(0, 
internalSinkRecord.originalKafkaPartition().intValue());
+
+        SinkRecord transformedSinkRecord = 
internalSinkRecord.newRecord(transformedTopic, 1, null, null, null, null, null);
+        assertEquals(TOPIC, transformedSinkRecord.originalTopic());
+        assertEquals(0, 
transformedSinkRecord.originalKafkaPartition().intValue());
+    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 044b4a4e3a2..5ab27fe78f0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Arrays;
-import java.util.Iterator;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -39,15 +37,15 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -70,10 +68,12 @@ import org.powermock.reflect.Whitebox;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -90,11 +90,11 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1918,6 +1918,39 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testOriginalTopicWithTopicMutatingTransformations() {
+        createTask(initialState);
+
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        expectPollInitialAssignment();
+
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1, "newtopic_");
+        Capture<Collection<SinkRecord>> recordCapture = EasyMock.newCapture();
+        sinkTask.put(EasyMock.capture(recordCapture));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+
+        workerTask.iteration(); // initial assignment
+
+        workerTask.iteration(); // first record delivered
+
+        assertTrue(recordCapture.hasCaptured());
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+        assertEquals(TOPIC, record.originalTopic());
+        assertEquals("newtopic_" + TOPIC, record.topic());
+
+        PowerMock.verifyAll();
+    }
+
     private void expectInitializeTask() {
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), 
EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 096ced35d0e..8e5c209aa07 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -189,7 +189,7 @@ public class WorkerSinkTaskThreadedTest {
                 SinkRecord referenceSinkRecord
                         = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, 
VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
                 InternalSinkRecord referenceInternalSinkRecord =
-                    new InternalSinkRecord(null, referenceSinkRecord);
+                    new InternalSinkRecord(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + offset, null, null), referenceSinkRecord);
                 assertEquals(referenceInternalSinkRecord, rec);
                 offset++;
             }

Reply via email to