Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
hachikuji merged PR #15385: URL: https://github.com/apache/kafka/pull/15385 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
msn-tldr commented on code in PR #15385: URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721

## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java: ##
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
         Mockito.reset(mockListener);
     }

+    /**
+     * Test that concurrently updating Metadata, and fetching the corresponding MetadataSnapshot & Cluster work as expected, i.e.
+     * snapshot & cluster contain the relevant updates.
+     */
+    @Test
+    public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws InterruptedException {
+        Time time = new MockTime();
+        metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+        // Setup metadata with 10 nodes, 2 topics, topic1 & 2, both to be retained in the update. Both will have leader-epoch 100.
+        int oldNodeCount = 10;
+        String topic1 = "test_topic1";
+        String topic2 = "test_topic2";
+        TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        int oldPartitionCount = 1;
+        topicPartitionCounts.put(topic1, oldPartitionCount);
+        topicPartitionCounts.put(topic2, oldPartitionCount);
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topic1, Uuid.randomUuid());
+        topicIds.put(topic2, Uuid.randomUuid());
+        int oldLeaderEpoch = 100;
+        MetadataResponse metadataResponse =
+            RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+        Cluster cluster = metadata.fetch();
+        // Validate metadata snapshot & cluster are setup as expected.
+        assertEquals(cluster, snapshot.cluster());
+        assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic1));
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic2));
+        assertEquals(OptionalInt.of(oldLeaderEpoch), snapshot.leaderEpochFor(topic1Part0));
+
+        // Setup 6 threads, where 3 are updating metadata & 3 are reading snapshot/cluster.
+        // Metadata will be updated with higher # of nodes, partition-counts, leader-epoch.
+        int numThreads = 6;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment: Done; the executor is shut down at the end of the test.
Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
msn-tldr commented on code in PR #15385: URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721

## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java: ##
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment: done
Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
hachikuji commented on code in PR #15385: URL: https://github.com/apache/kafka/pull/15385#discussion_r1499908727

## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java: ##
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment: Can we make sure the executor gets shut down?
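For readers following the thread, one common way to guarantee that a test's executor is shut down is to wrap the work in try/finally. The sketch below is not taken from the PR: the class `ExecutorShutdownSketch`, the method `runTasks`, and the 10-second timeout are hypothetical names and values chosen for illustration, and it uses only `java.util.concurrent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR: runs tasks on a fixed pool
// and shuts the pool down whether or not a task fails.
class ExecutorShutdownSketch {

    static int runTasks(int numThreads) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < numThreads; i++) {
                final int id = i;
                // The lambda returns a value, so it is submitted as a Callable.
                futures.add(service.submit(() -> id));
            }
            int completed = 0;
            for (Future<Integer> future : futures) {
                future.get(); // rethrows a task failure as ExecutionException
                completed++;
            }
            return completed;
        } finally {
            // Runs even if a task or an assertion above throws.
            service.shutdown();
            if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
                service.shutdownNow(); // interrupt tasks that are still running
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed tasks: " + runTasks(6));
    }
}
```

Putting the shutdown in `finally` means a failing assertion in the middle of the test does not leak worker threads into later tests.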
Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
msn-tldr commented on PR #15385: URL: https://github.com/apache/kafka/pull/15385#issuecomment-1954448982 All of the Jenkins test failures are in unrelated tests that already have a known history of flakiness.
[PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
msn-tldr opened a new pull request, #15385: URL: https://github.com/apache/kafka/pull/15385 This is a follow-up to https://github.com/apache/kafka/pull/15323. Metadata is typically updated concurrently in a background thread, while the MetadataSnapshot/Cluster is fetched and used in another thread (typically the application thread). This PR adds a test to make sure that concurrent updates and reads work as expected.
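The property described above can be illustrated without Kafka at all. The following is a minimal sketch under stated assumptions, not Kafka's `Metadata` implementation: `Snapshot` and `Holder` are hypothetical stand-ins, and the sketch assumes the holder publishes an immutable snapshot through a volatile field. Three writers and three readers run concurrently, and every reader must observe either the old state or the new state, never a mix of the two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for illustration only; not Kafka classes.
class ConcurrentSnapshotSketch {

    // Immutable value: both fields are always published together.
    static final class Snapshot {
        final int nodeCount;
        final int leaderEpoch;

        Snapshot(int nodeCount, int leaderEpoch) {
            this.nodeCount = nodeCount;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Assumption: updates replace the whole snapshot via a volatile reference.
    static final class Holder {
        private volatile Snapshot current = new Snapshot(10, 100);

        synchronized void update(int nodeCount, int leaderEpoch) {
            current = new Snapshot(nodeCount, leaderEpoch);
        }

        Snapshot fetch() {
            return current;
        }
    }

    // Returns true if every reader saw a consistent snapshot and the final state is the updated one.
    static boolean run() throws Exception {
        Holder holder = new Holder();
        int writers = 3;
        int readers = 3;
        ExecutorService service = Executors.newFixedThreadPool(writers + readers);
        CountDownLatch start = new CountDownLatch(1);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < writers; i++) {
                results.add(service.submit(() -> {
                    start.await(); // release all threads at the same time
                    holder.update(20, 200);
                    return true;
                }));
            }
            for (int i = 0; i < readers; i++) {
                results.add(service.submit(() -> {
                    start.await();
                    Snapshot s = holder.fetch();
                    boolean old = s.nodeCount == 10 && s.leaderEpoch == 100;
                    boolean updated = s.nodeCount == 20 && s.leaderEpoch == 200;
                    return old || updated; // a mixed state would be a bug
                }));
            }
            start.countDown();
            boolean consistent = true;
            for (Future<Boolean> result : results) {
                consistent &= result.get();
            }
            Snapshot last = holder.fetch();
            return consistent && last.nodeCount == 20 && last.leaderEpoch == 200;
        } finally {
            service.shutdown();
            service.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("consistent: " + run());
    }
}
```

The latch is there so that writers and readers actually overlap; without it, the tasks could run one after another and the test would pass without exercising any concurrency.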