yashmayya commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1263619000
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##########

@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }
 
+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
   > (As noted in the comment here) Users may want to tombstone offsets for these connectors to prevent the offsets topic from growing infinitely

   Since the set of source partitions is limited here, shouldn't log compaction be good enough?

   > We have no way of knowing if the set of offsets given to us is the total set or not, so we can't choose to only allow total resets instead of partial resets

   Yeah, that makes sense.

   > Since partial resets become possible, it seems reasonable to allow users to "undo" a partial reset by re-submitting an offset for a given partition

   If these offsets are never read back and actually used, why would users want to "undo" partial or complete resets?

   > Also, I think the UnsupportedOperationException case was more intended for when offsets are managed externally, not for when it doesn't seem to make sense to modify them (because, e.g., they're never read back by the connector or its tasks).
   >
   > So with all this in mind, I figured it'd be best to allow offsets to be modified by users, but only if they match the format of the offsets that the connector's tasks already emit. Thoughts?

   Hm, I still feel like if offset modification isn't really doing anything here, it might be more intuitive to simply disallow it rather than inform users that the offsets have been modified successfully but with no actual side effect. However, I don't think many users are going to attempt to modify the offsets for `MirrorCheckpointConnector` / `MirrorHeartbeatConnector` while expecting any change in their behavior, so I'm not too fussed about this either way.
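For context on the "disallow vs. accept with no effect" question: KIP-875 gives connectors a hook to validate proposed offset alterations, and the format check being discussed boils down to accepting a partition/offset pair only if it has the shape the connector's tasks already emit, while treating a null offset as a tombstone (reset). The following standalone sketch (plain Java, no Connect dependency; the class name, method name, and the exact partition/offset keys are illustrative assumptions, not the actual MirrorMaker 2 code) shows that kind of validation:

```java
import java.util.Map;
import java.util.Set;

// Illustrative sketch of KIP-875-style offset alteration validation:
// accept tombstones and well-formed offsets, reject everything else.
public class OffsetAlterSketch {

    // Assumed keys for the source partitions and offsets that the tasks emit.
    private static final Set<String> PARTITION_KEYS = Set.of("cluster", "topic", "partition");
    private static final String OFFSET_KEY = "offset";

    /**
     * Returns true if the proposed alteration should be accepted.
     * A null offset map is a tombstone (a reset for that partition) and is allowed;
     * otherwise the offset must hold a single non-negative long under "offset",
     * and the partition must carry exactly the keys the tasks emit.
     */
    public static boolean validateAlteredOffset(Map<String, ?> partition, Map<String, ?> offset) {
        if (partition == null || !partition.keySet().equals(PARTITION_KEYS)) {
            return false;
        }
        if (offset == null) {
            return true; // tombstone: wipes the stored offset for this partition
        }
        Object value = offset.get(OFFSET_KEY);
        return offset.size() == 1 && value instanceof Long && (Long) value >= 0;
    }
}
```

With a hook like this, the alternative argued for above would simply be to reject (e.g., throw) unconditionally for connectors such as `MirrorCheckpointConnector` and `MirrorHeartbeatConnector`, rather than report success for an alteration that is never read back.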
   > As an aside... I'm wondering if we should abandon all kinds of validation when the proposed partition/offset pair is a tombstone. Just in case the offsets topic somehow ends up with garbage in it, it'd be nice to allow users to clean it up via the REST API, and if there's no existing partition/offset pair in the offsets topic for the given partition already, then emitting a tombstone effectively becomes a no-op

   Hm, this is an interesting one. I think if the offsets topic ends up with garbage in it through an external producer, users could always clean it up using the same external producer. If we don't let users write garbage using the offsets REST API, maybe we shouldn't let them clean up garbage using the REST API either? Overall, I think I'm more in favor of only allowing standard use cases with the offset management REST APIs to reduce the potential number of footguns for users.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org