yashmayya commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1262404995


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -132,6 +136,27 @@ public String version() {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {

Review Comment:
   What's the benefit of allowing users to modify the offsets here? Altering 
them seems to be essentially a no-op for this connector, so wouldn't it be more 
appropriate to throw an `UnsupportedOperationException` with a descriptive message?
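
   For illustration, the alternative raised here could look roughly like the 
sketch below. `RejectingOffsetsSketch` is a hypothetical stand-in class, not the 
real `MirrorCheckpointConnector`, and the exception message is only an assumed 
wording; the method merely mirrors the parameter shape shown in the diff.

```java
import java.util.Map;

// Hypothetical stand-in for a connector whose behavior does not depend on its offsets.
class RejectingOffsetsSketch {

    // Same parameter shape as the alterOffsets method in the diff above.
    static boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets) {
        // Reject the request outright rather than accepting a change that does nothing.
        throw new UnsupportedOperationException(
                "Altering offsets has no effect on this connector's behavior");
    }
}
```

   With this shape, any alter or reset request fails with an explicit message 
instead of silently succeeding, which also means offsets could no longer be 
wiped through this path.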



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java:
##########
@@ -85,6 +89,26 @@ public String version() {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> config, Map<Map<String, ?>, Map<String, ?>> offsets) {

Review Comment:
   Same question as the `MirrorCheckpointConnector` case - why do we want to 
allow users to modify the offsets?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##########
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }
 
+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
   Is this (letting users wipe offsets so the offsets topic doesn't grow 
indefinitely) also the reason for not just throwing an 
`UnsupportedOperationException` in their `alterOffsets` methods? If so, we 
don't really need validation logic there, right?
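
   As a rough sketch of what "no validation logic" might mean in practice: the 
method simply accepts whatever it is given. `AcceptingOffsetsSketch` is a 
hypothetical stand-in, and treating a `true` return value as "request accepted" 
is an assumption made for this example rather than something taken from the PR.

```java
import java.util.Map;

// Hypothetical stand-in for a connector that never reads its offsets back.
class AcceptingOffsetsSketch {

    // Same parameter shape as the alterOffsets methods in the diff above.
    static boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets) {
        // No per-partition or per-offset checks: the contents cannot change the
        // connector's behavior, so every alter or reset request is accepted as-is.
        return true;
    }
}
```

   This keeps the ability to wipe offsets while dropping the checks, at the 
cost of also accepting alterations that have no effect.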



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

