This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f6c0630  KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
f6c0630 is described below

commit f6c0630d2ffea2398844e6fa2b25a924088e276d
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Mar 11 18:43:46 2019 +0000

    KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
    
    DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners removes a 
listener, waits for the config to be propagated to all brokers and then 
validates that connections to the removed listener fail. But there is a small 
timing window between config update and Processor shutdown. Before validating 
that connections to a removed listener fail, this commit waits for all metrics 
of the removed listener to be deleted, ensuring that the Processors of the 
listener have been shut down.
    
    Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala        | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 80ed131..798961e 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -908,6 +908,8 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
   private def verifyAddListener(listenerName: String, securityProtocol: 
SecurityProtocol,
                                 saslMechanisms: Seq[String]): Unit = {
     addListener(servers, listenerName, securityProtocol, saslMechanisms)
+    TestUtils.waitUntilTrue(() => servers.forall(hasListenerMetric(_, 
listenerName)),
+      "Processors not started for new listener")
     if (saslMechanisms.nonEmpty)
       saslMechanisms.foreach { mechanism =>
         verifyListener(securityProtocol, Some(mechanism), 
s"add-listener-group-$securityProtocol-$mechanism")
@@ -954,6 +956,10 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
     TestUtils.waitUntilTrue(() => servers.forall(server => 
server.config.listeners.size == existingListenerCount - 1),
       "Listeners not updated")
+    // Wait until metrics of the listener have been removed to ensure that 
processors have been shutdown before
+    // verifying that connections to the removed listener fail.
+    TestUtils.waitUntilTrue(() => !servers.exists(hasListenerMetric(_, 
listenerName)),
+      "Processors not shutdown for removed listener")
 
     // Test that connections using deleted listener don't work
     val producerFuture = verifyConnectionFailure(producer1)
@@ -992,6 +998,10 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
+  private def hasListenerMetric(server: KafkaServer, listenerName: String): 
Boolean = {
+    
server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener")
 == listenerName)
+  }
+
   private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties 
= {
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, 
server.config.brokerId.toString)
     server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = 
true)

Reply via email to