This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de409b389d2 KAFKA-15177: Implement KIP-875 
SourceConnector::alterOffset API in MirrorMaker 2 connectors (#14005)
de409b389d2 is described below

commit de409b389d26f7681fba8583db2b96584258aa48
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Aug 17 09:33:59 2023 -0400

    KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in 
MirrorMaker 2 connectors (#14005)
    
    Reviewers: Yash Mayya <[email protected]>, Greg Harris 
<[email protected]>
---
 .../connect/mirror/MirrorCheckpointConnector.java  |  31 ++++
 .../connect/mirror/MirrorHeartbeatConnector.java   |  30 ++++
 .../connect/mirror/MirrorSourceConnector.java      |  29 ++++
 .../apache/kafka/connect/mirror/MirrorUtils.java   | 173 +++++++++++++++++++--
 .../mirror/MirrorCheckpointConnectorTest.java      | 145 +++++++++++++++++
 .../mirror/MirrorHeartBeatConnectorTest.java       | 138 ++++++++++++++++
 .../connect/mirror/MirrorSourceConnectorTest.java  | 141 +++++++++++++++++
 .../MirrorConnectorsIntegrationBaseTest.java       | 102 +++++++++++-
 ...MirrorConnectorsIntegrationExactlyOnceTest.java |  52 +++++++
 .../integration/ConnectWorkerIntegrationTest.java  |  11 ++
 .../clusters/EmbeddedConnectClusterAssertions.java |   5 -
 .../util/clusters/EmbeddedKafkaCluster.java        |   9 +-
 12 files changed, 844 insertions(+), 22 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 07e7b49a44e..1a146dcf5ac 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
 import org.slf4j.Logger;
@@ -40,6 +41,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
+
 /** Replicate consumer group state between clusters. Emits checkpoint records.
  *
  *  @see MirrorCheckpointConfig for supported config properties.
@@ -132,6 +136,33 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, 
Map<Map<String, ?>, Map<String, ?>> offsets) {
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry : 
offsets.entrySet()) {
+            Map<String, ?> sourceOffset = offsetEntry.getValue();
+            if (sourceOffset == null) {
+                // We allow tombstones for anything; if there's garbage in the 
offsets for the connector, we don't
+                // want to prevent users from being able to clean it up using 
the REST API
+                continue;
+            }
+
+            Map<String, ?> sourcePartition = offsetEntry.getKey();
+            if (sourcePartition == null) {
+                throw new ConnectException("Source partitions may not be 
null");
+            }
+
+            MirrorUtils.validateSourcePartitionString(sourcePartition, 
CONSUMER_GROUP_ID_KEY);
+            MirrorUtils.validateSourcePartitionString(sourcePartition, 
TOPIC_KEY);
+            MirrorUtils.validateSourcePartitionPartition(sourcePartition);
+
+            MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset, 
true);
+        }
+
+        // We don't actually use these offsets in the task class, so no 
additional effort is required beyond just validating
+        // the format of the user-supplied offsets
+        return true;
+    }
+
     private void refreshConsumerGroups()
             throws InterruptedException, ExecutionException {
         Set<String> consumerGroups = findConsumerGroups();
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
index 6410e8fc3f9..6ab9fce31be 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -27,6 +28,9 @@ import java.util.Map;
 import java.util.List;
 import java.util.Collections;
 
+import static 
org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY;
+import static 
org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY;
+
 /** Emits heartbeats to Kafka.
  *
  *  @see MirrorHeartbeatConfig for supported config properties.
@@ -85,6 +89,32 @@ public class MirrorHeartbeatConnector extends 
SourceConnector {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> config, Map<Map<String, 
?>, Map<String, ?>> offsets) {
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry : 
offsets.entrySet()) {
+            Map<String, ?> sourceOffset = offsetEntry.getValue();
+            if (sourceOffset == null) {
+                // We allow tombstones for anything; if there's garbage in the 
offsets for the connector, we don't
+                // want to prevent users from being able to clean it up using 
the REST API
+                continue;
+            }
+
+            Map<String, ?> sourcePartition = offsetEntry.getKey();
+            if (sourcePartition == null) {
+                throw new ConnectException("Source partitions may not be 
null");
+            }
+
+            MirrorUtils.validateSourcePartitionString(sourcePartition, 
SOURCE_CLUSTER_ALIAS_KEY);
+            MirrorUtils.validateSourcePartitionString(sourcePartition, 
TARGET_CLUSTER_ALIAS_KEY);
+
+            MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset, 
true);
+        }
+
+        // We don't actually use these offsets in the task class, so no 
additional effort is required beyond just validating
+        // the format of the user-supplied offsets
+        return true;
+    }
+
     private void createInternalTopics() {
         MirrorUtils.createSinglePartitionCompactedTopic(
                 config.heartbeatsTopic(),
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 5c45d9ed045..74485bd7b52 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -75,6 +75,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED;
+import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
 
 /** Replicate data, configuration, and ACLs between clusters.
  *
@@ -268,6 +270,33 @@ public class MirrorSourceConnector extends SourceConnector 
{
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, 
Map<Map<String, ?>, Map<String, ?>> offsets) {
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry : 
offsets.entrySet()) {
+            Map<String, ?> sourceOffset = offsetEntry.getValue();
+            if (sourceOffset == null) {
+                // We allow tombstones for anything; if there's garbage in the 
offsets for the connector, we don't
+                // want to prevent users from being able to clean it up using 
the REST API
+                continue;
+            }
+
+            Map<String, ?> sourcePartition = offsetEntry.getKey();
+            if (sourcePartition == null) {
+                throw new ConnectException("Source partitions may not be 
null");
+            }
+
+            MirrorUtils.validateSourcePartitionString(sourcePartition, 
SOURCE_CLUSTER_KEY);
+            MirrorUtils.validateSourcePartitionString(sourcePartition, 
TOPIC_KEY);
+            MirrorUtils.validateSourcePartitionPartition(sourcePartition);
+
+            MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset, 
false);
+        }
+
+        // We never commit offsets with our source consumer, so no additional 
effort is required beyond just validating
+        // the format of the user-supplied offsets
+        return true;
+    }
+
     private boolean consumerUsesReadCommitted(Map<String, String> props) {
         Object consumerIsolationLevel = 
MirrorSourceConfig.sourceConsumerConfig(props)
                 .get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
index eb6bdeebb9e..9993a4331cc 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,13 +40,19 @@ import java.util.Map;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Collections;
+import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
 
 /** Internal utility methods. */
-final class MirrorUtils {
+public final class MirrorUtils {
+
+    public static final String SOURCE_CLUSTER_KEY = "cluster";
+    public static final String TOPIC_KEY = "topic";
+    public static final String PARTITION_KEY = "partition";
+    public static final String OFFSET_KEY = "offset";
     private static final Logger log = 
LoggerFactory.getLogger(MirrorUtils.class);
 
     // utility class
@@ -65,27 +72,171 @@ final class MirrorUtils {
 
     static Map<String, Object> wrapPartition(TopicPartition topicPartition, 
String sourceClusterAlias) {
         Map<String, Object> wrapped = new HashMap<>();
-        wrapped.put("topic", topicPartition.topic());
-        wrapped.put("partition", topicPartition.partition());
-        wrapped.put("cluster", sourceClusterAlias);
+        wrapped.put(TOPIC_KEY, topicPartition.topic());
+        wrapped.put(PARTITION_KEY, topicPartition.partition());
+        wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
         return wrapped;
     }
 
-    static Map<String, Object> wrapOffset(long offset) {
-        return Collections.singletonMap("offset", offset);
+    public static Map<String, Object> wrapOffset(long offset) {
+        return Collections.singletonMap(OFFSET_KEY, offset);
     }
 
-    static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
-        String topic = (String) wrapped.get("topic");
-        int partition = (Integer) wrapped.get("partition");
+    public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+        String topic = (String) wrapped.get(TOPIC_KEY);
+        int partition = (Integer) wrapped.get(PARTITION_KEY);
         return new TopicPartition(topic, partition);
     }
 
     static Long unwrapOffset(Map<String, ?> wrapped) {
-        if (wrapped == null || wrapped.get("offset") == null) {
+        if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
             return -1L;
         }
-        return (Long) wrapped.get("offset");
+        return (Long) wrapped.get(OFFSET_KEY);
+    }
+
+
+    /**
+     * Validate a specific key in a source partition that may be written to 
the offsets topic for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map 
and that its value is a string.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be 
null
+     * @param key the key to check for in the source partition; may not be null
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourcePartitionString(Map<String, ?> sourcePartition, 
String key) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be 
null");
+        Objects.requireNonNull(key, "Key may not be null");
+
+        if (!sourcePartition.containsKey(key))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s' key, which is 
required",
+                    sourcePartition,
+                    key
+            ));
+
+        Object value = sourcePartition.get(key);
+        if (!(value instanceof String)) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' 
key, which must be a string",
+                    sourcePartition,
+                    value,
+                    key
+            ));
+        }
+    }
+
+    /**
+     * Validate the {@link #PARTITION_KEY partition key} in a source partition 
that may be written to the offsets topic
+     * for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map 
and that its value is a non-negative integer.
+     * <p/>
+     * Note that the partition key most likely refers to a partition in a 
Kafka topic, whereas the term "source partition" refers
+     * to a {@link SourceRecord#sourcePartition() source partition} that is 
stored in a Kafka Connect worker's internal offsets
+     * topic (or, if running in standalone mode, offsets file).
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be 
null
+     *
+     * @throws ConnectException if the partition is invalid
+     */
+    static void validateSourcePartitionPartition(Map<String, ?> 
sourcePartition) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be 
null");
+
+        if (!sourcePartition.containsKey(PARTITION_KEY))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s' key, which is 
required",
+                    sourcePartition,
+                    PARTITION_KEY
+            ));
+
+        Object value = sourcePartition.get(PARTITION_KEY);
+        // The value may be encoded as a long but as long as it fits inside a 
32-bit integer, that's fine
+        if (!(value instanceof Integer || value instanceof Long) || ((Number) 
value).longValue() > Integer.MAX_VALUE) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' 
key, which must be an integer",
+                    sourcePartition,
+                    value,
+                    PARTITION_KEY
+            ));
+        }
+
+        if (((Number) value).intValue() < 0) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' 
key, which cannot be negative",
+                    sourcePartition,
+                    value,
+                    PARTITION_KEY
+            ));
+        }
+    }
+
+    /**
+     * Validate a source offset that may be written to the offsets topic for 
one of the MM2 connectors.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+     * @see SourceRecord#sourceOffset()
+     *
+     * @param sourcePartition the corresponding {@link 
SourceRecord#sourcePartition() source partition} for the offset;
+     *                        may not be null
+     * @param sourceOffset the to-be-validated source offset; may be null 
(which is considered valid)
+     * @param onlyOffsetZero whether the "offset" value in the source offset 
map must be zero;
+     *                       if {@code true}, then only zero is permitted; if 
{@code false}, then any non-negative
+     *                       value is permitted
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourceOffset(Map<String, ?> sourcePartition, 
Map<String, ?> sourceOffset, boolean onlyOffsetZero) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be 
null");
+
+        if (sourceOffset == null) {
+            return;
+        }
+
+        if (!sourceOffset.containsKey(OFFSET_KEY)) {
+            throw new ConnectException(String.format(
+                    "Source offset %s for source partition %s is missing the 
'%s' key, which is required",
+                    sourceOffset,
+                    sourcePartition,
+                    OFFSET_KEY
+            ));
+        }
+
+        Object offset = sourceOffset.get(OFFSET_KEY);
+        if (!(offset instanceof Integer || offset instanceof Long)) {
+            throw new ConnectException(String.format(
+                    "Source offset %s for source partition %s has an invalid 
value %s for the '%s' key, which must be an integer",
+                    sourceOffset,
+                    sourcePartition,
+                    offset,
+                    OFFSET_KEY
+            ));
+        }
+
+        long offsetValue = ((Number) offset).longValue();
+        if (onlyOffsetZero && offsetValue != 0) {
+            throw new ConnectException(String.format(
+                    "Source offset %s for source partition %s has an invalid 
value %s for the '%s' key; the only accepted value is 0",
+                    sourceOffset,
+                    sourcePartition,
+                    offset,
+                    OFFSET_KEY
+            ));
+        } else if (!onlyOffsetZero && offsetValue < 0) {
+            throw new ConnectException(String.format(
+                    "Source offset %s for source partition %s has an invalid 
value %s for the '%s' key, which cannot be negative",
+                    sourceOffset,
+                    sourcePartition,
+                    offset,
+                    OFFSET_KEY
+            ));
+        }
     }
 
     static TopicPartition decodeTopicPartition(String topicPartitionString) {
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 54818029150..ecc9fcbc11f 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -29,10 +30,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.PARTITION_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
 import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
@@ -41,6 +48,7 @@ import static org.mockito.Mockito.spy;
 public class MirrorCheckpointConnectorTest {
 
     private static final String CONSUMER_GROUP = "consumer-group-1";
+    private static final Map<String, ?> SOURCE_OFFSET = 
MirrorUtils.wrapOffset(0);
 
     @Test
     public void testMirrorCheckpointConnectorDisabled() {
@@ -183,4 +191,141 @@ public class MirrorCheckpointConnectorTest {
         assertEquals(groupFound, verifiedSet);
     }
 
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                Collections.singletonMap("unused_partition_key", 
"unused_partition_value"),
+                SOURCE_OFFSET
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                null,
+                SOURCE_OFFSET
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMissingPartitionKey() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Function<Map<String, ?>, Boolean> alterOffsets = partition -> 
connector.alterOffsets(null, Collections.singletonMap(
+                partition,
+                SOURCE_OFFSET
+        ));
+
+        Map<String, ?> validPartition = sourcePartition("consumer-app-1", "t", 
3);
+        // Sanity check to make sure our valid partition is actually valid
+        assertTrue(alterOffsets.apply(validPartition));
+
+        for (String key : Arrays.asList(CONSUMER_GROUP_ID_KEY, TOPIC_KEY, 
PARTITION_KEY)) {
+            Map<String, ?> invalidPartition = new HashMap<>(validPartition);
+            invalidPartition.remove(key);
+            assertThrows(ConnectException.class, () -> 
alterOffsets.apply(invalidPartition));
+        }
+    }
+
+    @Test
+    public void testAlterOffsetsInvalidPartitionPartition() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+        Map<String, Object> partition = sourcePartition("consumer-app-2", "t", 
3);
+        partition.put(PARTITION_KEY, "a string");
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                partition,
+                SOURCE_OFFSET
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Map<String, ?> partition1 = sourcePartition("consumer-app-3", "t1", 0);
+        Map<String, ?> partition2 = sourcePartition("consumer-app-4", "t1", 1);
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(partition1, SOURCE_OFFSET);
+        offsets.put(partition2, SOURCE_OFFSET);
+
+        assertTrue(connector.alterOffsets(null, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectOffsetKey() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                sourcePartition("consumer-app-5", "t1", 2),
+                Collections.singletonMap("unused_offset_key", 0)
+        );
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsOffsetValues() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Function<Object, Boolean> alterOffsets = offset -> 
connector.alterOffsets(null, Collections.singletonMap(
+                sourcePartition("consumer-app-6", "t", 5),
+                Collections.singletonMap(MirrorUtils.OFFSET_KEY, offset)
+        ));
+
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(new 
Object()));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(10));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(((long) 
Integer.MAX_VALUE) + 1));
+        assertTrue(() -> alterOffsets.apply(0));
+    }
+
+    @Test
+    public void testSuccessfulAlterOffsets() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                sourcePartition("consumer-app-7", "t2", 0),
+                SOURCE_OFFSET
+        );
+
+        // Expect no exception to be thrown when a valid offsets map is 
passed. An empty offsets map is treated as valid
+        // since it could indicate that the offsets were reset previously or 
that no offsets have been committed yet
+        // (for a reset operation)
+        assertTrue(connector.alterOffsets(null, offsets));
+        assertTrue(connector.alterOffsets(null, Collections.emptyMap()));
+    }
+
+    @Test
+    public void testAlterOffsetsTombstones() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Function<Map<String, ?>, Boolean> alterOffsets = partition -> 
connector.alterOffsets(
+                null,
+                Collections.singletonMap(partition, null)
+        );
+
+        Map<String, Object> partition = sourcePartition("consumer-app-2", "t", 
3);
+        assertTrue(() -> alterOffsets.apply(partition));
+        partition.put(PARTITION_KEY, "a string");
+        assertTrue(() -> alterOffsets.apply(partition));
+        partition.remove(PARTITION_KEY);
+        assertTrue(() -> alterOffsets.apply(partition));
+
+        assertTrue(() -> alterOffsets.apply(null));
+        assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
+        assertTrue(() -> 
alterOffsets.apply(Collections.singletonMap("unused_partition_key", 
"unused_partition_value")));
+    }
+
+    private static Map<String, Object> sourcePartition(String consumerGroupId, 
String topic, int partition) {
+        Map<String, Object> result = new HashMap<>();
+        result.put(CONSUMER_GROUP_ID_KEY, consumerGroupId);
+        result.put(TOPIC_KEY, topic);
+        result.put(PARTITION_KEY, partition);
+        return result;
+    }
+
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
index b651fac73ed..0248e487c18 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
@@ -16,14 +16,27 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import static 
org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY;
+import static 
org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY;
 import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
 
 public class MirrorHeartBeatConnectorTest {
 
+    private static final Map<String, ?> SOURCE_OFFSET = 
MirrorUtils.wrapOffset(0);
+
     @Test
     public void testMirrorHeartbeatConnectorDisabled() {
         // disable the heartbeat emission
@@ -49,4 +62,129 @@ public class MirrorHeartBeatConnectorTest {
         // expect one task will be created, even the replication is disabled
         assertEquals(1, output.size(), "Task should have been created even 
with replication disabled");
     }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                Collections.singletonMap("unused_partition_key", 
"unused_partition_value"),
+                SOURCE_OFFSET
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                null,
+                SOURCE_OFFSET
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMissingPartitionKey() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+        Function<Map<String, ?>, Boolean> alterOffsets = partition -> 
connector.alterOffsets(null, Collections.singletonMap(
+                partition,
+                SOURCE_OFFSET
+        ));
+
+        Map<String, ?> validPartition = sourcePartition("primary", "backup");
+        // Sanity check to make sure our valid partition is actually valid
+        assertTrue(alterOffsets.apply(validPartition));
+
+        for (String key : Arrays.asList(SOURCE_CLUSTER_ALIAS_KEY, 
TARGET_CLUSTER_ALIAS_KEY)) {
+            Map<String, ?> invalidPartition = new HashMap<>(validPartition);
+            invalidPartition.remove(key);
+            assertThrows(ConnectException.class, () -> 
alterOffsets.apply(invalidPartition));
+        }
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+        Map<String, ?> partition1 = sourcePartition("primary", "backup");
+        Map<String, ?> partition2 = sourcePartition("backup", "primary");
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(partition1, SOURCE_OFFSET);
+        offsets.put(partition2, SOURCE_OFFSET);
+
+        assertTrue(connector.alterOffsets(null, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectOffsetKey() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                sourcePartition("primary", "backup"),
+                Collections.singletonMap("unused_offset_key", 0)
+        );
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsOffsetValues() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+        Function<Object, Boolean> alterOffsets = offset -> 
connector.alterOffsets(null, Collections.singletonMap(
+                sourcePartition("primary", "backup"),
+                Collections.singletonMap(MirrorUtils.OFFSET_KEY, offset)
+        ));
+
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(new 
Object()));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(10));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(((long) 
Integer.MAX_VALUE) + 1));
+        assertTrue(() -> alterOffsets.apply(0));
+    }
+
+    @Test
+    public void testSuccessfulAlterOffsets() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                sourcePartition("primary", "backup"),
+                SOURCE_OFFSET
+        );
+
+        // Expect no exception to be thrown when a valid offsets map is 
passed. An empty offsets map is treated as valid
+        // since it could indicate that the offsets were reset previously or 
that no offsets have been committed yet
+        // (for a reset operation)
+        assertTrue(connector.alterOffsets(null, offsets));
+        assertTrue(connector.alterOffsets(null, Collections.emptyMap()));
+    }
+
+    @Test
+    public void testAlterOffsetsTombstones() {
+        MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector();
+
+        Function<Map<String, ?>, Boolean> alterOffsets = partition -> 
connector.alterOffsets(
+                null,
+                Collections.singletonMap(partition, null)
+        );
+
+        Map<String, Object> partition = sourcePartition("src", "bak");
+        assertTrue(() -> alterOffsets.apply(partition));
+        partition.put(SOURCE_CLUSTER_ALIAS_KEY, 618);
+        assertTrue(() -> alterOffsets.apply(partition));
+        partition.remove(SOURCE_CLUSTER_ALIAS_KEY);
+        assertTrue(() -> alterOffsets.apply(partition));
+
+        assertTrue(() -> alterOffsets.apply(null));
+        assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
+        assertTrue(() -> 
alterOffsets.apply(Collections.singletonMap("unused_partition_key", 
"unused_partition_value")));
+    }
+
+    private static Map<String, Object> sourcePartition(String 
sourceClusterAlias, String targetClusterAlias) {
+        Map<String, Object> result = new HashMap<>();
+        result.put(SOURCE_CLUSTER_ALIAS_KEY, sourceClusterAlias);
+        result.put(TARGET_CLUSTER_ALIAS_KEY, targetClusterAlias);
+        return result;
+    }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 5e626679e9d..bc56d3ee8ea 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -48,8 +48,12 @@ import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLI
 import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
 import static 
org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_LAG_MAX;
 import static 
org.apache.kafka.connect.mirror.MirrorSourceConfig.TASK_TOPIC_PARTITIONS;
+import static org.apache.kafka.connect.mirror.MirrorUtils.PARTITION_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY;
+import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
 import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -78,6 +82,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class MirrorSourceConnectorTest {
@@ -683,4 +688,140 @@ public class MirrorSourceConnectorTest {
         assertNotNull(result, "Connector should not have record null config 
value for '" + name + "' property");
         return Optional.of(result);
     }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                Collections.singletonMap("unused_partition_key", 
"unused_partition_value"),
+                MirrorUtils.wrapOffset(10)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                null,
+                MirrorUtils.wrapOffset(10)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMissingPartitionKey() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+
+        Function<Map<String, ?>, Boolean> alterOffsets = partition -> 
connector.alterOffsets(null, Collections.singletonMap(
+                partition,
+                MirrorUtils.wrapOffset(64)
+        ));
+
+        Map<String, ?> validPartition = sourcePartition("t", 3, "us-east-2");
+        // Sanity check to make sure our valid partition is actually valid
+        assertTrue(alterOffsets.apply(validPartition));
+
+        for (String key : Arrays.asList(SOURCE_CLUSTER_KEY, TOPIC_KEY, 
PARTITION_KEY)) {
+            Map<String, ?> invalidPartition = new HashMap<>(validPartition);
+            invalidPartition.remove(key);
+            assertThrows(ConnectException.class, () -> 
alterOffsets.apply(invalidPartition));
+        }
+    }
+
+    @Test
+    public void testAlterOffsetsInvalidPartitionPartition() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+        Map<String, Object> partition = sourcePartition("t", 3, "us-west-2");
+        partition.put(PARTITION_KEY, "a string");
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, Collections.singletonMap(
+                partition,
+                MirrorUtils.wrapOffset(49)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+
+        Map<String, ?> partition1 = sourcePartition("t1", 0, "primary");
+        Map<String, ?> partition2 = sourcePartition("t1", 1, "primary");
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(partition1, MirrorUtils.wrapOffset(50));
+        offsets.put(partition2, MirrorUtils.wrapOffset(100));
+
+        assertTrue(connector.alterOffsets(null, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectOffsetKey() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                sourcePartition("t1", 2, "backup"),
+                Collections.singletonMap("unused_offset_key", 0)
+        );
+        assertThrows(ConnectException.class, () -> 
connector.alterOffsets(null, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsOffsetValues() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+
+        Function<Object, Boolean> alterOffsets = offset -> 
connector.alterOffsets(null, Collections.singletonMap(
+                sourcePartition("t", 5, "backup"),
+                Collections.singletonMap(MirrorUtils.OFFSET_KEY, offset)
+        ));
+
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(new 
Object()));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
+        assertTrue(() -> alterOffsets.apply(0));
+        assertTrue(() -> alterOffsets.apply(10));
+        assertTrue(() -> alterOffsets.apply(((long) Integer.MAX_VALUE) + 1));
+    }
+
+    @Test
+    public void testSuccessfulAlterOffsets() {
+        MirrorSourceConnector connector = new MirrorSourceConnector();
+
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                sourcePartition("t2", 0, "backup"),
+                MirrorUtils.wrapOffset(5)
+        );
+
+        // Expect no exception to be thrown when a valid offsets map is 
passed. An empty offsets map is treated as valid
+        // since it could indicate that the offsets were reset previously or 
that no offsets have been committed yet
+        // (for a reset operation)
+        assertTrue(connector.alterOffsets(null, offsets));
+        assertTrue(connector.alterOffsets(null, Collections.emptyMap()));
+    }
+
+    @Test
+    public void testAlterOffsetsTombstones() {
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
+
+        Function<Map<String, ?>, Boolean> alterOffsets = partition -> 
connector.alterOffsets(
+                null,
+                Collections.singletonMap(partition, null)
+        );
+
+        Map<String, Object> partition = sourcePartition("kips", 875, 
"apache.kafka");
+        assertTrue(() -> alterOffsets.apply(partition));
+        partition.put(PARTITION_KEY, "a string");
+        assertTrue(() -> alterOffsets.apply(partition));
+        partition.remove(PARTITION_KEY);
+        assertTrue(() -> alterOffsets.apply(partition));
+
+        assertTrue(() -> alterOffsets.apply(null));
+        assertTrue(() -> alterOffsets.apply(Collections.emptyMap()));
+        assertTrue(() -> 
alterOffsets.apply(Collections.singletonMap("unused_partition_key", 
"unused_partition_value")));
+    }
+
+    private static Map<String, Object> sourcePartition(String topic, int 
partition, String sourceClusterAlias) {
+        return MirrorUtils.wrapPartition(
+                new TopicPartition(topic, partition),
+                sourceClusterAlias
+        );
+    }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index bc5e0212bbf..4beeaaf876a 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -44,10 +44,13 @@ import org.apache.kafka.connect.mirror.MirrorClient;
 import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
 import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.MirrorUtils;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.mirror.Checkpoint;
 import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
 import org.apache.kafka.connect.mirror.TestUtils;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -56,10 +59,12 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -69,6 +74,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Tag;
@@ -375,7 +381,6 @@ public class MirrorConnectorsIntegrationBaseTest {
                 assertTrue(primaryConsumer.position(
                         new TopicPartition(reverseTopic1, 0)) <= 
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
             }
-
         }
 
         // create more matching topics
@@ -406,7 +411,7 @@ public class MirrorConnectorsIntegrationBaseTest {
                     "New topic was not replicated to primary cluster.");
         }
     }
-    
+
     @Test
     public void testReplicationWithEmptyPartition() throws Exception {
         String consumerGroupName = 
"consumer-group-testReplicationWithEmptyPartition";
@@ -933,12 +938,103 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, List<Class<? extends Connector>> connectorClasses)  {
         for (Class<? extends Connector> connector : connectorClasses) {
             connectCluster.restartConnectorAndTasks(connector.getSimpleName(), 
false, true, false);
         }
     }
 
+    @SafeVarargs
+    protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, Class<? extends Connector>... connectorClasses) throws 
InterruptedException {
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            connectCluster.resumeConnector(connectorClass.getSimpleName());
+        }
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            String connectorName = connectorClass.getSimpleName();
+            
connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                    connectorName,
+                    1,
+                    "Connector '" + connectorName + "' and/or task did not 
resume in time"
+            );
+        }
+    }
+
+    @SafeVarargs
+    protected static void stopMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, Class<? extends Connector>... connectorClasses) throws 
InterruptedException {
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            connectCluster.stopConnector(connectorClass.getSimpleName());
+        }
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            String connectorName = connectorClass.getSimpleName();
+            connectCluster.assertions().assertConnectorIsStopped(
+                    connectorName,
+                    "Connector did not stop in time"
+            );
+        }
+    }
+
+    protected static void 
alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, 
LongUnaryOperator alterOffset, String... topics) {
+        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
+        String connectorName = MirrorSourceConnector.class.getSimpleName();
+
+        ConnectorOffsets currentOffsets = 
connectCluster.connectorOffsets(connectorName);
+        List<ConnectorOffset> alteredOffsetContents = 
currentOffsets.offsets().stream()
+                .map(connectorOffset -> {
+                    TopicPartition topicPartition = 
MirrorUtils.unwrapPartition(connectorOffset.partition());
+                    if (!topicsSet.contains(topicPartition.topic())) {
+                        return null;
+                    }
+
+                    Object currentOffsetObject = 
connectorOffset.offset().get(MirrorUtils.OFFSET_KEY);
+                    if (!(currentOffsetObject instanceof Integer || 
currentOffsetObject instanceof Long)) {
+                        throw new AssertionError("Unexpected type for offset 
'" + currentOffsetObject + "'; should be integer or long");
+                    }
+
+                    long currentOffset = ((Number) 
currentOffsetObject).longValue();
+                    long alteredOffset = 
alterOffset.applyAsLong(currentOffset);
+
+                    return new ConnectorOffset(
+                            connectorOffset.partition(),
+                            MirrorUtils.wrapOffset(alteredOffset)
+                    );
+                }).filter(Objects::nonNull)
+                .collect(Collectors.toList());
+
+        connectCluster.alterConnectorOffsets(connectorName, new 
ConnectorOffsets(alteredOffsetContents));
+    }
+
+    protected static void 
resetSomeMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster 
connectCluster, String... topics) {
+        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
+        String connectorName = MirrorSourceConnector.class.getSimpleName();
+
+        ConnectorOffsets currentOffsets = 
connectCluster.connectorOffsets(connectorName);
+        List<ConnectorOffset> alteredOffsetContents = 
currentOffsets.offsets().stream()
+                .map(connectorOffset -> {
+                    TopicPartition topicPartition = 
MirrorUtils.unwrapPartition(connectorOffset.partition());
+                    if (!topicsSet.contains(topicPartition.topic())) {
+                        return null;
+                    }
+
+                    return new ConnectorOffset(connectorOffset.partition(), 
null);
+                }).filter(Objects::nonNull)
+                .collect(Collectors.toList());
+
+        connectCluster.alterConnectorOffsets(connectorName, new 
ConnectorOffsets(alteredOffsetContents));
+    }
+
+    @SafeVarargs
+    protected static void 
resetAllMirrorMakerConnectorOffsets(EmbeddedConnectCluster connectCluster, 
Class<? extends Connector>... connectorClasses) {
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            String connectorName = connectorClass.getSimpleName();
+            connectCluster.resetConnectorOffsets(connectorName);
+            assertEquals(
+                    Collections.emptyList(),
+                    connectCluster.connectorOffsets(connectorName).offsets(),
+                    "Offsets for connector should be completely empty after 
full reset"
+            );
+        }
+    }
+
     /*
      * wait for the topic created on the cluster
      */
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
index 0081dcb3688..36e7b34dd54 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
@@ -16,13 +16,18 @@
  */
 package org.apache.kafka.connect.mirror.integration;
 
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Properties;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 /**
  * Tests MM2 replication with exactly-once support enabled on the Connect 
clusters.
  */
@@ -46,4 +51,51 @@ public class MirrorConnectorsIntegrationExactlyOnceTest 
extends MirrorConnectors
         super.startClusters();
     }
 
+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of 
the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to 
avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale 
offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", 
PRIMARY_CLUSTER_ALIAS);
+
+        stopMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed 
offset,
+        // so it will start reading the topic partition from offset 1 when it 
resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, 
"test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + 
((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, 
backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, 
backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after 
altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, 
backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, 
backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after 
altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends Connector>[] connectorsToReset = 
CONNECTOR_LIST.toArray(new Class[0]);
+        stopMirrorMakerConnectors(backup, connectorsToReset);
+        // Resetting the offsets for the heartbeat and checkpoint connectors 
doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to 
prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of 
connectors to reset as a sanity check
+        // to make sure that this action can be performed successfully
+        resetAllMirrorMakerConnectorOffsets(backup, connectorsToReset);
+        resumeMirrorMakerConnectors(backup, connectorsToReset);
+
+        expectedRecordsTopic1 += NUM_RECORDS_PRODUCED;
+        assertEquals(expectedRecordsTopic1, 
backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, 
backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after 
resetting offsets.");
+        expectedRecordsTopic2 += NUM_RECORDS_PER_PARTITION;
+        assertEquals(expectedRecordsTopic2, 
backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, 
backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after 
resetting offsets.");
+    }
+
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 65ecaa2b2c8..2e843cd6ec6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -51,6 +51,7 @@ import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -379,6 +380,9 @@ public class ConnectWorkerIntegrationTest {
                 CONNECTOR_NAME,
                 "Connector did not stop in time"
         );
+        // If the connector is truly stopped, we should also see an empty set 
of tasks and task configs
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Transition to RUNNING
         connect.resumeConnector(CONNECTOR_NAME);
@@ -406,6 +410,8 @@ public class ConnectWorkerIntegrationTest {
                 CONNECTOR_NAME,
                 "Connector did not stop in time"
         );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Transition to PAUSED
         connect.pauseConnector(CONNECTOR_NAME);
@@ -463,6 +469,9 @@ public class ConnectWorkerIntegrationTest {
                 CONNECTOR_NAME,
                 "Connector did not stop in time"
         );
+        // If the connector is truly stopped, we should also see an empty set 
of tasks and task configs
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Can resume a connector after its Connector has failed before 
shutdown after receiving a stop request
         props.remove("connector.start.inject.error");
@@ -483,6 +492,8 @@ public class ConnectWorkerIntegrationTest {
                 CONNECTOR_NAME,
                 "Connector did not stop in time"
         );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Can resume a connector after its Connector has failed during 
shutdown after receiving a stop request
         connect.resumeConnector(CONNECTOR_NAME);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index d0518566220..49dd1dbb245 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.core.Response;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -38,7 +37,6 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.junit.Assert.assertEquals;
 
 /**
  * A set of common assertions that can be applied to a Connect cluster during 
integration testing
@@ -501,9 +499,6 @@ public class EmbeddedConnectClusterAssertions {
                     ).orElse(false),
                     CONNECTOR_SHUTDOWN_DURATION_MS,
                     "At least the connector or one of its tasks is still 
running");
-            // If the connector is truly stopped, we should also see an empty 
set of tasks and task configs
-            assertEquals(Collections.emptyList(), 
connect.connectorInfo(connectorName).tasks());
-            assertEquals(Collections.emptyMap(), 
connect.taskConfigs(connectorName));
         } catch (AssertionError e) {
             throw new AssertionError(detailMessage, e);
         }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 7e68584237d..df8a2253531 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -89,7 +89,6 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static org.junit.Assert.assertFalse;
 
 /**
  * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
@@ -597,7 +596,9 @@ public class EmbeddedKafkaCluster {
             Admin admin,
             Collection<String> topics
     ) throws TimeoutException, InterruptedException, ExecutionException {
-        assertFalse("collection of topics may not be empty", topics.isEmpty());
+        if (topics.isEmpty()) {
+            throw new AssertionError("collection of topics may not be empty");
+        }
         return admin.describeTopics(topics)
                 .allTopicNames().get(maxDurationMs, TimeUnit.MILLISECONDS)
                 .entrySet().stream()
@@ -617,7 +618,9 @@ public class EmbeddedKafkaCluster {
             Admin admin,
             Collection<TopicPartition> topicPartitions
     ) throws TimeoutException, InterruptedException, ExecutionException {
-        assertFalse("collection of topic partitions may not be empty", 
topicPartitions.isEmpty());
+        if (topicPartitions.isEmpty()) {
+            throw new AssertionError("collection of topic partitions may not 
be empty");
+        }
         Map<TopicPartition, OffsetSpec> offsetSpecMap = 
topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> 
OffsetSpec.latest()));
         return admin.listOffsets(offsetSpecMap, new 
ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
                 .all().get(maxDurationMs, TimeUnit.MILLISECONDS)

Reply via email to