This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4daeb2714c2 KAFKA-13431 (KIP-793): Expose the original pre-transform
topic partition and offset in sink records (#14024)
4daeb2714c2 is described below
commit 4daeb2714c29e4a34698292dcad5a64beed72e74
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Jul 21 17:06:01 2023 +0100
KAFKA-13431 (KIP-793): Expose the original pre-transform topic partition
and offset in sink records (#14024)
Reviewers: Greg Harris <[email protected]>, Chris Egerton
<[email protected]>
---
checkstyle/import-control.xml | 1 +
.../org/apache/kafka/connect/sink/SinkRecord.java | 144 +++++++++++++++++++--
.../org/apache/kafka/connect/sink/SinkTask.java | 37 ++++--
.../apache/kafka/connect/sink/SinkRecordTest.java | 36 +++++-
.../kafka/connect/runtime/InternalSinkRecord.java | 10 +-
.../connect/runtime/InternalSinkRecordTest.java | 21 ++-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 43 +++++-
.../runtime/WorkerSinkTaskThreadedTest.java | 2 +-
8 files changed, 263 insertions(+), 31 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 61e5070d0c1..9ae0cb12449 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -448,6 +448,7 @@
<subpackage name="sink">
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.connect.connector" />
+ <allow pkg="org.apache.kafka.connect.transforms" />
<allow pkg="org.apache.kafka.connect.storage" />
</subpackage>
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
index 3e2f783d3cc..33e5d84389a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -20,11 +20,16 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+import java.util.Objects;
/**
- * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and
includes the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields.
This information
- * should be used by the {@link SinkTask} to coordinate kafkaOffset commits.
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and
includes the original Kafka record's
+ * topic, partition and offset (before any {@link Transformation
transformations} have been applied)
+ * in addition to the standard fields. This information should be used by the
{@link SinkTask} to coordinate
+ * offset commits.
* <p>
* It also includes the {@link TimestampType}, which may be {@link
TimestampType#NO_TIMESTAMP_TYPE}, and the relevant
* timestamp, which may be {@code null}.
@@ -32,6 +37,9 @@ import org.apache.kafka.connect.header.Header;
public class SinkRecord extends ConnectRecord<SinkRecord> {
private final long kafkaOffset;
private final TimestampType timestampType;
+ private final String originalTopic;
+ private final Integer originalKafkaPartition;
+ private final long originalKafkaOffset;
public SinkRecord(String topic, int partition, Schema keySchema, Object
key, Schema valueSchema, Object value, long kafkaOffset) {
this(topic, partition, keySchema, key, valueSchema, value,
kafkaOffset, null, TimestampType.NO_TIMESTAMP_TYPE);
@@ -44,9 +52,22 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
public SinkRecord(String topic, int partition, Schema keySchema, Object
key, Schema valueSchema, Object value, long kafkaOffset,
Long timestamp, TimestampType timestampType,
Iterable<Header> headers) {
+ this(topic, partition, keySchema, key, valueSchema, value,
kafkaOffset, timestamp, timestampType, headers, topic, partition, kafkaOffset);
+ }
+
+ /**
+ * This constructor is intended for use by the Connect runtime only and
plugins (sink connectors or transformations)
+ * should not use this directly outside testing code.
+ */
+ public SinkRecord(String topic, int partition, Schema keySchema, Object
key, Schema valueSchema, Object value, long kafkaOffset,
+ Long timestamp, TimestampType timestampType,
Iterable<Header> headers, String originalTopic,
+ Integer originalKafkaPartition, long
originalKafkaOffset) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp,
headers);
this.kafkaOffset = kafkaOffset;
this.timestampType = timestampType;
+ this.originalTopic = originalTopic;
+ this.originalKafkaPartition = originalKafkaPartition;
+ this.originalKafkaOffset = originalKafkaOffset;
}
public long kafkaOffset() {
@@ -57,6 +78,105 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
return timestampType;
}
+ /**
+ * Get the original topic for this sink record, before any {@link
Transformation transformations} were applied.
+ * In order to be compatible with transformations that mutate topic names,
this method should be used
+ * by sink tasks instead of {@link #topic()} for any internal offset
tracking purposes (for instance, reporting
+ * offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
+ * <p>
+ * This method was added in Apache Kafka 3.6. Sink connectors that use
this method but want to maintain backward
+ * compatibility in order to be able to be deployed on older Connect
runtimes should guard the call to this method
+ * with a try-catch block, since calling this method will result in a
{@link NoSuchMethodError} when the sink
+ * connector is deployed to Connect runtimes older than Kafka 3.6.
+ * For example:
+ * <pre>{@code
+ * String originalTopic;
+ * try {
+ * originalTopic = record.originalTopic();
+ * } catch (NoSuchMethodError e) {
+ * log.warn("This connector is not compatible with SMTs that mutate
topic names, topic partitions or offset values on this version of Kafka
Connect");
+ * originalTopic = record.topic();
+ * }
+ * }
+ * </pre>
+ * <p>
+ * Note that sink connectors that do their own offset tracking will be
incompatible with SMTs that mutate topic
+ * names when deployed to older Connect runtimes that do not support this
method.
+ *
+ * @return the topic for this record before any transformations were
applied
+ *
+ * @since 3.6
+ */
+ public String originalTopic() {
+ return originalTopic;
+ }
+
+ /**
+ * Get the original topic partition for this sink record, before any
{@link Transformation transformations} were applied.
+ * In order to be compatible with transformations that mutate topic
partitions, this method should be used
+ * by sink tasks instead of {@link #kafkaPartition()} for any internal
offset tracking purposes (for instance, reporting
+ * offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
+ * <p>
+ * This method was added in Apache Kafka 3.6. Sink connectors that use
this method but want to maintain backward
+ * compatibility in order to be able to be deployed on older Connect
runtimes should guard the call to this method
+ * with a try-catch block, since calling this method will result in a
{@link NoSuchMethodError} when the sink
+ * connector is deployed to Connect runtimes older than Kafka 3.6.
+ * For example:
+ * <pre>{@code
+ * Integer originalKafkaPartition;
+ * try {
+ * originalKafkaPartition = record.originalKafkaPartition();
+ * } catch (NoSuchMethodError e) {
+ * log.warn("This connector is not compatible with SMTs that mutate
topic names, topic partitions or offset values on this version of Kafka
Connect");
+ * originalKafkaPartition = record.kafkaPartition();
+ * }
+ * }
+ * </pre>
+ * <p>
+ * Note that sink connectors that do their own offset tracking will be
incompatible with SMTs that mutate topic
+ * partitions when deployed to older Connect runtimes that do not support
this method.
+ *
+ * @return the topic partition for this record before any transformations
were applied
+ *
+ * @since 3.6
+ */
+ public Integer originalKafkaPartition() {
+ return originalKafkaPartition;
+ }
+
+ /**
+ * Get the original offset for this sink record, before any {@link
Transformation transformations} were applied.
+ * In order to be compatible with transformations that mutate offset
values, this method should be used
+ * by sink tasks instead of {@link #kafkaOffset()} for any internal offset
tracking purposes (for instance, reporting
+ * offsets to the Connect runtime via {@link SinkTask#preCommit(Map)}).
+ * <p>
+ * This method was added in Apache Kafka 3.6. Sink connectors that use
this method but want to maintain backward
+ * compatibility in order to be able to be deployed on older Connect
runtimes should guard the call to this method
+ * with a try-catch block, since calling this method will result in a
{@link NoSuchMethodError} when the sink
+ * connector is deployed to Connect runtimes older than Kafka 3.6.
+ * For example:
+ * <pre>{@code
+ * long originalKafkaOffset;
+ * try {
+ * originalKafkaOffset = record.originalKafkaOffset();
+ * } catch (NoSuchMethodError e) {
+ * log.warn("This connector is not compatible with SMTs that mutate
topic names, topic partitions or offset values on this version of Kafka
Connect");
+ * originalKafkaOffset = record.kafkaOffset();
+ * }
+ * }
+ * </pre>
+ * <p>
+ * Note that sink connectors that do their own offset tracking will be
incompatible with SMTs that mutate offset
+ * values when deployed to older Connect runtimes that do not support this
method.
+ *
+ * @return the offset for this record before any transformations were
applied
+ *
+ * @since 3.6
+ */
+ public long originalKafkaOffset() {
+ return originalKafkaOffset;
+ }
+
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema
keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
return newRecord(topic, kafkaPartition, keySchema, key, valueSchema,
value, timestamp, headers().duplicate());
@@ -65,7 +185,8 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema
keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
- return new SinkRecord(topic, kafkaPartition, keySchema, key,
valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+ return new SinkRecord(topic, kafkaPartition, keySchema, key,
valueSchema, value, kafkaOffset, timestamp, timestampType, headers,
+ originalTopic, originalKafkaPartition, originalKafkaOffset);
}
@Override
@@ -79,10 +200,11 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
SinkRecord that = (SinkRecord) o;
- if (kafkaOffset != that.kafkaOffset)
- return false;
-
- return timestampType == that.timestampType;
+ return kafkaOffset == that.kafkaOffset &&
+ timestampType == that.timestampType &&
+ Objects.equals(originalTopic, that.originalTopic) &&
+ Objects.equals(originalKafkaPartition,
that.originalKafkaPartition) &&
+ originalKafkaOffset == that.originalKafkaOffset;
}
@Override
@@ -90,6 +212,9 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
int result = super.hashCode();
result = 31 * result + Long.hashCode(kafkaOffset);
result = 31 * result + timestampType.hashCode();
+ result = 31 * result + originalTopic.hashCode();
+ result = 31 * result + originalKafkaPartition.hashCode();
+ result = 31 * result + Long.hashCode(originalKafkaOffset);
return result;
}
@@ -98,6 +223,9 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
return "SinkRecord{" +
"kafkaOffset=" + kafkaOffset +
", timestampType=" + timestampType +
+ ", originalTopic=" + originalTopic +
+ ", originalKafkaPartition=" + originalKafkaPartition +
+ ", originalKafkaOffset=" + originalKafkaOffset +
"} " + super.toString();
}
}
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 9cf349f1eb7..f4e25979f90 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.sink;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.transforms.Transformation;
import java.util.Collection;
import java.util.Map;
@@ -105,9 +106,13 @@ public abstract class SinkTask implements Task {
/**
* Flush all records that have been {@link #put(Collection)} for the
specified topic-partitions.
*
- * @param currentOffsets the current offset state as of the last call to
{@link #put(Collection)}},
- * provided for convenience but could also be
determined by tracking all offsets included in the {@link SinkRecord}s
- * passed to {@link #put}.
+ * @param currentOffsets the current offset state as of the last call to
{@link #put(Collection)}, provided for
+ * convenience but could also be determined by
tracking all offsets included in the
+ * {@link SinkRecord}s passed to {@link #put}. Note
that the topic, partition and offset
+ * here correspond to the original Kafka topic
partition and offset, before any
+ * {@link Transformation transformations} have been
applied. These can be tracked by the task
+ * through the {@link SinkRecord#originalTopic()},
{@link SinkRecord#originalKafkaPartition()}
+ * and {@link SinkRecord#originalKafkaOffset()}
methods.
*/
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}
@@ -118,11 +123,17 @@ public abstract class SinkTask implements Task {
* The default implementation simply invokes {@link #flush(Map)} and is
thus able to assume all {@code currentOffsets}
* are safe to commit.
*
- * @param currentOffsets the current offset state as of the last call to
{@link #put(Collection)}},
- * provided for convenience but could also be
determined by tracking all offsets included in the {@link SinkRecord}s
- * passed to {@link #put}.
+ * @param currentOffsets the current offset state as of the last call to
{@link #put(Collection)}, provided for
+ * convenience but could also be determined by
tracking all offsets included in the
+ * {@link SinkRecord}s passed to {@link #put}. Note
that the topic, partition and offset
+ * here correspond to the original Kafka topic
partition and offset, before any
+ * {@link Transformation transformations} have been
applied. These can be tracked by the task
+ * through the {@link SinkRecord#originalTopic()},
{@link SinkRecord#originalKafkaPartition()}
+ * and {@link SinkRecord#originalKafkaOffset()}
methods.
*
- * @return an empty map if Connect-managed offset commit is not desired,
otherwise a map of offsets by topic-partition that are safe to commit.
+ * @return an empty map if Connect-managed offset commit is not desired,
otherwise a map of offsets by topic-partition that are
+ * safe to commit. Note that the returned topic-partition to
offsets map should use the original Kafka
+ * topic partitions and offsets instead of the transformed values.
*/
public Map<TopicPartition, OffsetAndMetadata>
preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
flush(currentOffsets);
@@ -132,7 +143,11 @@ public abstract class SinkTask implements Task {
/**
* The SinkTask uses this method to create writers for newly assigned
partitions in case of partition
* rebalance. This method will be called after partition re-assignment
completes and before the SinkTask starts
- * fetching data. Note that any errors raised from this method will cause
the task to stop.
+ * fetching data. Any errors raised from this method will cause the task
to stop.
+ * <p>
+ * Note that the topic partitions here correspond to the original Kafka
topic partitions, before any
+ * {@link Transformation transformations} have been applied.
+ *
* @param partitions The list of partitions that are now assigned to the
task (may include
* partitions previously assigned to the task)
*/
@@ -151,8 +166,12 @@ public abstract class SinkTask implements Task {
* The SinkTask uses this method to close writers for partitions that are
no
* longer assigned to the SinkTask. This method will be called before a
rebalance operation starts
* and after the SinkTask stops fetching data. After being closed, Connect
will not write
- * any records to the task until a new set of partitions has been opened.
Note that any errors raised
+ * any records to the task until a new set of partitions has been opened.
Any errors raised
* from this method will cause the task to stop.
+ * <p>
+ * Note that the topic partitions here correspond to the original Kafka
topic partitions, before any
+ * {@link Transformation transformations} have been applied.
+ *
* @param partitions The list of partitions that should be closed
*/
public void close(Collection<TopicPartition> partitions) {
diff --git
a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
index 02a6b2a78f2..329b28ee8b6 100644
---
a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
+++
b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
@@ -46,7 +46,7 @@ public class SinkRecordTest {
@BeforeEach
public void beforeEach() {
record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER,
Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, KAFKA_OFFSET,
- KAFKA_TIMESTAMP, TS_TYPE, null);
+ KAFKA_TIMESTAMP, TS_TYPE, null, TOPIC_NAME,
PARTITION_NUMBER, KAFKA_OFFSET);
}
@Test
@@ -125,4 +125,36 @@ public class SinkRecordTest {
Header header = record.headers().lastWithName("intHeader");
assertEquals(100, (int) Values.convertToInteger(header.schema(),
header.value()));
}
-}
\ No newline at end of file
+
+ @Test
+ public void shouldRetainOriginalTopicPartition() {
+ SinkRecord transformed = record.newRecord("transformed-topic",
PARTITION_NUMBER + 1, Schema.STRING_SCHEMA, "key",
+ Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+ assertEquals(TOPIC_NAME, transformed.originalTopic());
+ assertEquals(PARTITION_NUMBER, transformed.originalKafkaPartition());
+
+ SinkRecord transformed2 = transformed.newRecord("transformed-topic-2",
PARTITION_NUMBER + 2, Schema.STRING_SCHEMA, "key",
+ Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+ assertEquals(TOPIC_NAME, transformed2.originalTopic());
+ assertEquals(PARTITION_NUMBER, transformed2.originalKafkaPartition());
+ }
+
+ @Test
+ public void shouldRetainOriginalTopicPartitionWithOlderConstructor() {
+ SinkRecord record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER,
Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA,
+ false, KAFKA_OFFSET, KAFKA_TIMESTAMP, TS_TYPE, null);
+ SinkRecord transformed = record.newRecord("transformed-topic",
PARTITION_NUMBER + 1, Schema.STRING_SCHEMA, "key",
+ Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+ assertEquals(TOPIC_NAME, transformed.originalTopic());
+ assertEquals(PARTITION_NUMBER, transformed.originalKafkaPartition());
+
+ SinkRecord transformed2 = transformed.newRecord("transformed-topic-2",
PARTITION_NUMBER + 2, Schema.STRING_SCHEMA, "key",
+ Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP);
+
+ assertEquals(TOPIC_NAME, transformed2.originalTopic());
+ assertEquals(PARTITION_NUMBER, transformed2.originalKafkaPartition());
+ }
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
index 4faabcd6a3f..c35c3d3de4d 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
@@ -33,8 +33,9 @@ public class InternalSinkRecord extends SinkRecord {
public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord,
SinkRecord record) {
super(record.topic(), record.kafkaPartition(), record.keySchema(),
record.key(),
- record.valueSchema(), record.value(), record.kafkaOffset(),
record.timestamp(),
- record.timestampType(), record.headers());
+ record.valueSchema(), record.value(), record.kafkaOffset(),
record.timestamp(),
+ record.timestampType(), record.headers(),
originalRecord.topic(), originalRecord.partition(),
+ originalRecord.offset());
this.originalRecord = originalRecord;
}
@@ -42,7 +43,8 @@ public class InternalSinkRecord extends SinkRecord {
int partition, Schema keySchema, Object key,
Schema valueSchema,
Object value, long kafkaOffset, Long
timestamp,
TimestampType timestampType, Iterable<Header>
headers) {
- super(topic, partition, keySchema, key, valueSchema, value,
kafkaOffset, timestamp, timestampType, headers);
+ super(topic, partition, keySchema, key, valueSchema, value,
kafkaOffset, timestamp, timestampType, headers,
+ originalRecord.topic(), originalRecord.partition(),
originalRecord.offset());
this.originalRecord = originalRecord;
}
@@ -51,7 +53,7 @@ public class InternalSinkRecord extends SinkRecord {
Schema valueSchema, Object value, Long
timestamp,
Iterable<Header> headers) {
return new InternalSinkRecord(originalRecord, topic, kafkaPartition,
keySchema, key,
- valueSchema, value, kafkaOffset(), timestamp, timestampType(),
headers);
+ valueSchema, value, kafkaOffset(), timestamp, timestampType(),
headers);
}
@Override
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
index 38798044913..173a05f0caa 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java
@@ -29,16 +29,33 @@ import static org.mockito.Mockito.mock;
public class InternalSinkRecordTest {
+ private static final String TOPIC = "test-topic";
+
@Test
public void testNewRecordHeaders() {
- SinkRecord sinkRecord = new SinkRecord("test-topic", 0, null, null,
null, null, 10);
+ SinkRecord sinkRecord = new SinkRecord(TOPIC, 0, null, null, null,
null, 10);
ConsumerRecord<byte[], byte[]> consumerRecord = new
ConsumerRecord<>("test-topic", 0, 10, null, null);
InternalSinkRecord internalSinkRecord = new
InternalSinkRecord(consumerRecord, sinkRecord);
assertTrue(internalSinkRecord.headers().isEmpty());
assertTrue(sinkRecord.headers().isEmpty());
- SinkRecord newRecord = internalSinkRecord.newRecord("test-topic", 0,
null, null, null,
+ SinkRecord newRecord = internalSinkRecord.newRecord(TOPIC, 0, null,
null, null,
null, null, Collections.singletonList(mock(Header.class)));
assertEquals(1, newRecord.headers().size());
}
+
+ @Test
+ public void shouldRetainOriginalTopicPartition() {
+ String transformedTopic = "transformed-test-topic";
+ SinkRecord sinkRecord = new SinkRecord(transformedTopic, 0, null,
null, null, null, 10);
+ ConsumerRecord<byte[], byte[]> consumerRecord = new
ConsumerRecord<>(TOPIC, 0, 10, null, null);
+ InternalSinkRecord internalSinkRecord = new
InternalSinkRecord(consumerRecord, sinkRecord);
+
+ assertEquals(TOPIC, internalSinkRecord.originalTopic());
+ assertEquals(0,
internalSinkRecord.originalKafkaPartition().intValue());
+
+ SinkRecord transformedSinkRecord =
internalSinkRecord.newRecord(transformedTopic, 1, null, null, null, null, null);
+ assertEquals(TOPIC, transformedSinkRecord.originalTopic());
+ assertEquals(0,
transformedSinkRecord.originalKafkaPartition().intValue());
+ }
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 044b4a4e3a2..5ab27fe78f0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.connect.runtime;
-import java.util.Arrays;
-import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -39,15 +37,15 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -70,10 +68,12 @@ import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -90,11 +90,11 @@ import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1918,6 +1918,39 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testOriginalTopicWithTopicMutatingTransformations() {
+ createTask(initialState);
+
+ expectInitializeTask();
+ expectTaskGetTopic(true);
+
+ expectPollInitialAssignment();
+
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1, "newtopic_");
+ Capture<Collection<SinkRecord>> recordCapture = EasyMock.newCapture();
+ sinkTask.put(EasyMock.capture(recordCapture));
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+
+ workerTask.iteration(); // initial assignment
+
+ workerTask.iteration(); // first record delivered
+
+ assertTrue(recordCapture.hasCaptured());
+ assertEquals(1, recordCapture.getValue().size());
+ SinkRecord record = recordCapture.getValue().iterator().next();
+ assertEquals(TOPIC, record.originalTopic());
+ assertEquals("newtopic_" + TOPIC, record.topic());
+
+ PowerMock.verifyAll();
+ }
+
private void expectInitializeTask() {
consumer.subscribe(EasyMock.eq(asList(TOPIC)),
EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 096ced35d0e..8e5c209aa07 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -189,7 +189,7 @@ public class WorkerSinkTaskThreadedTest {
SinkRecord referenceSinkRecord
= new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY,
VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
InternalSinkRecord referenceInternalSinkRecord =
- new InternalSinkRecord(null, referenceSinkRecord);
+ new InternalSinkRecord(new ConsumerRecord<>(TOPIC,
PARTITION, FIRST_OFFSET + offset, null, null), referenceSinkRecord);
assertEquals(referenceInternalSinkRecord, rec);
offset++;
}