yashmayya commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1268520364
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ########## @@ -46,4 +51,49 @@ public void startClusters() throws Exception { super.startClusters(); } + @Override + @Test + public void testReplication() throws Exception { + super.testReplication(); + + // Augment the base replication test case with some extra testing of the offset management + // API introduced in KIP-875 + // We do this only when exactly-once support is enabled in order to avoid having to worry about + // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic + + String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS); + String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS); + + // Explicitly move back to offset 0 + // Note that the connector treats the offset as the last-consumed offset, + // so it will start reading the topic partition from offset 1 when it resumes + alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1"); + // Reset the offsets for test-topic-2 + resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2"); + resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class); + + int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS); + assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(), + "Records were not re-replicated to backup cluster after altering offsets."); + int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2; + assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(), + "New topic was not re-replicated to backup cluster after altering offsets."); + + @SuppressWarnings({"unchecked", "rawtypes"}) + Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]); + // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect + // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic + // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check Review Comment: > I've pushed a change that allows arbitrary source partitions to be used with null source offsets; LMKWYT. Thanks Chris, looks good. We can probably update the `FileStreamSourceConnector::alterOffsets` method with a similar change in a follow up as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org