This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de409b389d2 KAFKA-15177: Implement KIP-875
SourceConnector::alterOffset API in MirrorMaker 2 connectors (#14005)
de409b389d2 is described below
commit de409b389d26f7681fba8583db2b96584258aa48
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Aug 17 09:33:59 2023 -0400
KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in
MirrorMaker 2 connectors (#14005)
Reviewers: Yash Mayya <[email protected]>, Greg Harris
<[email protected]>
---
.../connect/mirror/MirrorCheckpointConnector.java | 31 ++++
.../connect/mirror/MirrorHeartbeatConnector.java | 30 ++++
.../connect/mirror/MirrorSourceConnector.java | 29 ++++
.../apache/kafka/connect/mirror/MirrorUtils.java | 173 +++++++++++++++++++--
.../mirror/MirrorCheckpointConnectorTest.java | 145 +++++++++++++++++
.../mirror/MirrorHeartBeatConnectorTest.java | 138 ++++++++++++++++
.../connect/mirror/MirrorSourceConnectorTest.java | 141 +++++++++++++++++
.../MirrorConnectorsIntegrationBaseTest.java | 102 +++++++++++-
...MirrorConnectorsIntegrationExactlyOnceTest.java | 52 +++++++
.../integration/ConnectWorkerIntegrationTest.java | 11 ++
.../clusters/EmbeddedConnectClusterAssertions.java | 5 -
.../util/clusters/EmbeddedKafkaCluster.java | 9 +-
12 files changed, 844 insertions(+), 22 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 07e7b49a44e..1a146dcf5ac 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.slf4j.Logger;
@@ -40,6 +41,9 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
+
/** Replicate consumer group state between clusters. Emits checkpoint records.
*
* @see MirrorCheckpointConfig for supported config properties.
@@ -132,6 +136,33 @@ public class MirrorCheckpointConnector extends
SourceConnector {
return AppInfoParser.getVersion();
}
+ @Override
+ public boolean alterOffsets(Map<String, String> connectorConfig,
Map<Map<String, ?>, Map<String, ?>> offsets) {
+ for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry :
offsets.entrySet()) {
+ Map<String, ?> sourceOffset = offsetEntry.getValue();
+ if (sourceOffset == null) {
+ // We allow tombstones for anything; if there's garbage in the
offsets for the connector, we don't
+ // want to prevent users from being able to clean it up using
the REST API
+ continue;
+ }
+
+ Map<String, ?> sourcePartition = offsetEntry.getKey();
+ if (sourcePartition == null) {
+ throw new ConnectException("Source partitions may not be
null");
+ }
+
+ MirrorUtils.validateSourcePartitionString(sourcePartition,
CONSUMER_GROUP_ID_KEY);
+ MirrorUtils.validateSourcePartitionString(sourcePartition,
TOPIC_KEY);
+ MirrorUtils.validateSourcePartitionPartition(sourcePartition);
+
+ MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset,
true);
+ }
+
+ // We don't actually use these offsets in the task class, so no
additional effort is required beyond just validating
+ // the format of the user-supplied offsets
+ return true;
+ }
+
private void refreshConsumerGroups()
throws InterruptedException, ExecutionException {
Set<String> consumerGroups = findConsumerGroups();
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
index 6410e8fc3f9..6ab9fce31be 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
@@ -27,6 +28,9 @@ import java.util.Map;
import java.util.List;
import java.util.Collections;
+import static
org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY;
+import static
org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY;
+
/** Emits heartbeats to Kafka.
*
* @see MirrorHeartbeatConfig for supported config properties.
@@ -85,6 +89,32 @@ public class MirrorHeartbeatConnector extends
SourceConnector {
return AppInfoParser.getVersion();
}
+ @Override
+ public boolean alterOffsets(Map<String, String> config, Map<Map<String,
?>, Map<String, ?>> offsets) {
+ for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry :
offsets.entrySet()) {
+ Map<String, ?> sourceOffset = offsetEntry.getValue();
+ if (sourceOffset == null) {
+ // We allow tombstones for anything; if there's garbage in the
offsets for the connector, we don't
+ // want to prevent users from being able to clean it up using
the REST API
+ continue;
+ }
+
+ Map<String, ?> sourcePartition = offsetEntry.getKey();
+ if (sourcePartition == null) {
+ throw new ConnectException("Source partitions may not be
null");
+ }
+
+ MirrorUtils.validateSourcePartitionString(sourcePartition,
SOURCE_CLUSTER_ALIAS_KEY);
+ MirrorUtils.validateSourcePartitionString(sourcePartition,
TARGET_CLUSTER_ALIAS_KEY);
+
+ MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset,
true);
+ }
+
+ // We don't actually use these offsets in the task class, so no
additional effort is required beyond just validating
+ // the format of the user-supplied offsets
+ return true;
+ }
+
private void createInternalTopics() {
MirrorUtils.createSinglePartitionCompactedTopic(
config.heartbeatsTopic(),
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 5c45d9ed045..74485bd7b52 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -75,6 +75,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static
org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED;
+import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
/** Replicate data, configuration, and ACLs between clusters.
*
@@ -268,6 +270,33 @@ public class MirrorSourceConnector extends SourceConnector
{
: ExactlyOnceSupport.UNSUPPORTED;
}
+ @Override
+ public boolean alterOffsets(Map<String, String> connectorConfig,
Map<Map<String, ?>, Map<String, ?>> offsets) {
+ for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry :
offsets.entrySet()) {
+ Map<String, ?> sourceOffset = offsetEntry.getValue();
+ if (sourceOffset == null) {
+ // We allow tombstones for anything; if there's garbage in the
offsets for the connector, we don't
+ // want to prevent users from being able to clean it up using
the REST API
+ continue;
+ }
+
+ Map<String, ?> sourcePartition = offsetEntry.getKey();
+ if (sourcePartition == null) {
+ throw new ConnectException("Source partitions may not be
null");
+ }
+
+ MirrorUtils.validateSourcePartitionString(sourcePartition,
SOURCE_CLUSTER_KEY);
+ MirrorUtils.validateSourcePartitionString(sourcePartition,
TOPIC_KEY);
+ MirrorUtils.validateSourcePartitionPartition(sourcePartition);
+
+ MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset,
false);
+ }
+
+ // We never commit offsets with our source consumer, so no additional
effort is required beyond just validating
+ // the format of the user-supplied offsets
+ return true;
+ }
+
private boolean consumerUsesReadCommitted(Map<String, String> props) {
Object consumerIsolationLevel =
MirrorSourceConfig.sourceConsumerConfig(props)
.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
index eb6bdeebb9e..9993a4331cc 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,13 +40,19 @@ import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.Collections;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import static java.util.Collections.singleton;
/** Internal utility methods. */
-final class MirrorUtils {
+public final class MirrorUtils {
+
+ public static final String SOURCE_CLUSTER_KEY = "cluster";
+ public static final String TOPIC_KEY = "topic";
+ public static final String PARTITION_KEY = "partition";
+ public static final String OFFSET_KEY = "offset";
private static final Logger log =
LoggerFactory.getLogger(MirrorUtils.class);
// utility class
@@ -65,27 +72,171 @@ final class MirrorUtils {
static Map<String, Object> wrapPartition(TopicPartition topicPartition,
String sourceClusterAlias) {
Map<String, Object> wrapped = new HashMap<>();
- wrapped.put("topic", topicPartition.topic());
- wrapped.put("partition", topicPartition.partition());
- wrapped.put("cluster", sourceClusterAlias);
+ wrapped.put(TOPIC_KEY, topicPartition.topic());
+ wrapped.put(PARTITION_KEY, topicPartition.partition());
+ wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
return wrapped;
}
- static Map<String, Object> wrapOffset(long offset) {
- return Collections.singletonMap("offset", offset);
+ public static Map<String, Object> wrapOffset(long offset) {
+ return Collections.singletonMap(OFFSET_KEY, offset);
}
- static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
- String topic = (String) wrapped.get("topic");
- int partition = (Integer) wrapped.get("partition");
+ public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+ String topic = (String) wrapped.get(TOPIC_KEY);
+ int partition = (Integer) wrapped.get(PARTITION_KEY);
return new TopicPartition(topic, partition);
}
static Long unwrapOffset(Map<String, ?> wrapped) {
- if (wrapped == null || wrapped.get("offset") == null) {
+ if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
return -1L;
}
- return (Long) wrapped.get("offset");
+ return (Long) wrapped.get(OFFSET_KEY);
+ }
+
+
+ /**
+ * Validate a specific key in a source partition that may be written to
the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map
and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map,
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be
null
+ * @param key the key to check for in the source partition; may not be null
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+ static void validateSourcePartitionString(Map<String, ?> sourcePartition,
String key) {
+ Objects.requireNonNull(sourcePartition, "Source partition may not be
null");
+ Objects.requireNonNull(key, "Key may not be null");
+
+ if (!sourcePartition.containsKey(key))
+ throw new ConnectException(String.format(
+ "Source partition %s is missing the '%s' key, which is
required",
+ sourcePartition,
+ key
+ ));
+
+ Object value = sourcePartition.get(key);
+ if (!(value instanceof String)) {
+ throw new ConnectException(String.format(
+ "Source partition %s has an invalid value %s for the '%s'
key, which must be a string",
+ sourcePartition,
+ value,
+ key
+ ));
+ }
+ }
+
+ /**
+ * Validate the {@link #PARTITION_KEY partition key} in a source partition
that may be written to the offsets topic
+ * for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map
and that its value is a non-negative integer.
+ * <p/>
+ * Note that the partition key most likely refers to a partition in a
Kafka topic, whereas the term "source partition" refers
+ * to a {@link SourceRecord#sourcePartition() source partition} that is
stored in a Kafka Connect worker's internal offsets
+ * topic (or, if running in standalone mode, offsets file).
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map,
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be
null
+ *
+ * @throws ConnectException if the partition is invalid
+ */
+ static void validateSourcePartitionPartition(Map<String, ?>
sourcePartition) {
+ Objects.requireNonNull(sourcePartition, "Source partition may not be
null");
+
+ if (!sourcePartition.containsKey(PARTITION_KEY))
+ throw new ConnectException(String.format(
+ "Source partition %s is missing the '%s' key, which is
required",
+ sourcePartition,
+ PARTITION_KEY
+ ));
+
+ Object value = sourcePartition.get(PARTITION_KEY);
+ // The value may be encoded as a long but as long as it fits inside a
32-bit integer, that's fine
+ if (!(value instanceof Integer || value instanceof Long) || ((Number)
value).longValue() > Integer.MAX_VALUE) {
+ throw new ConnectException(String.format(
+ "Source partition %s has an invalid value %s for the '%s'
key, which must be an integer",
+ sourcePartition,
+ value,
+ PARTITION_KEY
+ ));
+ }
+
+ if (((Number) value).intValue() < 0) {
+ throw new ConnectException(String.format(
+ "Source partition %s has an invalid value %s for the '%s'
key, which cannot be negative",
+ sourcePartition,
+ value,
+ PARTITION_KEY
+ ));
+ }
+ }
+
+ /**
+ * Validate a source offset that may be written to the offsets topic for
one of the MM2 connectors.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map,
Map)
+ * @see SourceRecord#sourceOffset()
+ *
+ * @param sourcePartition the corresponding {@link
SourceRecord#sourcePartition() source partition} for the offset;
+ * may not be null
+ * @param sourceOffset the to-be-validated source offset; may be null
(which is considered valid)
+ * @param onlyOffsetZero whether the "offset" value in the source offset
map must be zero;
+ * if {@code true}, then only zero is permitted; if
{@code false}, then any non-negative
+ * value is permitted
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+ static void validateSourceOffset(Map<String, ?> sourcePartition,
Map<String, ?> sourceOffset, boolean onlyOffsetZero) {
+ Objects.requireNonNull(sourcePartition, "Source partition may not be
null");
+
+ if (sourceOffset == null) {
+ return;
+ }
+
+ if (!sourceOffset.containsKey(OFFSET_KEY)) {
+ throw new ConnectException(String.format(
+ "Source offset %s for source partition %s is missing the
'%s' key, which is required",
+ sourceOffset,
+ sourcePartition,
+ OFFSET_KEY
+ ));
+ }
+
+ Object offset = sourceOffset.get(OFFSET_KEY);
+ if (!(offset instanceof Integer || offset instanceof Long)) {
+ throw new ConnectException(String.format(
+ "Source offset %s for source partition %s has an invalid
value %s for the '%s' key, which must be an integer",
+ sourceOffset,
+ sourcePartition,
+ offset,
+ OFFSET_KEY
+ ));
+ }
+
+ long offsetValue = ((Number) offset).longValue();
+ if (onlyOffsetZero && offsetValue != 0) {
+ throw new ConnectException(String.format(
+ "Source offset %s for source partition %s has an invalid
value %s for the '%s' key; the only accepted value is 0",
+ sourceOffset,
+ sourcePartition,
+ offset,
+ OFFSET_KEY
+ ));
+ } else if (!onlyOffsetZero && offsetValue < 0) {
+ throw new ConnectException(String.format(
+ "Source offset %s for source partition %s has an invalid
value %s for the '%s' key, which cannot be negative",
+ sourceOffset,
+ sourcePartition,
+ offset,
+ OFFSET_KEY
+ ));
+ }
}
static TopicPartition decodeTopicPartition(String topicPartitionString) {
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 54818029150..ecc9fcbc11f 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -29,10 +30,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.PARTITION_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -41,6 +48,7 @@ import static org.mockito.Mockito.spy;
public class MirrorCheckpointConnectorTest {
private static final String CONSUMER_GROUP = "consumer-group-1";
+ private static final Map<String, ?> SOURCE_OFFSET =
MirrorUtils.wrapOffset(0);
@Test
public void testMirrorCheckpointConnectorDisabled() {
@@ -183,4 +191,141 @@ public class MirrorCheckpointConnectorTest {
assertEquals(groupFound, verifiedSet);
}
+ @Test
+ public void testAlterOffsetsIncorrectPartitionKey() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, Collections.singletonMap(
+ Collections.singletonMap("unused_partition_key",
"unused_partition_value"),
+ SOURCE_OFFSET
+ )));
+
+ // null partitions are invalid
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, Collections.singletonMap(
+ null,
+ SOURCE_OFFSET
+ )));
+ }
+
+ @Test
+ public void testAlterOffsetsMissingPartitionKey() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Function<Map<String, ?>, Boolean> alterOffsets = partition ->
connector.alterOffsets(null, Collections.singletonMap(
+ partition,
+ SOURCE_OFFSET
+ ));
+
+ Map<String, ?> validPartition = sourcePartition("consumer-app-1", "t",
3);
+ // Sanity check to make sure our valid partition is actually valid
+ assertTrue(alterOffsets.apply(validPartition));
+
+ for (String key : Arrays.asList(CONSUMER_GROUP_ID_KEY, TOPIC_KEY,
PARTITION_KEY)) {
+ Map<String, ?> invalidPartition = new HashMap<>(validPartition);
+ invalidPartition.remove(key);
+ assertThrows(ConnectException.class, () ->
alterOffsets.apply(invalidPartition));
+ }
+ }
+
+ @Test
+ public void testAlterOffsetsInvalidPartitionPartition() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+ Map<String, Object> partition = sourcePartition("consumer-app-2", "t",
3);
+ partition.put(PARTITION_KEY, "a string");
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, Collections.singletonMap(
+ partition,
+ SOURCE_OFFSET
+ )));
+ }
+
+ @Test
+ public void testAlterOffsetsMultiplePartitions() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Map<String, ?> partition1 = sourcePartition("consumer-app-3", "t1", 0);
+ Map<String, ?> partition2 = sourcePartition("consumer-app-4", "t1", 1);
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+ offsets.put(partition1, SOURCE_OFFSET);
+ offsets.put(partition2, SOURCE_OFFSET);
+
+ assertTrue(connector.alterOffsets(null, offsets));
+ }
+
+ @Test
+ public void testAlterOffsetsIncorrectOffsetKey() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+ sourcePartition("consumer-app-5", "t1", 2),
+ Collections.singletonMap("unused_offset_key", 0)
+ );
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, offsets));
+ }
+
+ @Test
+ public void testAlterOffsetsOffsetValues() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Function<Object, Boolean> alterOffsets = offset ->
connector.alterOffsets(null, Collections.singletonMap(
+ sourcePartition("consumer-app-6", "t", 5),
+ Collections.singletonMap(MirrorUtils.OFFSET_KEY, offset)
+ ));
+
+ assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(new
Object()));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(10));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(((long)
Integer.MAX_VALUE) + 1));
+ assertTrue(() -> alterOffsets.apply(0));
+ }
+
+ @Test
+ public void testSuccessfulAlterOffsets() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+ sourcePartition("consumer-app-7", "t2", 0),
+ SOURCE_OFFSET
+ );
+
+ // Expect no exception to be thrown when a valid offsets map is
passed. An empty offsets map is treated as valid
+ // since it could indicate that the offsets were reset previously or
that no offsets have been committed yet
+ // (for a reset operation)
+ assertTrue(connector.alterOffsets(null, offsets));
+ assertTrue(connector.alterOffsets(null, Collections.emptyMap()));
+ }
+
+ @Test
+ public void testAlterOffsetsTombstones() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Function<Map<String, ?>, Boolean> alterOffsets = partition ->
connector.alterOffsets(
+ null,
+ Collections.singletonMap(partition, null)
+ );
+
+ Map<String, Object> partition = sourcePartition("consumer-app-2", "t",
3);
+ assertTrue(() -> alterOffsets.apply(partition));
+ partition.put(PARTITION_KEY, "a string");
+ assertTrue(() -> alterOffsets.apply(partition));
+ partition.remove(PARTITION_KEY);
+ assertTrue(() -> alterOffsets.apply(partition));
+
+ assertTrue(() -> alterOffsets.apply(null));
+ assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
+ assertTrue(() ->
alterOffsets.apply(Collections.singletonMap("unused_partition_key",
"unused_partition_value")));
+ }
+
+ private static Map<String, Object> sourcePartition(String consumerGroupId,
String topic, int partition) {
+ Map<String, Object> result = new HashMap<>();
+ result.put(CONSUMER_GROUP_ID_KEY, consumerGroupId);
+ result.put(TOPIC_KEY, topic);
+ result.put(PARTITION_KEY, partition);
+ return result;
+ }
+
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
index b651fac73ed..0248e487c18 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
@@ -16,14 +16,27 @@
*/
package org.apache.kafka.connect.mirror;
+import static
org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY;
+import static
org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;
public class MirrorHeartBeatConnectorTest {
+ private static final Map<String, ?> SOURCE_OFFSET =
MirrorUtils.wrapOffset(0);
+
@Test
public void testMirrorHeartbeatConnectorDisabled() {
// disable the heartbeat emission
@@ -49,4 +62,129 @@ public class MirrorHeartBeatConnectorTest {
// expect one task will be created, even the replication is disabled
assertEquals(1, output.size(), "Task should have been created even
with replication disabled");
}
+
+ @Test
+ public void testAlterOffsetsIncorrectPartitionKey() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, Collections.singletonMap(
+ Collections.singletonMap("unused_partition_key",
"unused_partition_value"),
+ SOURCE_OFFSET
+ )));
+
+ // null partitions are invalid
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, Collections.singletonMap(
+ null,
+ SOURCE_OFFSET
+ )));
+ }
+
+ @Test
+ public void testAlterOffsetsMissingPartitionKey() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+ Function<Map<String, ?>, Boolean> alterOffsets = partition ->
connector.alterOffsets(null, Collections.singletonMap(
+ partition,
+ SOURCE_OFFSET
+ ));
+
+ Map<String, ?> validPartition = sourcePartition("primary", "backup");
+ // Sanity check to make sure our valid partition is actually valid
+ assertTrue(alterOffsets.apply(validPartition));
+
+ for (String key : Arrays.asList(SOURCE_CLUSTER_ALIAS_KEY,
TARGET_CLUSTER_ALIAS_KEY)) {
+ Map<String, ?> invalidPartition = new HashMap<>(validPartition);
+ invalidPartition.remove(key);
+ assertThrows(ConnectException.class, () ->
alterOffsets.apply(invalidPartition));
+ }
+ }
+
+ @Test
+ public void testAlterOffsetsMultiplePartitions() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+ Map<String, ?> partition1 = sourcePartition("primary", "backup");
+ Map<String, ?> partition2 = sourcePartition("backup", "primary");
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+ offsets.put(partition1, SOURCE_OFFSET);
+ offsets.put(partition2, SOURCE_OFFSET);
+
+ assertTrue(connector.alterOffsets(null, offsets));
+ }
+
+ @Test
+ public void testAlterOffsetsIncorrectOffsetKey() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+ sourcePartition("primary", "backup"),
+ Collections.singletonMap("unused_offset_key", 0)
+ );
+ assertThrows(ConnectException.class, () ->
connector.alterOffsets(null, offsets));
+ }
+
+ @Test
+ public void testAlterOffsetsOffsetValues() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+ Function<Object, Boolean> alterOffsets = offset ->
connector.alterOffsets(null, Collections.singletonMap(
+ sourcePartition("primary", "backup"),
+ Collections.singletonMap(MirrorUtils.OFFSET_KEY, offset)
+ ));
+
+ assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(new
Object()));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(10));
+ assertThrows(ConnectException.class, () -> alterOffsets.apply(((long)
Integer.MAX_VALUE) + 1));
+ assertTrue(() -> alterOffsets.apply(0));
+ }
+
+ @Test
+ public void testSuccessfulAlterOffsets() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+ sourcePartition("primary", "backup"),
+ SOURCE_OFFSET
+ );
+
+ // Expect no exception to be thrown when a valid offsets map is
passed. An empty offsets map is treated as valid
+ // since it could indicate that the offsets were reset previously or
that no offsets have been committed yet
+ // (for a reset operation)
+ assertTrue(connector.alterOffsets(null, offsets));
+ assertTrue(connector.alterOffsets(null, Collections.emptyMap()));
+ }
+
+ @Test
+ public void testAlterOffsetsTombstones() {
+ MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+ Function<Map<String, ?>, Boolean> alterOffsets = partition ->
connector.alterOffsets(
+ null,
+ Collections.singletonMap(partition, null)
+ );
+
+ Map<String, Object> partition = sourcePartition("src", "bak");
+ assertTrue(() -> alterOffsets.apply(partition));
+ partition.put(SOURCE_CLUSTER_ALIAS_KEY, 618);
+ assertTrue(() -> alterOffsets.apply(partition));
+ partition.remove(SOURCE_CLUSTER_ALIAS_KEY);
+ assertTrue(() -> alterOffsets.apply(partition));
+
+ assertTrue(() -> alterOffsets.apply(null));
+ assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
+ assertTrue(() ->
alterOffsets.apply(Collections.singletonMap("unused_partition_key",
"unused_partition_value")));
+ }
+
+ private static Map<String, Object> sourcePartition(String
sourceClusterAlias, String targetClusterAlias) {
+ Map<String, Object> result = new HashMap<>();
+ result.put(SOURCE_CLUSTER_ALIAS_KEY, sourceClusterAlias);
+ result.put(TARGET_CLUSTER_ALIAS_KEY, targetClusterAlias);
+ return result;
+ }
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 5e626679e9d..bc56d3ee8ea 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -48,8 +48,12 @@ import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLI
import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
import static
org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_LAG_MAX;
import static
org.apache.kafka.connect.mirror.MirrorSourceConfig.TASK_TOPIC_PARTITIONS;
+import static org.apache.kafka.connect.mirror.MirrorUtils.PARTITION_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -78,6 +82,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class MirrorSourceConnectorTest {
@@ -683,4 +688,140 @@ public class MirrorSourceConnectorTest {
assertNotNull(result, "Connector should not have record null config
value for '" + name + "' property");
return Optional.of(result);
}
+
    @Test
    public void testAlterOffsetsIncorrectPartitionKey() {
        MirrorSourceConnector connector = new MirrorSourceConnector();
        // A partition map with an unrecognized key should be rejected when paired
        // with a non-null offset
        assertThrows(ConnectException.class, () -> connector.alterOffsets(null, Collections.singletonMap(
                Collections.singletonMap("unused_partition_key", "unused_partition_value"),
                MirrorUtils.wrapOffset(10)
        )));

        // null partitions are invalid
        assertThrows(ConnectException.class, () -> connector.alterOffsets(null, Collections.singletonMap(
                null,
                MirrorUtils.wrapOffset(10)
        )));
    }
+
    @Test
    public void testAlterOffsetsMissingPartitionKey() {
        MirrorSourceConnector connector = new MirrorSourceConnector();

        // Shorthand for altering the offset for a single partition map to a valid value
        Function<Map<String, ?>, Boolean> alterOffsets = partition -> connector.alterOffsets(null, Collections.singletonMap(
                partition,
                MirrorUtils.wrapOffset(64)
        ));

        Map<String, ?> validPartition = sourcePartition("t", 3, "us-east-2");
        // Sanity check to make sure our valid partition is actually valid
        assertTrue(alterOffsets.apply(validPartition));

        // Removing any one of the expected keys from an otherwise-valid partition map
        // should cause the request to be rejected
        for (String key : Arrays.asList(SOURCE_CLUSTER_KEY, TOPIC_KEY, PARTITION_KEY)) {
            Map<String, ?> invalidPartition = new HashMap<>(validPartition);
            invalidPartition.remove(key);
            assertThrows(ConnectException.class, () -> alterOffsets.apply(invalidPartition));
        }
    }
+
    @Test
    public void testAlterOffsetsInvalidPartitionPartition() {
        MirrorSourceConnector connector = new MirrorSourceConnector();
        // The "partition" entry of the source partition map carries a string instead of a
        // number here; the request should be rejected
        Map<String, Object> partition = sourcePartition("t", 3, "us-west-2");
        partition.put(PARTITION_KEY, "a string");
        assertThrows(ConnectException.class, () -> connector.alterOffsets(null, Collections.singletonMap(
                partition,
                MirrorUtils.wrapOffset(49)
        )));
    }
+
+ @Test
+ public void testAlterOffsetsMultiplePartitions() {
+ MirrorSourceConnector connector = new MirrorSourceConnector();
+
+ Map<String, ?> partition1 = sourcePartition("t1", 0, "primary");
+ Map<String, ?> partition2 = sourcePartition("t1", 1, "primary");
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+ offsets.put(partition1, MirrorUtils.wrapOffset(50));
+ offsets.put(partition2, MirrorUtils.wrapOffset(100));
+
+ assertTrue(connector.alterOffsets(null, offsets));
+ }
+
    @Test
    public void testAlterOffsetsIncorrectOffsetKey() {
        MirrorSourceConnector connector = new MirrorSourceConnector();

        // An offset map that does not use the expected offset key should be rejected
        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
                sourcePartition("t1", 2, "backup"),
                Collections.singletonMap("unused_offset_key", 0)
        );
        assertThrows(ConnectException.class, () -> connector.alterOffsets(null, offsets));
    }
+
    @Test
    public void testAlterOffsetsOffsetValues() {
        MirrorSourceConnector connector = new MirrorSourceConnector();

        // Shorthand for altering the offset for a single valid partition map to the given value
        Function<Object, Boolean> alterOffsets = offset -> connector.alterOffsets(null, Collections.singletonMap(
                sourcePartition("t", 5, "backup"),
                Collections.singletonMap(MirrorUtils.OFFSET_KEY, offset)
        ));

        // Non-numeric, null, non-integral, negative, and string-typed offset values
        // should all be rejected
        assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
        assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
        assertThrows(ConnectException.class, () -> alterOffsets.apply(new Object()));
        assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
        assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
        assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
        assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
        // Non-negative int and long offset values should be accepted
        assertTrue(() -> alterOffsets.apply(0));
        assertTrue(() -> alterOffsets.apply(10));
        assertTrue(() -> alterOffsets.apply(((long) Integer.MAX_VALUE) + 1));
    }
+
+ @Test
+ public void testSuccessfulAlterOffsets() {
+ MirrorSourceConnector connector = new MirrorSourceConnector();
+
+ Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+ sourcePartition("t2", 0, "backup"),
+ MirrorUtils.wrapOffset(5)
+ );
+
+ // Expect no exception to be thrown when a valid offsets map is
passed. An empty offsets map is treated as valid
+ // since it could indicate that the offsets were reset previously or
that no offsets have been committed yet
+ // (for a reset operation)
+ assertTrue(connector.alterOffsets(null, offsets));
+ assertTrue(connector.alterOffsets(null, Collections.emptyMap()));
+ }
+
+ @Test
+ public void testAlterOffsetsTombstones() {
+ MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+ Function<Map<String, ?>, Boolean> alterOffsets = partition ->
connector.alterOffsets(
+ null,
+ Collections.singletonMap(partition, null)
+ );
+
+ Map<String, Object> partition = sourcePartition("kips", 875,
"apache.kafka");
+ assertTrue(() -> alterOffsets.apply(partition));
+ partition.put(PARTITION_KEY, "a string");
+ assertTrue(() -> alterOffsets.apply(partition));
+ partition.remove(PARTITION_KEY);
+ assertTrue(() -> alterOffsets.apply(partition));
+
+ assertTrue(() -> alterOffsets.apply(null));
+ assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
+ assertTrue(() ->
alterOffsets.apply(Collections.singletonMap("unused_partition_key",
"unused_partition_value")));
+ }
+
+ private static Map<String, Object> sourcePartition(String topic, int
partition, String sourceClusterAlias) {
+ return MirrorUtils.wrapPartition(
+ new TopicPartition(topic, partition),
+ sourceClusterAlias
+ );
+ }
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index bc5e0212bbf..4beeaaf876a 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -44,10 +44,13 @@ import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.mirror.TestUtils;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -56,10 +59,12 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -69,6 +74,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Tag;
@@ -375,7 +381,6 @@ public class MirrorConnectorsIntegrationBaseTest {
assertTrue(primaryConsumer.position(
new TopicPartition(reverseTopic1, 0)) <=
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
}
-
}
// create more matching topics
@@ -406,7 +411,7 @@ public class MirrorConnectorsIntegrationBaseTest {
"New topic was not replicated to primary cluster.");
}
}
-
+
@Test
public void testReplicationWithEmptyPartition() throws Exception {
String consumerGroupName =
"consumer-group-testReplicationWithEmptyPartition";
@@ -933,12 +938,103 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
- private static void restartMirrorMakerConnectors(EmbeddedConnectCluster
connectCluster, List<Class<? extends Connector>> connectorClasses) {
    // Triggers a restart (including tasks) of each given MM2 connector, identified by its
    // simple class name, on the given Connect cluster.
    // NOTE(review): the three boolean flags are presumably (onlyFailed, includeTasks,
    // onlyCallOnEmptyWorker)-style options of restartConnectorAndTasks — confirm against
    // EmbeddedConnectCluster
    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
        for (Class<? extends Connector> connector : connectorClasses) {
            connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
        }
    }
    // Resumes each given MM2 connector and then waits until the connector and exactly one
    // task are running. Requests are issued for all connectors first so they can resume in
    // parallel before any waiting takes place.
    @SafeVarargs
    protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... connectorClasses) throws InterruptedException {
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            connectCluster.resumeConnector(connectorClass.getSimpleName());
        }
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            String connectorName = connectorClass.getSimpleName();
            connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
                    connectorName,
                    1,
                    "Connector '" + connectorName + "' and/or task did not resume in time"
            );
        }
    }
+
+ @SafeVarargs
+ protected static void stopMirrorMakerConnectors(EmbeddedConnectCluster
connectCluster, Class<? extends Connector>... connectorClasses) throws
InterruptedException {
+ for (Class<? extends Connector> connectorClass : connectorClasses) {
+ connectCluster.stopConnector(connectorClass.getSimpleName());
+ }
+ for (Class<? extends Connector> connectorClass : connectorClasses) {
+ String connectorName = connectorClass.getSimpleName();
+ connectCluster.assertions().assertConnectorIsStopped(
+ connectorName,
+ "Connector did not stop in time"
+ );
+ }
+ }
+
    /**
     * Alter the committed offsets of the {@code MirrorSourceConnector} on the given Connect
     * cluster: for every topic partition whose topic appears in {@code topics}, the current
     * offset is transformed by {@code alterOffset}; offsets for other topics are untouched.
     */
    protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) {
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
        String connectorName = MirrorSourceConnector.class.getSimpleName();

        // Read the connector's current offsets and rewrite only the ones for the requested topics
        ConnectorOffsets currentOffsets = connectCluster.connectorOffsets(connectorName);
        List<ConnectorOffset> alteredOffsetContents = currentOffsets.offsets().stream()
                .map(connectorOffset -> {
                    TopicPartition topicPartition = MirrorUtils.unwrapPartition(connectorOffset.partition());
                    // null entries are filtered out below; they mark partitions to leave untouched
                    if (!topicsSet.contains(topicPartition.topic())) {
                        return null;
                    }

                    // Offsets may deserialize as Integer or Long depending on magnitude;
                    // anything else indicates a corrupted or unexpected offsets format
                    Object currentOffsetObject = connectorOffset.offset().get(MirrorUtils.OFFSET_KEY);
                    if (!(currentOffsetObject instanceof Integer || currentOffsetObject instanceof Long)) {
                        throw new AssertionError("Unexpected type for offset '" + currentOffsetObject + "'; should be integer or long");
                    }

                    long currentOffset = ((Number) currentOffsetObject).longValue();
                    long alteredOffset = alterOffset.applyAsLong(currentOffset);

                    return new ConnectorOffset(
                            connectorOffset.partition(),
                            MirrorUtils.wrapOffset(alteredOffset)
                    );
                }).filter(Objects::nonNull)
                .collect(Collectors.toList());

        connectCluster.alterConnectorOffsets(connectorName, new ConnectorOffsets(alteredOffsetContents));
    }
+
    /**
     * Reset (tombstone) the committed offsets of the {@code MirrorSourceConnector} on the
     * given Connect cluster for every topic partition whose topic appears in
     * {@code topics}; offsets for other topics are untouched.
     */
    protected static void resetSomeMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, String... topics) {
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
        String connectorName = MirrorSourceConnector.class.getSimpleName();

        ConnectorOffsets currentOffsets = connectCluster.connectorOffsets(connectorName);
        List<ConnectorOffset> alteredOffsetContents = currentOffsets.offsets().stream()
                .map(connectorOffset -> {
                    TopicPartition topicPartition = MirrorUtils.unwrapPartition(connectorOffset.partition());
                    // null entries are filtered out below; they mark partitions to leave untouched
                    if (!topicsSet.contains(topicPartition.topic())) {
                        return null;
                    }

                    // A null offset value requests that this partition's offset be wiped
                    return new ConnectorOffset(connectorOffset.partition(), null);
                }).filter(Objects::nonNull)
                .collect(Collectors.toList());

        connectCluster.alterConnectorOffsets(connectorName, new ConnectorOffsets(alteredOffsetContents));
    }
+
    // Performs a full offsets reset for each given MM2 connector and verifies that the
    // connector subsequently reports an empty set of committed offsets.
    @SafeVarargs
    protected static void resetAllMirrorMakerConnectorOffsets(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... connectorClasses) {
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            String connectorName = connectorClass.getSimpleName();
            connectCluster.resetConnectorOffsets(connectorName);
            assertEquals(
                    Collections.emptyList(),
                    connectCluster.connectorOffsets(connectorName).offsets(),
                    "Offsets for connector should be completely empty after full reset"
            );
        }
    }
+
/*
* wait for the topic created on the cluster
*/
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
index 0081dcb3688..36e7b34dd54 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
@@ -16,13 +16,18 @@
*/
package org.apache.kafka.connect.mirror.integration;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Properties;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
* Tests MM2 replication with exactly-once support enabled on the Connect
clusters.
*/
@@ -46,4 +51,51 @@ public class MirrorConnectorsIntegrationExactlyOnceTest
extends MirrorConnectors
super.startClusters();
}
    @Override
    @Test
    public void testReplication() throws Exception {
        super.testReplication();

        // Augment the base replication test case with some extra testing of the offset management
        // API introduced in KIP-875
        // We do this only when exactly-once support is enabled in order to avoid having to worry about
        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic

        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);

        // Stop the source connector before touching its offsets so that no tasks are
        // concurrently committing new ones
        stopMirrorMakerConnectors(backup, MirrorSourceConnector.class);
        // Explicitly move back to offset 0
        // Note that the connector treats the offset as the last-consumed offset,
        // so it will start reading the topic partition from offset 1 when it resumes
        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
        // Reset the offsets for test-topic-2
        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);

        // After rewinding to offset 0, everything except the first record of each
        // partition of test-topic-1 should be replicated a second time
        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
                "Records were not re-replicated to backup cluster after altering offsets.");
        // After a full reset, all of test-topic-2 should be replicated a second time
        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
                "New topic was not re-replicated to backup cluster after altering offsets.");

        @SuppressWarnings({"unchecked", "rawtypes"})
        Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
        stopMirrorMakerConnectors(backup, connectorsToReset);
        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check
        // to make sure that this action can be performed successfully
        resetAllMirrorMakerConnectorOffsets(backup, connectorsToReset);
        resumeMirrorMakerConnectors(backup, connectorsToReset);

        // With all offsets wiped, both topics should be replicated yet again in full
        expectedRecordsTopic1 += NUM_RECORDS_PRODUCED;
        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
                "Records were not re-replicated to backup cluster after resetting offsets.");
        expectedRecordsTopic2 += NUM_RECORDS_PER_PARTITION;
        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
                "New topic was not re-replicated to backup cluster after resetting offsets.");
    }
+
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 65ecaa2b2c8..2e843cd6ec6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -51,6 +51,7 @@ import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static
org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -379,6 +380,9 @@ public class ConnectWorkerIntegrationTest {
CONNECTOR_NAME,
"Connector did not stop in time"
);
+ // If the connector is truly stopped, we should also see an empty set
of tasks and task configs
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyMap(),
connect.taskConfigs(CONNECTOR_NAME));
// Transition to RUNNING
connect.resumeConnector(CONNECTOR_NAME);
@@ -406,6 +410,8 @@ public class ConnectWorkerIntegrationTest {
CONNECTOR_NAME,
"Connector did not stop in time"
);
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyMap(),
connect.taskConfigs(CONNECTOR_NAME));
// Transition to PAUSED
connect.pauseConnector(CONNECTOR_NAME);
@@ -463,6 +469,9 @@ public class ConnectWorkerIntegrationTest {
CONNECTOR_NAME,
"Connector did not stop in time"
);
+ // If the connector is truly stopped, we should also see an empty set
of tasks and task configs
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyMap(),
connect.taskConfigs(CONNECTOR_NAME));
// Can resume a connector after its Connector has failed before
shutdown after receiving a stop request
props.remove("connector.start.inject.error");
@@ -483,6 +492,8 @@ public class ConnectWorkerIntegrationTest {
CONNECTOR_NAME,
"Connector did not stop in time"
);
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyMap(),
connect.taskConfigs(CONNECTOR_NAME));
// Can resume a connector after its Connector has failed during
shutdown after receiving a stop request
connect.resumeConnector(CONNECTOR_NAME);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index d0518566220..49dd1dbb245 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@@ -38,7 +37,6 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.junit.Assert.assertEquals;
/**
* A set of common assertions that can be applied to a Connect cluster during
integration testing
@@ -501,9 +499,6 @@ public class EmbeddedConnectClusterAssertions {
).orElse(false),
CONNECTOR_SHUTDOWN_DURATION_MS,
"At least the connector or one of its tasks is still
running");
- // If the connector is truly stopped, we should also see an empty
set of tasks and task configs
- assertEquals(Collections.emptyList(),
connect.connectorInfo(connectorName).tasks());
- assertEquals(Collections.emptyMap(),
connect.taskConfigs(connectorName));
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 7e68584237d..df8a2253531 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -89,7 +89,6 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static org.junit.Assert.assertFalse;
/**
* Setup an embedded Kafka cluster with specified number of brokers and
specified broker properties. To be used for
@@ -597,7 +596,9 @@ public class EmbeddedKafkaCluster {
Admin admin,
Collection<String> topics
) throws TimeoutException, InterruptedException, ExecutionException {
- assertFalse("collection of topics may not be empty", topics.isEmpty());
+ if (topics.isEmpty()) {
+ throw new AssertionError("collection of topics may not be empty");
+ }
return admin.describeTopics(topics)
.allTopicNames().get(maxDurationMs, TimeUnit.MILLISECONDS)
.entrySet().stream()
@@ -617,7 +618,9 @@ public class EmbeddedKafkaCluster {
Admin admin,
Collection<TopicPartition> topicPartitions
) throws TimeoutException, InterruptedException, ExecutionException {
- assertFalse("collection of topic partitions may not be empty",
topicPartitions.isEmpty());
+ if (topicPartitions.isEmpty()) {
+ throw new AssertionError("collection of topic partitions may not
be empty");
+ }
Map<TopicPartition, OffsetSpec> offsetSpecMap =
topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp ->
OffsetSpec.latest()));
return admin.listOffsets(offsetSpecMap, new
ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
.all().get(maxDurationMs, TimeUnit.MILLISECONDS)