This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7872a1ff5b2 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371) 7872a1ff5b2 is described below commit 7872a1ff5b2e9a0fbbe3d71180a97e29f1549d4f Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Tue Sep 19 16:39:39 2023 +0100 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371) Reviewers: Sagar Rao <sagarmeansoc...@gmail.com>, Chris Egerton <chr...@aiven.io> --- .../integration/ConnectWorkerIntegrationTest.java | 8 +++---- .../ConnectorTopicsIntegrationTest.java | 8 +++---- .../integration/ErrorHandlingIntegrationTest.java | 8 +++---- .../RebalanceSourceConnectorsIntegrationTest.java | 4 ++-- .../clusters/EmbeddedConnectClusterAssertions.java | 26 +++++++++------------- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 2e843cd6ec6..4c393d95ad3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -431,9 +431,9 @@ public class ConnectWorkerIntegrationTest { // Delete the connector connect.deleteConnector(CONNECTOR_NAME); - connect.assertions().assertConnectorAndTasksAreNotRunning( + connect.assertions().assertConnectorDoesNotExist( CONNECTOR_NAME, - "Connector tasks were not destroyed in time" + "Connector wasn't deleted in time" ); } @@ -505,9 +505,9 @@ public class ConnectWorkerIntegrationTest { // Can delete a stopped connector connect.deleteConnector(CONNECTOR_NAME); - connect.assertions().assertConnectorAndTasksAreNotRunning( + connect.assertions().assertConnectorDoesNotExist( CONNECTOR_NAME, - "Connector and all 
of its tasks should no longer be running" + "Connector wasn't deleted in time" ); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index a8b812f8c31..0614ba8a9f7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -150,8 +150,8 @@ public class ConnectorTopicsIntegrationTest { // deleting a connector resets its active topics connect.deleteConnector(BAR_CONNECTOR); - connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR, - "Connector tasks did not stop in time."); + connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR, + "Connector wasn't deleted in time."); connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for deleted connector: " + BAR_CONNECTOR); @@ -205,8 +205,8 @@ public class ConnectorTopicsIntegrationTest { // deleting a connector resets its active topics connect.deleteConnector(FOO_CONNECTOR); - connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR, - "Connector tasks did not stop in time."); + connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR, + "Connector wasn't deleted in time."); connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for deleted connector: " + FOO_CONNECTOR); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 7d3c1d6924b..55479e6d4ff 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -179,8 +179,8 @@ public class ErrorHandlingIntegrationTest { } connect.deleteConnector(CONNECTOR_NAME); - connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME, - "Connector tasks did not stop in time."); + connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, + "Connector wasn't deleted in time."); } @@ -248,8 +248,8 @@ public class ErrorHandlingIntegrationTest { ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC); connect.deleteConnector(CONNECTOR_NAME); - connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME, - "Connector tasks did not stop in time."); + connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, + "Connector wasn't deleted in time."); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java index 04e12ea41e0..82004c8dc3a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java @@ -206,8 +206,8 @@ public class RebalanceSourceConnectorsIntegrationTest { // delete connector connect.deleteConnector(CONNECTOR_NAME + 3); - connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3, - "Connector tasks did not stop in time."); + connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME + 3, + "Connector wasn't deleted in time."); waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced, 
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers."); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java index c4ff5018ed1..d8e488f4727 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java @@ -455,48 +455,42 @@ public class EmbeddedConnectClusterAssertions { } /** - * Assert that a connector and its tasks are not running. + * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted. * * @param connectorName the connector name * @param detailMessage the assertion message * @throws InterruptedException */ - public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage) + public void assertConnectorDoesNotExist(String connectorName, String detailMessage) throws InterruptedException { try { waitForCondition( - () -> checkConnectorAndTasksAreNotRunning(connectorName), + () -> checkConnectorDoesNotExist(connectorName), CONNECTOR_SETUP_DURATION_MS, - "At least the connector or one of its tasks is still running"); + "The connector should not exist."); } catch (AssertionError e) { throw new AssertionError(detailMessage, e); } } /** - * Check whether the connector or any of its tasks are still in RUNNING state + * Check whether a connector exists by querying the <strong><em>GET /connectors/{connector}/status</em></strong> endpoint * - * @param connectorName the connector - * @return true if the connector and all the tasks are not in RUNNING state; false otherwise + * @param connectorName the connector name + * @return true if the connector does not exist; false otherwise */ - protected boolean 
checkConnectorAndTasksAreNotRunning(String connectorName) { - ConnectorStateInfo info; + protected boolean checkConnectorDoesNotExist(String connectorName) { try { - info = connect.connectorStatus(connectorName); + connect.connectorStatus(connectorName); } catch (ConnectRestException e) { return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode(); } catch (Exception e) { log.error("Could not check connector state info.", e); return false; } - if (info == null) { - return true; - } - return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString()) - && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString())); + return false; } - /** * Assert that a connector is in the stopped state and has no tasks. *