[
https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855997#comment-17855997
]
Ksolves edited comment on KAFKA-16943 at 6/18/24 4:53 PM:
----------------------------------------------------------
In our current test cases
(`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and
`testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that
the Connect worker fails to start. However, the tests never wait synchronously
for startup to finish, and they do not assert precisely that startup failed.
Example test case (a screenshot showing the diff of the newly added changes is
attached):
{code:java}
@Test
public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
    workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
    workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
    workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
    int numWorkers = 0;
    int numBrokers = 1;
    connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
            .workerProps(workerProps)
            .numWorkers(numWorkers)
            .numBrokers(numBrokers)
            .brokerProps(brokerProps)
            .build();

    // Start the brokers and Connect, but Connect should fail to create config and offset topic
    connect.start();
    log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

    // Try to start a worker
    connect.addWorker();

    // Synchronously await and verify that the worker fails during startup
    boolean workerStarted = waitForWorkerStartupFailure(connect, 30000); // 30 seconds timeout
    assertFalse(workerStarted, "Worker should not have started successfully");

    log.info("Verifying the internal topics for Connect");
    connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());

    // Verify that no workers are running
    assertFalse(connect.anyWorkersRunning());
}

private boolean waitForWorkerStartupFailure(EmbeddedConnectCluster connect, long timeoutMillis)
        throws InterruptedException {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < timeoutMillis) {
        if (!connect.anyWorkersRunning()) {
            return false;
        }
        Thread.sleep(500); // wait for 500 milliseconds before checking again
    }
    return true;
}
{code}
What changes do you suggest to improve this synchronous verification mechanism?
I'll create the PR accordingly.
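One possible direction, sketched in plain Java without any Connect classes: instead of polling a "running" flag (which cannot tell "not started yet" from "failed to start"), the test could block on a future that settles exactly once, when startup either completes or fails. Everything named here (`StartupAwaitSketch`, `Outcome`, `awaitStartup`) is hypothetical and for illustration only; whether `EmbeddedConnectCluster` can expose such a future is an assumption, not something confirmed in this ticket.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: none of these names exist in Kafka Connect.
class StartupAwaitSketch {

    /** The three distinct results a test may want to assert on. */
    enum Outcome { STARTED, FAILED, TIMED_OUT }

    /**
     * Blocks until the startup future settles or the timeout elapses.
     * A future that completes exceptionally maps to FAILED, so a failure
     * is never confused with a worker that has simply not started yet.
     */
    static Outcome awaitStartup(CompletableFuture<Void> startup, long timeoutMs)
            throws InterruptedException {
        try {
            startup.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.STARTED;
        } catch (ExecutionException e) {
            // Startup threw; e.getCause() holds the original exception.
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated startup that fails asynchronously (assumed failure message).
        CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("not enough brokers for requested replication factor");
        });
        System.out.println(awaitStartup(failing, 5_000));

        // Simulated startup that never settles within the timeout.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(awaitStartup(pending, 200));
    }
}
```

With a helper of this shape, the test could assert that the outcome is `FAILED`, and a `TIMED_OUT` result would fail the test rather than being read as a startup failure.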
> Synchronously verify Connect worker startup failure in
> InternalTopicsIntegrationTest
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-16943
> URL: https://issues.apache.org/jira/browse/KAFKA-16943
> Project: Kafka
> Issue Type: Improvement
> Components: connect
> Reporter: Chris Egerton
> Priority: Minor
> Labels: newbie
>
> Created after PR discussion
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker
> cannot start under poor conditions (such as when its internal topics do not
> yet exist and it is configured to create them with a higher replication
> factor than the number of available brokers, or when its internal topics
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, and presents a possible gap in testing
> coverage, especially for the test cases
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and
> {{testFailToStartWhenInternalTopicsAreNotCompacted}}. It'd be nice if we
> could have some way of synchronously awaiting the completion or failure of
> worker startup in our integration tests in order to guarantee that worker
> startup fails under sufficiently adverse conditions.