[ 
https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855997#comment-17855997
 ] 

Ksolves edited comment on KAFKA-16943 at 6/18/24 4:53 PM:
----------------------------------------------------------

In our current test cases 
(`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and 
`testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that 
the Connect worker fails to start. However, the tests neither wait 
synchronously for startup to complete nor assert precisely that it failed.

Example Test Case (Added screenshot to show the diff of newly added changes):
{code:java}
@Test
public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
    workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
    workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
    workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
    int numWorkers = 0;
    int numBrokers = 1;
    connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
                                                  .workerProps(workerProps)
                                                  .numWorkers(numWorkers)
                                                  .numBrokers(numBrokers)
                                                  .brokerProps(brokerProps)
                                                  .build();

    // Start the brokers and Connect, but Connect should fail to create config and offset topic
    connect.start();
    log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

    // Try to start a worker
    connect.addWorker();

    // Synchronously await and verify that the worker fails during startup
    boolean workerStarted = waitForWorkerStartupFailure(connect, 30000); // 30 seconds timeout
    assertFalse(workerStarted, "Worker should not have started successfully");

    log.info("Verifying the internal topics for Connect");
    connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());

    // Verify that no workers are running
    assertFalse(connect.anyWorkersRunning());
}

// Returns false as soon as no worker is running; returns true if a worker is
// still running once the timeout has elapsed
private boolean waitForWorkerStartupFailure(EmbeddedConnectCluster connect, long timeoutMillis) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < timeoutMillis) {
        if (!connect.anyWorkersRunning()) {
            return false;
        }
        Thread.sleep(500); // wait for 500 milliseconds before checking again
    }
    return true;
}
{code}
What changes do you suggest to improve this synchronous verification mechanism? 
I'll create the PR accordingly.
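
One point that may be worth discussing: the helper above returns as soon as `anyWorkersRunning()` is false, which could also be the case before the worker has begun starting. Below is a minimal, self-contained sketch of a polling helper that waits for an explicit condition and reports whether it was observed before the deadline. The class `StartupFailureWait`, the method `awaitCondition`, and the `startupFailed` flag are hypothetical names used only for illustration; they are not part of the Connect test framework, and the flag merely stands in for whatever "startup has failed" signal the cluster could expose.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of the Kafka Connect test framework.
final class StartupFailureWait {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed, false on timeout or interruption.
     */
    static boolean awaitCondition(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        // System.nanoTime() is monotonic, so wall-clock adjustments do not move the deadline
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the poll interval, but never past the deadline (and at least 1 ms)
            long sleepMillis = Math.max(1L, Math.min(pollInterval.toMillis(), remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: stand-in for a real "worker startup has failed" signal
        AtomicBoolean startupFailed = new AtomicBoolean(false);
        Thread simulatedWorker = new Thread(() -> {
            try {
                Thread.sleep(200); // pretend startup takes a moment before failing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            startupFailed.set(true);
        });
        simulatedWorker.start();

        boolean observed = awaitCondition(startupFailed::get, Duration.ofSeconds(5), Duration.ofMillis(50));
        simulatedWorker.join();
        System.out.println("startup failure observed: " + observed);
    }
}
```

With this shape the test would assert that the failure condition was observed, rather than that a "still running" flag stayed false. Kafka's own test utilities may already offer an equivalent (for example `TestUtils.waitForCondition`), in which case that would probably be preferable to a new helper.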


was (Author: JIRAUSER305714):
In our current test cases 
(`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and 
`testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that 
the Connect worker fails to start. However, our mechanism for verifying the 
startup failure lacks synchronous waiting and precise assertion.

Example Test Case: [Changes in existing test case marked in 
{color:#00875a}*green*{color}]
{code:java}
@Test
public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
    workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
    workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
    workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
    int numWorkers = 0;
    int numBrokers = 1;
    connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
                                                  .workerProps(workerProps)
                                                  .numWorkers(numWorkers)
                                                  .numBrokers(numBrokers)
                                                  .brokerProps(brokerProps)
                                                  .build();

    // Start the brokers and Connect, but Connect should fail to create config and offset topic
    connect.start();
    log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

    // Try to start a worker
    connect.addWorker();

    // Synchronously await and verify that the worker fails during startup
    boolean workerStarted = waitForWorkerStartupFailure(connect, 30000); // 30 seconds timeout
    assertFalse(workerStarted, "Worker should not have started successfully");

    log.info("Verifying the internal topics for Connect");
    connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());

    // Verify that no workers are running
    assertFalse(connect.anyWorkersRunning());
}

private boolean waitForWorkerStartupFailure(EmbeddedConnectCluster connect, long timeoutMillis) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < timeoutMillis) {
        if (!connect.anyWorkersRunning()) {
            return false;
        }
        Thread.sleep(500); // wait for 500 milliseconds before checking again
    }
    return true;
}
{code}
What changes do you suggest to improve this synchronous verification mechanism? 
I'll create the PR accordingly.

> Synchronously verify Connect worker startup failure in 
> InternalTopicsIntegrationTest
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16943
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect
>            Reporter: Chris Egerton
>            Priority: Minor
>              Labels: newbie
>
> Created after PR discussion 
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker 
> cannot start under poor conditions (such as when its internal topics do not 
> yet exist and it is configured to create them with a higher replication 
> factor than the number of available brokers, or when its internal topics 
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, and presents a possible gap in testing 
> coverage, especially for the test cases 
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and 
> {{testFailToStartWhenInternalTopicsAreNotCompacted}}. It'd be nice if we 
> could have some way of synchronously awaiting the completion or failure of 
> worker startup in our integration tests in order to guarantee that worker 
> startup fails under sufficiently adverse conditions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
