dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r754109304



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, 
s"localhost:$overrideNum")

Review comment:
       I guess that we don't need this in the context of this test, do we?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,9 +730,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, 
endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encountered IOException, closing connection", e)

Review comment:
       Should we log this as an error? Moreover, I would rephrase a bit the 
log: `Encountered an error while configuring the connection, closing it.`

##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, 
s"localhost:$overrideNum")
+    val serverMetrics = new Metrics()
+
+    val overrideServer = new 
SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
+      Time.SYSTEM, credentialProvider, apiVersionManager) {
+
+      // same as SocketServer.createAcceptor,
+      // except the Acceptor overriding a method to inject the exception
+      override protected def createAcceptor(endPoint: EndPoint, metricPrefix: 
String): Acceptor = {
+        val sendBufferSize = config.socketSendBufferBytes
+        val recvBufferSize = config.socketReceiveBufferBytes
+        new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, 
connectionQuotas, metricPrefix, time) {
+          override protected def configureAcceptedSocketChannel(socketChannel: 
SocketChannel): Unit = {
+            throw new IOException("test injected IOException")
+          }
+        }
+      }
+    }
+
+    try {
+      overrideServer.startup()
+      val conn = connect(overrideServer)
+      conn.setSoTimeout(3000)
+      assertEquals(-1, conn.getInputStream.read())

Review comment:
       Could we also verify that the connection quota is correct? We can use 
`connectionQuotas.get` for this with the ip of the relevant listener.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to