chia7712 commented on code in PR #21230:
URL: https://github.com/apache/kafka/pull/21230#discussion_r2673052187


##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -1192,6 +1213,487 @@ public void testCreateClusterAndCreateTopicWithRemoteLogManagerInstantiation() t
         }
     }
 
+    @Test
+    public void testCreateClusterAndRestartControllerNode() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            var controller = cluster.controllers().values().stream()
+                .filter(c -> c.controller().isActive())
+                .findFirst()
+                .get();
+            var port = controller.socketServer().boundPort(
+                ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+            // shutdown active controller
+            controller.shutdown();
+            // Rewrite the `listeners` config to avoid the controller socket server initializing on a different port
+            var config = controller.sharedServer().controllerConfig().props();
+            ((java.util.HashMap<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
+                "CONTROLLER://localhost:" + port);
+            controller.sharedServer().controllerConfig().updateCurrentConfig(config);
+
+            // restart controller
+            controller.startup();
+            TestUtils.waitForCondition(() -> cluster.controllers().values().stream()
+                .anyMatch(c -> c.controller().isActive()),
+                "Timeout waiting for new controller election");
+        }
+    }
+
+    @Test
+    public void testSnapshotCount() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(0)
+                .setNumControllerNodes(1)
+                .build())
+            .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
+            .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this low to generate metadata
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            var metaLog = FileSystems.getDefault().getPath(
+                cluster.controllers().get(3000).config().metadataLogDir(),
+                "__cluster_metadata-0");
+            TestUtils.waitForCondition(() -> {
+                var files = metaLog.toFile().listFiles((dir, name) ->
+                    name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+                );
+                return files != null && files.length > 0;
+            }, "Failed to see at least one snapshot");
+            Thread.sleep(500 * 10); // Sleep for 10 snapshot intervals
+            var filesAfterTenIntervals = metaLog.toFile().listFiles((dir, name) ->
+                name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+            );
+            int countAfterTenIntervals = filesAfterTenIntervals != null ? filesAfterTenIntervals.length : 0;
+            assertTrue(countAfterTenIntervals > 1,
+                "Expected to see at least one more snapshot, saw " + countAfterTenIntervals);
+            assertTrue(countAfterTenIntervals < 20,
+                "Did not expect to see more than twice as many snapshots as snapshot intervals, saw " + countAfterTenIntervals);
+            TestUtils.waitForCondition(() -> {
+                var emitterMetrics = cluster.controllers().values().iterator().next()
+                    .sharedServer().snapshotEmitter().metrics();
+                return emitterMetrics.latestSnapshotGeneratedBytes() > 0;
+            }, "Failed to see latestSnapshotGeneratedBytes > 0");
+        }
+    }
+
+    /**
+     * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
+     * that we can function without having periodic NoOpRecords written.
+     */
+    @Test
+    public void testSingleControllerSingleBrokerCluster() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        }
+    }
+
+    @Test
+    public void testOverlyLargeCreateTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = cluster.admin()) {
+                var newTopics = new ArrayList<NewTopic>();
+                for (int i = 0; i <= 10000; i++) {
+                    newTopics.add(new NewTopic("foo" + i, 100000, (short) 1));
+                }
+                var executionException = assertThrows(ExecutionException.class,
+                    () -> admin.createTopics(newTopics).all().get());
+                assertNotNull(executionException.getCause());
+                assertEquals(PolicyViolationException.class, executionException.getCause().getClass());
+                assertEquals("Excessively large number of partitions per request.",
+                    executionException.getCause().getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testTimedOutHeartbeats() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(1)
+                .build())
+            .setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
+            .setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "1000")
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            var controller = cluster.controllers().values().iterator().next();
+            controller.controller().waitForReadyBrokers(3).get();
+            TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                var latch = QuorumControllerIntegrationTestUtils.pause((QuorumController) controller.controller());

Review Comment:
   Removing the usage of `QuorumControllerIntegrationTestUtils` decouples this test from the metadata module's test scope.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to