This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7872a1ff5b2 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
7872a1ff5b2 is described below

commit 7872a1ff5b2e9a0fbbe3d71180a97e29f1549d4f
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Tue Sep 19 16:39:39 2023 +0100

    KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
    
    Reviewers: Sagar Rao <sagarmeansoc...@gmail.com>, Chris Egerton <chr...@aiven.io>
---
 .../integration/ConnectWorkerIntegrationTest.java  |  8 +++----
 .../ConnectorTopicsIntegrationTest.java            |  8 +++----
 .../integration/ErrorHandlingIntegrationTest.java  |  8 +++----
 .../RebalanceSourceConnectorsIntegrationTest.java  |  4 ++--
 .../clusters/EmbeddedConnectClusterAssertions.java | 26 +++++++++-------------
 5 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 2e843cd6ec6..4c393d95ad3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -431,9 +431,9 @@ public class ConnectWorkerIntegrationTest {
 
         // Delete the connector
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(
+        connect.assertions().assertConnectorDoesNotExist(
                 CONNECTOR_NAME,
-                "Connector tasks were not destroyed in time"
+                "Connector wasn't deleted in time"
         );
     }
 
@@ -505,9 +505,9 @@ public class ConnectWorkerIntegrationTest {
 
         // Can delete a stopped connector
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(
+        connect.assertions().assertConnectorDoesNotExist(
                 CONNECTOR_NAME,
-                "Connector and all of its tasks should no longer be running"
+                "Connector wasn't deleted in time"
         );
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index a8b812f8c31..0614ba8a9f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -150,8 +150,8 @@ public class ConnectorTopicsIntegrationTest {
         // deleting a connector resets its active topics
         connect.deleteConnector(BAR_CONNECTOR);
 
-        connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR,
+                "Connector wasn't deleted in time.");
 
         connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
                 "Active topic set is not empty for deleted connector: " + BAR_CONNECTOR);
@@ -205,8 +205,8 @@ public class ConnectorTopicsIntegrationTest {
         // deleting a connector resets its active topics
         connect.deleteConnector(FOO_CONNECTOR);
 
-        connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR,
+                "Connector wasn't deleted in time.");
 
         connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
                 "Active topic set is not empty for deleted connector: " + FOO_CONNECTOR);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 7d3c1d6924b..55479e6d4ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -179,8 +179,8 @@ public class ErrorHandlingIntegrationTest {
         }
 
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+                "Connector wasn't deleted in time.");
 
     }
 
@@ -248,8 +248,8 @@ public class ErrorHandlingIntegrationTest {
         ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
 
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
-            "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+            "Connector wasn't deleted in time.");
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 04e12ea41e0..82004c8dc3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -206,8 +206,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // delete connector
         connect.deleteConnector(CONNECTOR_NAME + 3);
 
-        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME + 3,
+                "Connector wasn't deleted in time.");
 
         waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index c4ff5018ed1..d8e488f4727 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -455,48 +455,42 @@ public class EmbeddedConnectClusterAssertions {
     }
 
     /**
-     * Assert that a connector and its tasks are not running.
+     * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
      *
      * @param connectorName the connector name
      * @param detailMessage the assertion message
      * @throws InterruptedException
      */
-    public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage)
+    public void assertConnectorDoesNotExist(String connectorName, String detailMessage)
             throws InterruptedException {
         try {
             waitForCondition(
-                () -> checkConnectorAndTasksAreNotRunning(connectorName),
+                () -> checkConnectorDoesNotExist(connectorName),
                 CONNECTOR_SETUP_DURATION_MS,
-                "At least the connector or one of its tasks is still running");
+                "The connector should not exist.");
         } catch (AssertionError e) {
             throw new AssertionError(detailMessage, e);
         }
     }
 
     /**
-     * Check whether the connector or any of its tasks are still in RUNNING state
+     * Check whether a connector exists by querying the <strong><em>GET /connectors/{connector}/status</em></strong> endpoint
      *
-     * @param connectorName the connector
-     * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+     * @param connectorName the connector name
+     * @return true if the connector does not exist; false otherwise
      */
-    protected boolean checkConnectorAndTasksAreNotRunning(String connectorName) {
-        ConnectorStateInfo info;
+    protected boolean checkConnectorDoesNotExist(String connectorName) {
         try {
-            info = connect.connectorStatus(connectorName);
+            connect.connectorStatus(connectorName);
         } catch (ConnectRestException e) {
             return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
         } catch (Exception e) {
             log.error("Could not check connector state info.", e);
             return false;
         }
-        if (info == null) {
-            return true;
-        }
-        return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+        return false;
     }
 
-
     /**
      * Assert that a connector is in the stopped state and has no tasks.
      *

Reply via email to