This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2b529d1 MINOR: Control plane listener tests should not use static
port (#7033)
2b529d1 is described below
commit 2b529d15022c022a1f6705555442f089394d6ba3
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Jul 4 08:22:45 2019 -0700
MINOR: Control plane listener tests should not use static port (#7033)
We recently saw a few failing tests due to the static reliance on
port 5000. For example:
```
org.apache.kafka.common.KafkaException: Socket server failed to bind to
localhost:5000: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:605)
at kafka.network.Acceptor.<init>(SocketServer.scala:481)
at kafka.network.SocketServer.createAcceptor(SocketServer.scala:253)
at
kafka.network.SocketServer.$anonfun$createControlPlaneAcceptorAndProcessor$1(SocketServer.scala:234)
at
kafka.network.SocketServer.$anonfun$createControlPlaneAcceptorAndProcessor$1$adapted(SocketServer.scala:232)
at scala.Option.foreach(Option.scala:438)
at
kafka.network.SocketServer.createControlPlaneAcceptorAndProcessor(SocketServer.scala:232)
at kafka.network.SocketServer.startup(SocketServer.scala:119)
at
kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1139)
at
kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:198)
```
This patch fixes the failing tests by selecting the port dynamically (binding to port 0) instead of hard-coding port 5000.
Reviewers: Ismael Juma <[email protected]>
---
.../kafka/controller/ControllerIntegrationTest.scala | 20 +++++++++++---------
.../scala/unit/kafka/network/SocketServerTest.scala | 10 +++++++---
2 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index a69a4a2..473fb03 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -89,7 +89,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testMetadataPropagationOnControlPlane(): Unit = {
- servers = makeServers(1, listeners =
Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"),
listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
+ servers = makeServers(1,
+ listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:0"),
+ listenerSecurityProtocolMap =
Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
controlPlaneListenerName = Some("CONTROLLER"))
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]()
@@ -102,14 +104,14 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
}
}
- assertEquals(1e-0,
controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double],
0)
- assertEquals(0e-0,
dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double],
0)
- assertEquals(1e-0,
controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double],
0)
- assertEquals(0e-0,
dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double],
0)
-
assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double]
> 1.0)
-
assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double]
== 0.0)
-
assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double]
== 2.0)
-
assertTrue(dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double]
== 0.0)
+ assertEquals(1e-0,
controlPlaneMetricMap("response-total").metricValue().asInstanceOf[Double], 0)
+ assertEquals(0e-0,
dataPlaneMetricMap("response-total").metricValue().asInstanceOf[Double], 0)
+ assertEquals(1e-0,
controlPlaneMetricMap("request-total").metricValue().asInstanceOf[Double], 0)
+ assertEquals(0e-0,
dataPlaneMetricMap("request-total").metricValue().asInstanceOf[Double], 0)
+
assertTrue(controlPlaneMetricMap("incoming-byte-total").metricValue().asInstanceOf[Double]
> 1.0)
+
assertTrue(dataPlaneMetricMap("incoming-byte-total").metricValue().asInstanceOf[Double]
== 0.0)
+
assertTrue(controlPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double]
== 2.0)
+
assertTrue(dataPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double]
== 0.0)
}
// This test case is used to ensure that there will be no correctness issue
after we avoid sending out full
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7bed0d5..82b8bf9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -135,7 +135,10 @@ class SocketServerTest {
channel.sendResponse(new RequestChannel.SendResponse(request, send,
Some(request.header.toString), None))
}
- def connect(s: SocketServer = server, listenerName: ListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr:
InetAddress = null, port: Int = 0) = {
+ def connect(s: SocketServer = server,
+ listenerName: ListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ localAddr: InetAddress = null,
+ port: Int = 0) = {
val socket = new Socket("localhost", s.boundPort(listenerName), localAddr,
port)
sockets += socket
socket
@@ -191,12 +194,13 @@ class SocketServerTest {
def testControlPlaneRequest(): Unit = {
val testProps = new Properties
testProps ++= props
- testProps.put("listeners",
"PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+ testProps.put("listeners",
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
testProps.put("listener.security.protocol.map",
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROLLER")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
- val socket = connect(testableServer,
config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port
= 5000)
+ val socket = connect(testableServer, config.controlPlaneListenerName.get,
+ localAddr = InetAddress.getLocalHost)
sendAndReceiveControllerRequest(socket, testableServer)
})
}