This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b529d1  MINOR: Control plane listener tests should not use static 
port (#7033)
2b529d1 is described below

commit 2b529d15022c022a1f6705555442f089394d6ba3
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Jul 4 08:22:45 2019 -0700

    MINOR: Control plane listener tests should not use static port (#7033)
    
    We recently saw a few failing tests recently due to the static reliance on 
port 5000. For example:
    ```
    org.apache.kafka.common.KafkaException: Socket server failed to bind to 
localhost:5000: Address already in use.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:605)
        at kafka.network.Acceptor.<init>(SocketServer.scala:481)
        at kafka.network.SocketServer.createAcceptor(SocketServer.scala:253)
        at 
kafka.network.SocketServer.$anonfun$createControlPlaneAcceptorAndProcessor$1(SocketServer.scala:234)
        at 
kafka.network.SocketServer.$anonfun$createControlPlaneAcceptorAndProcessor$1$adapted(SocketServer.scala:232)
        at scala.Option.foreach(Option.scala:438)
        at 
kafka.network.SocketServer.createControlPlaneAcceptorAndProcessor(SocketServer.scala:232)
        at kafka.network.SocketServer.startup(SocketServer.scala:119)
        at 
kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1139)
        at 
kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:198)
    ```
    This patch fixes the failing tests to dynamically select the port.
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../kafka/controller/ControllerIntegrationTest.scala | 20 +++++++++++---------
 .../scala/unit/kafka/network/SocketServerTest.scala  | 10 +++++++---
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index a69a4a2..473fb03 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -89,7 +89,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
   @Test
   def testMetadataPropagationOnControlPlane(): Unit = {
-    servers = makeServers(1, listeners = 
Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"), 
listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
+    servers = makeServers(1,
+      listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:0"),
+      listenerSecurityProtocolMap = 
Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
       controlPlaneListenerName = Some("CONTROLLER"))
     TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
     val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]()
@@ -102,14 +104,14 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
         dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
       }
     }
-    assertEquals(1e-0, 
controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double],
 0)
-    assertEquals(0e-0, 
dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double],
 0)
-    assertEquals(1e-0, 
controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double],
 0)
-    assertEquals(0e-0, 
dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 
0)
-    
assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double]
 > 1.0)
-    
assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double]
 == 0.0)
-    
assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double]
 == 2.0)
-    
assertTrue(dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double]
 == 0.0)
+    assertEquals(1e-0, 
controlPlaneMetricMap("response-total").metricValue().asInstanceOf[Double], 0)
+    assertEquals(0e-0, 
dataPlaneMetricMap("response-total").metricValue().asInstanceOf[Double], 0)
+    assertEquals(1e-0, 
controlPlaneMetricMap("request-total").metricValue().asInstanceOf[Double], 0)
+    assertEquals(0e-0, 
dataPlaneMetricMap("request-total").metricValue().asInstanceOf[Double], 0)
+    
assertTrue(controlPlaneMetricMap("incoming-byte-total").metricValue().asInstanceOf[Double]
 > 1.0)
+    
assertTrue(dataPlaneMetricMap("incoming-byte-total").metricValue().asInstanceOf[Double]
 == 0.0)
+    
assertTrue(controlPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double]
 == 2.0)
+    
assertTrue(dataPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double]
 == 0.0)
   }
 
   // This test case is used to ensure that there will be no correctness issue 
after we avoid sending out full
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7bed0d5..82b8bf9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -135,7 +135,10 @@ class SocketServerTest {
     channel.sendResponse(new RequestChannel.SendResponse(request, send, 
Some(request.header.toString), None))
   }
 
-  def connect(s: SocketServer = server, listenerName: ListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr: 
InetAddress = null, port: Int = 0) = {
+  def connect(s: SocketServer = server,
+              listenerName: ListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+              localAddr: InetAddress = null,
+              port: Int = 0) = {
     val socket = new Socket("localhost", s.boundPort(listenerName), localAddr, 
port)
     sockets += socket
     socket
@@ -191,12 +194,13 @@ class SocketServerTest {
   def testControlPlaneRequest(): Unit = {
     val testProps = new Properties
     testProps ++= props
-    testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
     testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
     testProps.put("control.plane.listener.name", "CONTROLLER")
     val config = KafkaConfig.fromProps(testProps)
     withTestableServer(config, { testableServer =>
-      val socket = connect(testableServer, 
config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port 
= 5000)
+      val socket = connect(testableServer, config.controlPlaneListenerName.get,
+        localAddr = InetAddress.getLocalHost)
       sendAndReceiveControllerRequest(socket, testableServer)
     })
   }

Reply via email to