badaiaqrandista commented on code in PR #12416: URL: https://github.com/apache/kafka/pull/12416#discussion_r927123562
########## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ########## @@ -1878,6 +1878,98 @@ class SocketServerTest { }, false) } + /** + * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition: + * 1. Client-Server communication uses SSL socket. + * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer. + * + * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that + * leads to this situation is the following (from the server point of view): + * + * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer + * (SslTransportLayer.netReadBuffer). + * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer. + * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer. + * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from + * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll() + * should not block for 300 ms. + * + * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly + * into SslTransportLayer.netReadBuffer and manually trigger the processing. 
+ * + */ + @Test + def testLatencyWithBufferedDataAndNoSocketData(): Unit = { + shutdownServerAndMetrics(server) + + // create server with SSL listener + val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps)) + testableServer.enableRequestProcessing(Map.empty) + val testableSelector = testableServer.testableSelector + val proxyServer = new ProxyServer(testableServer) + val selectTimeout = 5000 // in ms + // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified + testableSelector.pollTimeoutOverride = Some(selectTimeout) + + try { + // trigger SSL handshake by sending the first request and receiving its response without buffering + val requestBytes = producerRequestBytes() + val sslSocket = sslClientSocket(proxyServer.localPort) + + sendRequest(sslSocket, requestBytes) + val request1 = receiveRequest(testableServer.dataPlaneRequestChannel) + processRequest(testableServer.dataPlaneRequestChannel, request1) + receiveResponse(sslSocket) + + // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer + val connectionId = request1.context.connectionId + val listener = testableServer.config.dataPlaneListeners.head.listenerName.value + val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found")) + val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer") + val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer") + + proxyServer.enableBuffering(netReadBuffer) + sendRequest(sslSocket, requestBytes) + sendRequest(sslSocket, requestBytes) + + val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead") + keysWithBufferedRead.add(channel.selectionKey) + JTestUtils.setFieldValue(transportLayer, 
"hasBytesBuffered", true) + + // process the first request in the server side + // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request + // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer) + testableSelector.wakeup() + val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000) + processRequest(testableServer.dataPlaneRequestChannel, req1) + + // receive response in the client side + receiveResponse(sslSocket) + + // process the second request in the server side + // this would process the second request in the appReadBuffer + // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559, + // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct) + // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place + val processTimeStart = System.nanoTime() // using nanoTime() because it is meant to calculate elapsed time + val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000) Review Comment: `receiveRequest()` only calls `requestQueue.poll()` underneath. If I set the timeout of the second `receiveRequest()` to 1s, it always fails: a request only reaches `requestQueue` after `Selector.poll()` completes the read from the channel, and `Selector.poll()` is itself blocked in the `select(timeout)` call in `Selector.java`. So the timeout here has to be `selectTimeout + 1000`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org