C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1262913499


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##########
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }
 
+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
   Heh, I was wondering if we'd have to do a deep dive on the rationale here.
   
   There are a few thoughts I had about what kind of offset tweaking to allow 
for the checkpoint and heartbeat connectors:
   - (As noted in the comment here) Users may want to tombstone offsets for 
these connectors to prevent the offsets topic from growing infinitely
   - We have no way of knowing if the set of offsets given to us is the total 
set or not, so we can't choose to only allow total resets instead of partial 
resets
   - Since partial resets become possible, it seems reasonable to allow users 
to "undo" a partial reset by re-submitting an offset for a given partition
   - This is especially relevant since, although the offsets topic isn't public 
API, its contents are now public API (via the `GET /connectors/{name}/offsets` 
endpoint), and users may want to track the offsets for these connectors for 
monitoring purposes
   - We may make use of these offsets in future versions of these connectors, 
at which point, it'll be nice to know that the only offsets that we can ever 
encounter are the ones that were either emitted by the connector, or at least 
match the format of ones emitted by the connector
   
   Also, I think the `UnsupportedOperationException` case was intended more for 
cases where offsets are managed externally, not for cases where modifying them 
doesn't seem to make sense (because, e.g., they're never read back by the 
connector or its tasks).
   
   So with all this in mind, I figured it'd be best to allow offsets to be 
modified by users, but only if they match the format of the offsets that the 
connector's tasks already emit. Thoughts?
   
   As an aside... I'm wondering if we should abandon all kinds of validation 
when the proposed partition/offset pair is a tombstone. Just in case the 
offsets topic somehow ends up with garbage in it, it'd be nice to allow users 
to clean it up via the REST API, and if there's no existing partition/offset 
pair in the offsets topic for the given partition already, then emitting a 
tombstone effectively becomes a no-op. Thoughts x2?
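
To make the two proposals above concrete, here is a minimal sketch of the validation rule being discussed: accept a user-supplied partition/offset pair only if it matches the format the tasks already emit, but let tombstones through without any validation. Everything here is hypothetical and for illustration only: the class name, the method name, and the `"partition"`/`"offset"` keys are assumptions, not the actual offset format of the MirrorMaker connectors, and a `null` offset is assumed to stand for a tombstone.

```java
import java.util.Map;

// Illustrative sketch only; not the connectors' real validation logic.
class OffsetAlterationSketch {

    // Hypothetical key names; the real connectors' offset format may differ.
    static final String PARTITION_KEY = "partition";
    static final String OFFSET_KEY = "offset";

    /**
     * Returns true if the proposed partition/offset pair should be accepted.
     * A null offset is treated as a tombstone and bypasses validation, so that
     * garbage entries in the offsets topic can still be cleaned up.
     */
    static boolean isAcceptable(Map<String, ?> partition, Map<String, ?> offset) {
        if (offset == null) {
            // Tombstone: skip validation entirely. If no entry exists for this
            // partition, writing the tombstone is effectively a no-op.
            return true;
        }
        if (partition == null || !(partition.get(PARTITION_KEY) instanceof Integer)) {
            // Partition does not match the (assumed) format emitted by the tasks.
            return false;
        }
        Object value = offset.get(OFFSET_KEY);
        // Offset must be a non-negative long, matching the (assumed) emitted format.
        return value instanceof Long && (Long) value >= 0;
    }

    public static void main(String[] args) {
        // Well-formed pair: accepted.
        System.out.println(isAcceptable(Map.of("partition", 0), Map.of("offset", 5L)));
        // Malformed partition with a tombstone: accepted, so it can be wiped.
        System.out.println(isAcceptable(Map.of("garbage", "x"), null));
        // Malformed partition with a real offset: rejected.
        System.out.println(isAcceptable(Map.of("garbage", "x"), Map.of("offset", 5L)));
    }
}
```

The tombstone check comes first on purpose: placing it ahead of the format checks is what allows a malformed partition to be deleted even though it could never be written with a non-null offset.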



