[ https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855997#comment-17855997 ]
Ksolves edited comment on KAFKA-16943 at 6/18/24 4:53 PM:
----------------------------------------------------------
In our current test cases (`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and `testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that the Connect worker fails to start. However, our mechanism for verifying the startup failure lacks synchronous waiting and precise assertion.

Example Test Case (Added screenshot to show the diff of newly added changes):

{code:java}
@Test
public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
    workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
    workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
    workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
    int numWorkers = 0;
    int numBrokers = 1;
    connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
            .workerProps(workerProps)
            .numWorkers(numWorkers)
            .numBrokers(numBrokers)
            .brokerProps(brokerProps)
            .build();

    // Start the brokers and Connect, but Connect should fail to create config and offset topic
    connect.start();
    log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

    // Try to start a worker
    connect.addWorker();

    // Synchronously await and verify that the worker fails during startup
    boolean workerStarted = waitForWorkerStartupFailure(connect, 30000); // 30 seconds timeout
    assertFalse(workerStarted, "Worker should not have started successfully");

    log.info("Verifying the internal topics for Connect");
    connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());

    // Verify that no workers are running
    assertFalse(connect.anyWorkersRunning());
}

private boolean waitForWorkerStartupFailure(EmbeddedConnectCluster connect, long timeoutMillis) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < timeoutMillis) {
        if (!connect.anyWorkersRunning()) {
            return false;
        }
        Thread.sleep(500); // wait for 500 milliseconds before checking again
    }
    return true;
}
{code}

What changes do you suggest to improve this synchronous verification mechanism? I'll create the PR accordingly.

was (Author: JIRAUSER305714):
In our current test cases (`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and `testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that the Connect worker fails to start. However, our mechanism for verifying the startup failure lacks synchronous waiting and precise assertion.
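One possible direction, offered only as a sketch under stated assumptions: as written, `waitForWorkerStartupFailure` returns `false` on the first check where no worker reports as running. If a worker is not yet reported as running while its startup is still in progress, a slow startup then looks exactly like a failed one. If worker startup were instead exposed to tests as a `CompletableFuture<Void>` that completes normally on success and exceptionally on failure, the test could block on it with a timeout and tell all three outcomes apart. The `StartupAwaitSketch` class, the `awaitStartup` helper and the `StartupOutcome` enum are hypothetical and are not part of `EmbeddedConnectCluster`; the futures in `main` merely simulate worker startup.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StartupAwaitSketch {

    /** Outcomes a test needs to tell apart; a boolean "is running" check may report the same value for FAILED and TIMED_OUT. */
    enum StartupOutcome { SUCCEEDED, FAILED, TIMED_OUT }

    /**
     * Blocks until the (hypothetical) startup future completes or the timeout elapses.
     * Normal completion means startup succeeded, exceptional completion means it failed,
     * and a timeout means startup was still in progress when we stopped waiting.
     */
    static StartupOutcome awaitStartup(CompletableFuture<Void> startup, long timeoutMs) {
        try {
            startup.get(timeoutMs, TimeUnit.MILLISECONDS);
            return StartupOutcome.SUCCEEDED;
        } catch (ExecutionException e) {
            // Startup completed exceptionally, e.g. internal topic creation was rejected
            return StartupOutcome.FAILED;
        } catch (TimeoutException e) {
            // Neither success nor failure was observed within the timeout
            return StartupOutcome.TIMED_OUT;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while awaiting worker startup", e);
        }
    }

    public static void main(String[] args) {
        // Simulated stand-in for a worker whose startup fails asynchronously (not real Connect code)
        CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("simulated: not enough brokers for replication factor 3");
        });
        System.out.println(awaitStartup(failing, 30_000));                              // FAILED
        System.out.println(awaitStartup(new CompletableFuture<>(), 100));                // TIMED_OUT
        System.out.println(awaitStartup(CompletableFuture.completedFuture(null), 100)); // SUCCEEDED
    }
}
{code}

In `testFailToCreateInternalTopicsWithMoreReplicasThanBrokers`, the check could then assert that the outcome is `FAILED` rather than `TIMED_OUT`, which is the guarantee the issue asks for. How the embedded cluster would obtain such a future is left open here.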
> Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16943
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect
>            Reporter: Chris Egerton
>            Priority: Minor
>              Labels: newbie
>
> Created after PR discussion
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker
> cannot start under poor conditions (such as when its internal topics do not
> yet exist and it is configured to create them with a higher replication
> factor than the number of available brokers, or when its internal topics
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, and presents a possible gap in testing
> coverage, especially for the test cases
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and
> {{testFailToStartWhenInternalTopicsAreNotCompacted}}. It'd be nice if we
> could have some way of synchronously awaiting the completion or failure of
> worker startup in our integration tests in order to guarantee that worker
> startup fails under sufficiently adverse conditions.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)