[GitHub] [kafka] C0urante commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269901272

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##
@@ -597,7 +596,9 @@ private Set listPartitions(
         Admin admin,
         Collection topics
     ) throws TimeoutException, InterruptedException, ExecutionException {
-        assertFalse("collection of topics may not be empty", topics.isEmpty());

Review Comment:
Did the same in `assertConnectorAndExactlyNumTasksAreRunning`.

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) {
     static Map wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
         Map wrapped = new HashMap<>();
-        wrapped.put("topic", topicPartition.topic());
-        wrapped.put("partition", topicPartition.partition());
-        wrapped.put("cluster", sourceClusterAlias);
+        wrapped.put(TOPIC_KEY, topicPartition.topic());
+        wrapped.put(PARTITION_KEY, topicPartition.partition());
+        wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
         return wrapped;
     }

-    static Map wrapOffset(long offset) {
-        return Collections.singletonMap("offset", offset);
+    public static Map wrapOffset(long offset) {
+        return Collections.singletonMap(OFFSET_KEY, offset);
     }

-    static TopicPartition unwrapPartition(Map wrapped) {
-        String topic = (String) wrapped.get("topic");
-        int partition = (Integer) wrapped.get("partition");
+    public static TopicPartition unwrapPartition(Map wrapped) {
+        String topic = (String) wrapped.get(TOPIC_KEY);
+        int partition = (Integer) wrapped.get(PARTITION_KEY);
         return new TopicPartition(topic, partition);
     }

     static Long unwrapOffset(Map wrapped) {
-        if (wrapped == null || wrapped.get("offset") == null) {
+        if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
             return -1L;
         }
-        return (Long) wrapped.get("offset");
+        return (Long) wrapped.get(OFFSET_KEY);
+    }
+
+
+    /**
+     * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a string.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     * @param key the key to check for in the source partition; may be null
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourcePartitionString(Map sourcePartition, String key) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
+
+        if (!sourcePartition.containsKey(key))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s' key, which is required",
+                    sourcePartition,
+                    key
+            ));
+
+        Object value = sourcePartition.get(key);
+        if (!(value instanceof String)) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' key, which must be a string",
+                    sourcePartition,
+                    value,
+                    key
+            ));
+        }
+    }
+
+    /**
+     * Validate the {@link #PARTITION_KEY partition key} in a source partition that may be written to the offsets topic
+     * for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a non-negative integer.
+     *
+     * Note that the partition key most likely refers to a partition in a Kafka topic, whereas the term "source partition" refers
+     * to a {@link SourceRecord#sourcePartition() source partition} that is stored in a Kafka Connect worker's internal offsets
+     * topic (or, if running in standalone mode, offsets file).
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourcePartitionPartition(Map sourcePartition) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
+
+        if (!sourcePartition.containsKey(PARTITION_KEY))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s'
C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1266848313

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }

+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
Thanks Yash. I've pushed a change that allows arbitrary source partitions to be used with null source offsets; LMKWYT.

I'll leave this conversation unresolved in case the next reviewer wants to weigh in; your points are definitely worth considering!

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }

+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
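The expected-record arithmetic in the test above follows from the comment that the connector treats a stored offset as the last-consumed offset: rewinding every partition to offset 0 re-replicates all but the first record of each partition. A minimal sketch of that calculation follows. The class and method names are hypothetical, and the sample values (10 partitions, 10 records each) are assumptions for illustration rather than values taken from this PR.

```java
final class ExpectedRecordsSketch {
    // Number of records read again from one partition after its stored offset
    // is altered. Assumes record offsets start at 0 and that the stored offset
    // is the last-consumed one, so reading resumes at alteredOffset + 1.
    public static int reReplicatedPerPartition(int recordsInPartition, long alteredOffset) {
        long resumeFrom = alteredOffset + 1;
        return (int) Math.max(0, recordsInPartition - resumeFrom);
    }

    // Total records expected on the remote topic: everything replicated before
    // the alteration, plus the records re-read from every partition after
    // rewinding each one to offset 0.
    public static int expectedTotal(int alreadyReplicated, int recordsPerPartition, int numPartitions) {
        return alreadyReplicated + reReplicatedPerPartition(recordsPerPartition, 0L) * numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;        // assumed sample value
        int recordsPerPartition = 10;  // assumed sample value
        int produced = numPartitions * recordsPerPartition;
        // 100 already replicated + 9 re-read records in each of 10 partitions
        System.out.println(expectedTotal(produced, recordsPerPartition, numPartitions));
    }
}
```

With these sample values the total is 190, which has the same shape as `NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS)` in the test.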
C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1265649203

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }

+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
> Since the set of source partitions is limited here, shouldn't log compaction be good enough?

It's hard to define "limited" in this sense, but if you've configured MM2 to replicate every non-internal topic from a large cluster, then you could easily end up with hundreds of thousands of unique source offsets. And then, if some topics are deleted from the source cluster and other topics are created, we could go beyond even that. I think it'd be nice to allow people to do some cleanup in cases like that.

> If these offsets are never read back and actually used, why would users want to "undo" partial or complete resets?

I tried to touch on this with my earlier point:

> This is especially relevant since, although the offsets topic isn't public API, its contents are now public API (via the `GET /connectors/{name}/offsets` endpoint), and users may want to track the offsets for these connectors for monitoring purposes

TL;DR: The contents of the offsets topic may become an additional point of observability for the connectors for people to discover, e.g., the total set of replicated topic partitions or consumer groups.

> Overall, I think I'm more in favor of only allowing standard use cases with the offsets management REST APIs to reduce the potential number of footguns for users.

I definitely agree with the general mentality here of reducing the API surface in order to minimize the potential for users to hurt themselves. But is there any actual risk in the specific case of publishing tombstones for arbitrary topic partitions?

In general I'd still like it if we could provide a fairly flexible API for these connectors, but if that's too risky, one alternative could be to only permit tombstones with validated source partitions. This would still allow for cleanup (with no distinction between total and partial resets), but wouldn't support an "undo" for total/partial resets, or removal of garbage source partitions.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
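For readers following the observability point, it may help to see the shape of the data under discussion: each stored entry pairs a source partition map with a source offset map. The sketch below mirrors the wrap/unwrap helpers from the MirrorUtils diff earlier in this thread, using the literal keys ("topic", "partition", "cluster", "offset") that the diff replaces with constants. The class name is hypothetical, the generic types are an assumption, and the cluster alias "primary" is a made-up sample value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class Mm2OffsetShapeSketch {
    // Source partition for one replicated topic partition.
    public static Map<String, Object> wrapPartition(String topic, int partition, String sourceClusterAlias) {
        Map<String, Object> wrapped = new HashMap<>();
        wrapped.put("topic", topic);
        wrapped.put("partition", partition);
        wrapped.put("cluster", sourceClusterAlias);
        return wrapped;
    }

    // Source offset: a single-entry map holding the last-consumed offset.
    public static Map<String, Object> wrapOffset(long offset) {
        return Collections.<String, Object>singletonMap("offset", offset);
    }

    // A missing offset map (for example, after a tombstone) is reported as -1,
    // matching the unwrapOffset behavior shown in the diff.
    public static long unwrapOffset(Map<String, ?> wrapped) {
        if (wrapped == null || wrapped.get("offset") == null) {
            return -1L;
        }
        return (Long) wrapped.get("offset");
    }

    public static void main(String[] args) {
        Map<String, Object> partition = wrapPartition("test-topic-1", 0, "primary");
        Map<String, Object> offset = wrapOffset(0L);
        System.out.println(partition + " -> " + offset);
        System.out.println("after tombstone: " + unwrapOffset(null));
    }
}
```

Every distinct (cluster, topic, partition) triple is its own key in the offsets topic, which is why replicating a large cluster can leave a very large number of entries behind even with log compaction.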
C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1262913499

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }

+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
Heh, I was wondering if we'd have to do a deep dive on the rationale here. There are a few thoughts I had about what kind of offset tweaking to allow for the checkpoint and heartbeat connectors:

- (As noted in the comment here) Users may want to tombstone offsets for these connectors to prevent the offsets topic from growing infinitely
- We have no way of knowing if the set of offsets given to us is the total set or not, so we can't choose to only allow total resets instead of partial resets
- Since partial resets become possible, it seems reasonable to allow users to "undo" a partial reset by re-submitting an offset for a given partition
  - This is especially relevant since, although the offsets topic isn't public API, its contents are now public API (via the `GET /connectors/{name}/offsets` endpoint), and users may want to track the offsets for these connectors for monitoring purposes
- We may make use of these offsets in future versions of these connectors, at which point, it'll be nice to know that the only offsets that we can ever encounter are the ones that were either emitted by the connector, or at least match the format of ones emitted by the connector

Also, I think the `UnsupportedOperationException` case was more intended for when offsets are managed externally, not for when it doesn't seem to make sense to modify them (because, e.g., they're never read back by the connector or its tasks).

So with all this in mind, I figured it'd be best to allow offsets to be modified by users, but only if they match the format of the offsets that the connector's tasks already emit. Thoughts?

As an aside... I'm wondering if we should abandon all kinds of validation when the proposed partition/offset pair is a tombstone. Just in case the offsets topic somehow ends up with garbage in it, it'd be nice to allow users to clean it up via the REST API, and if there's no existing partition/offset pair in the offsets topic for the given partition already, then emitting a tombstone effectively becomes a no-op. Thoughts x2?
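The aside above (skip validation when the proposed partition/offset pair is a tombstone) could look roughly like the following. This is a sketch of the idea under discussion, not the code in the PR: the class and method names are hypothetical, `IllegalArgumentException` stands in for Connect's `ConnectException` to keep the example dependency-free, the key names come from the literals in the MirrorUtils diff, and the exact offset-type rules are an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class TombstoneLenientValidationSketch {
    // Validates one proposed partition/offset pair. A null offset is treated as
    // a tombstone and accepted for any partition, so garbage entries can still
    // be cleaned up; a non-null offset must match the connector's own format.
    public static void validate(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
        if (sourceOffset == null) {
            return; // tombstone: nothing to validate
        }
        requireString(sourcePartition, "cluster");
        requireString(sourcePartition, "topic");
        Object partition = sourcePartition.get("partition");
        if (!(partition instanceof Integer) || ((Integer) partition) < 0) {
            throw new IllegalArgumentException("Invalid 'partition' in " + sourcePartition);
        }
        Object offset = sourceOffset.get("offset");
        if (!(offset instanceof Integer || offset instanceof Long) || ((Number) offset).longValue() < 0) {
            throw new IllegalArgumentException("Invalid 'offset' in " + sourceOffset);
        }
    }

    private static void requireString(Map<String, ?> sourcePartition, String key) {
        if (!(sourcePartition.get(key) instanceof String)) {
            throw new IllegalArgumentException(
                "Source partition " + sourcePartition + " needs a string value for '" + key + "'");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> garbage = new HashMap<>();
        garbage.put("unexpected", "value");
        validate(garbage, null); // accepted: tombstone for an arbitrary partition
        try {
            validate(garbage, Collections.singletonMap("offset", 5L));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off is the one raised in the comment: tombstones for unknown partitions are harmless no-ops, while non-null offsets stay restricted to the format the connector's tasks already emit.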
C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1261630589

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##
@@ -597,7 +596,9 @@ private Set listPartitions(
         Admin admin,
         Collection topics
     ) throws TimeoutException, InterruptedException, ExecutionException {
-        assertFalse("collection of topics may not be empty", topics.isEmpty());

Review Comment:
Removed assertions from this API because they added an implicit dependency on JUnit 4, which we don't use in the `:connect:mirror` module.
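One dependency-free way to keep a precondition like the removed `assertFalse` is a plain check that throws. The sketch below is an assumption about how such a check could be written, not the replacement used in the PR; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

final class PreconditionSketch {
    // Plain-Java precondition: no test framework is needed on the classpath,
    // so modules that use a different JUnit version can still call the helper.
    public static void requireNonEmpty(Collection<?> topics) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("collection of topics may not be empty");
        }
    }

    public static void main(String[] args) {
        requireNonEmpty(Arrays.asList("test-topic-1")); // passes silently
        try {
            requireNonEmpty(Collections.emptyList());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```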