Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-26 Thread via GitHub


hachikuji merged PR #15385:
URL: https://github.com/apache/kafka/pull/15385


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-26 Thread via GitHub


msn-tldr commented on code in PR #15385:
URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
         Mockito.reset(mockListener);
     }
 
+    /**
+     * Test that concurrently updating Metadata and fetching the corresponding MetadataSnapshot & Cluster work as expected, i.e.
+     * snapshot & cluster contain the relevant updates.
+     */
+    @Test
+    public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws InterruptedException {
+        Time time = new MockTime();
+        metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+        // Set up metadata with 10 nodes and 2 topics, topic1 & topic2, both to be retained in the update. Both will have leader-epoch 100.
+        int oldNodeCount = 10;
+        String topic1 = "test_topic1";
+        String topic2 = "test_topic2";
+        TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        int oldPartitionCount = 1;
+        topicPartitionCounts.put(topic1, oldPartitionCount);
+        topicPartitionCounts.put(topic2, oldPartitionCount);
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topic1, Uuid.randomUuid());
+        topicIds.put(topic2, Uuid.randomUuid());
+        int oldLeaderEpoch = 100;
+        MetadataResponse metadataResponse =
+            RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+        Cluster cluster = metadata.fetch();
+        // Validate that the metadata snapshot & cluster are set up as expected.
+        assertEquals(cluster, snapshot.cluster());
+        assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic1));
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic2));
+        assertEquals(OptionalInt.of(oldLeaderEpoch), snapshot.leaderEpochFor(topic1Part0));
+
+        // Set up 6 threads, where 3 are updating metadata & 3 are reading snapshot/cluster.
+        // Metadata will be updated with a higher # of nodes, partition-counts and leader-epoch.
+        int numThreads = 6;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment:
   done at the end of the test






Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-26 Thread via GitHub


msn-tldr commented on code in PR #15385:
URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
+ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment:
   done






Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-23 Thread via GitHub


hachikuji commented on code in PR #15385:
URL: https://github.com/apache/kafka/pull/15385#discussion_r1499908727


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
+ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment:
   Can we make sure the executor gets shut down?
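A common way to satisfy this request is to shut the pool down in a finally block, so its threads are released even if an assertion fails partway through the test. The sketch below is a hedged illustration, not the code that was merged in the PR: the class name ExecutorShutdownSketch, the runTasks helper and the 30-second timeout are hypothetical assumptions, and shutdownAndAwaitTermination is adapted from the usage example in the java.util.concurrent.ExecutorService Javadoc.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorShutdownSketch {
    // Adapted from the ExecutorService Javadoc: stop accepting new work, wait for
    // running tasks, and interrupt them if they do not finish within the timeout.
    // Returns true if the pool terminated.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    // Hypothetical test body: submits trivial tasks and always shuts the pool down,
    // even if submitting (or, in a real test, asserting) throws.
    static int runTasks(int numThreads, int numTasks) {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService service = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < numTasks; i++) {
                service.submit(() -> { completed.incrementAndGet(); });
            }
        } finally {
            shutdownAndAwaitTermination(service, 30);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(6, 6)); // prints 6
    }
}
```

Putting the shutdown in finally rather than only at the end of the happy path means a failing assertion cannot leak six pool threads into later tests in the same JVM.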






Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-20 Thread via GitHub


msn-tldr commented on PR #15385:
URL: https://github.com/apache/kafka/pull/15385#issuecomment-1954448982

   All the Jenkins test failures are in unrelated tests that already have a history of flakiness.





[PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-16 Thread via GitHub


msn-tldr opened a new pull request, #15385:
URL: https://github.com/apache/kafka/pull/15385

   This is a follow-up to https://github.com/apache/kafka/pull/15323. 
   
   Metadata is typically updated in a background thread, while the MetadataSnapshot/Cluster are fetched and used in another thread (typically the application thread). This PR adds a test to make sure that concurrent updates and reads work as expected.
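The property being tested can be illustrated with a small, self-contained sketch. This is not Kafka's Metadata implementation: the Snapshot class, the AtomicReference-based holder and the update/read counts are hypothetical assumptions. Writers publish whole immutable snapshots atomically, and each reader fetches a snapshot once and reads every field from that same object, so it should never observe a mix of old and new values (for example, a new node count paired with an old leader epoch).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotConcurrencySketch {
    // Hypothetical immutable snapshot: every field is derived from one version number,
    // so a partially applied update would show up as mismatched fields.
    static final class Snapshot {
        final int nodeCount;
        final int partitionCount;
        final int leaderEpoch;

        Snapshot(int version) {
            this.nodeCount = 10 + version;
            this.partitionCount = 1 + version;
            this.leaderEpoch = 100 + version;
        }

        boolean isConsistent() {
            int version = nodeCount - 10;
            return partitionCount == 1 + version && leaderEpoch == 100 + version;
        }
    }

    // Runs 3 writer and 3 reader threads; returns true if no reader saw an inconsistent snapshot.
    static boolean runConcurrently() {
        AtomicReference<Snapshot> holder = new AtomicReference<>(new Snapshot(0));
        AtomicBoolean sawInconsistency = new AtomicBoolean(false);
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(6);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int w = 0; w < 3; w++) {
                futures.add(service.submit(() -> {
                    start.await(); // release all threads together to maximize interleaving
                    for (int version = 1; version <= 1000; version++) {
                        holder.set(new Snapshot(version)); // publish a whole new snapshot atomically
                    }
                    return null;
                }));
            }
            for (int r = 0; r < 3; r++) {
                futures.add(service.submit(() -> {
                    start.await();
                    for (int i = 0; i < 1000; i++) {
                        Snapshot s = holder.get(); // fetch once, then read only from s
                        if (!s.isConsistent()) {
                            sawInconsistency.set(true);
                        }
                    }
                    return null;
                }));
            }
            start.countDown();
            for (Future<?> f : futures) {
                f.get(30, TimeUnit.SECONDS); // rethrows any task failure as ExecutionException
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            service.shutdownNow(); // always release the pool's threads
        }
        return !sawInconsistency.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently()); // prints true
    }
}
```

The design point is that an update replaces the snapshot reference instead of mutating fields in place, so a reader holding a reference always sees one complete version.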
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

