yashmayya commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1266246256


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##########
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }
 
+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check
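
A quick worked example of how the expected record counts in the hunk above follow from the offset semantics noted in the comments. The concrete constant values are assumptions that mirror the base integration test's typical setup, not values taken from this PR:

```java
// Sketch only: walks through the arithmetic behind the expected record counts above.
// The concrete values are assumptions mirroring the base integration test's usual setup;
// they are not taken from this diff.
public class ExpectedRecordCountSketch {
    static final int NUM_PARTITIONS = 10;
    static final int NUM_RECORDS_PER_PARTITION = 10;
    static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; // 100

    public static void main(String[] args) {
        // test-topic-1 was already fully replicated once. Altering each source partition's offset
        // back to 0 means the connector resumes from offset 1 (the stored offset is treated as the
        // last-consumed offset), so offsets 1..9 of every partition are replicated a second time
        // and appended to the backup topic.
        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
        System.out.println("test-topic-1: " + expectedRecordsTopic1); // 100 + 9 * 10 = 190

        // test-topic-2's offsets were reset entirely, so replication starts over from offset 0 and
        // every record shows up a second time; the expected count implies the topic holds
        // NUM_RECORDS_PER_PARTITION records in total.
        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
        System.out.println("test-topic-2: " + expectedRecordsTopic2); // 10 * 2 = 20
    }
}
```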

Review Comment:
   > It's hard to define "limited" in this sense, but if you've configured MM2 to replicate every non-internal topic from a large cluster, then you could easily end up with hundreds of thousands of unique source offsets. And then, if some topics are deleted from the source cluster and other topics are created, we could go beyond even that. I think it'd be nice to allow people to do some cleanup in cases like that.
   
   Thanks for the explanation, I think that makes sense.
   
   > TL;DR: The contents of the offsets topic may become an additional point of observability for the connectors for people to discover, e.g., the total set of replicated topic partitions or consumer groups.
   
   Ah, my bad, I didn't clock that as a potential reason for allowing a partial reset to be undone. I agree that if we're deciding to allow tombstoning offsets for partitions, we should also allow the reverse operation.
   
   > I definitely agree with the general mentality here of reducing the API surface in order to minimize the potential for users to hurt themselves. But is there any actual risk in the specific case of publishing tombstones for arbitrary topic partitions?
   
   I don't think there's any real "risk" as such; it's more of a general concern about the tradeoff between flexibility and user friendliness. Although, thinking about this a bit more, a new / unfamiliar user can't really do any harm by accidentally writing a tombstone for an arbitrary partition (and an error response probably isn't going to be terribly useful either), so I think I'm okay with abandoning source partition validation when the source offset value is `null`. As an aside, my unfamiliarity with running or operating MM2 makes me averse to bikeshedding too much over this issue (FWICT these scenarios should be rare enough that any of the options we've discussed should be acceptable), so I'll defer to your expertise here 😄
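   
   For concreteness, a rough sketch of what writing such a tombstone could look like via the KIP-875 `PATCH /connectors/{connector}/offsets` endpoint once the connector has been stopped. The worker URL and connector name below are hypothetical, and the source partition keys (`cluster`, `topic`, `partition`) are an assumption about how MirrorSourceConnector encodes its source partitions rather than something verified here:
   
   ```java
   // Sketch only: tombstones the committed offset for one MirrorSourceConnector source
   // partition via the KIP-875 alter-offsets endpoint. The connector must be stopped first.
   // The worker URL and connector name are hypothetical, and the partition keys below are
   // an assumption about MirrorSourceConnector's source partition encoding.
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;
   
   public class OffsetTombstoneSketch {
       public static void main(String[] args) throws Exception {
           String body = "{\"offsets\": [{"
                   + "\"partition\": {\"cluster\": \"primary\", \"topic\": \"test-topic-2\", \"partition\": 0}, "
                   + "\"offset\": null" // a null offset value requests a tombstone for this partition
                   + "}]}";
   
           HttpRequest request = HttpRequest.newBuilder()
                   .uri(URI.create("http://localhost:8083/connectors/MirrorSourceConnector/offsets"))
                   .header("Content-Type", "application/json")
                   .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                   .build();
   
           HttpResponse<String> response = HttpClient.newHttpClient()
                   .send(request, HttpResponse.BodyHandlers.ofString());
           System.out.println(response.statusCode() + " " + response.body());
       }
   }
   ```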



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
