rajinisivaram commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r928627267


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:

Review Comment:
   We don't usually add references to tickets in comments. Can we rewrite this 
comment to say what it is testing and move the explanation of the bug into the 
PR description that will be included in the commit message?



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in 
Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener

Review Comment:
   Unnecessary comment



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in 
Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms

Review Comment:
   Call this selectTimeoutMs and remove comment?



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in 
Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is 
distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step 
(because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the 
buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block 
if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time

Review Comment:
   Rename these to `processTimeStartNanos` and `processTimeEndNanos`, since we
   convert the difference to milliseconds later.



##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   @badaiaqrandista Couldn't we just set `madeReadProgressLastPoll = true` in
   `unmute(KafkaChannel channel)` when we add the channel to
   `keysWithBufferedRead`? That is the case we are trying to handle here,
   right? The reason would then be obvious from the code, without having to
   rely on comments.
   
   ```
   if (channel.hasBytesBuffered()) {
       keysWithBufferedRead.add(channel.selectionKey());
       madeReadProgressLastPoll = true;
   }
   ```
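
To make the suggestion concrete, here is a minimal stand-alone sketch of the idea. It is a toy model, not the real `Selector`: the class name, the `String` channel ids, `markNoReadProgress()` and `effectiveTimeoutMs()` are all hypothetical, and the rule "use a zero timeout when the last poll made read progress and some channel still has buffered data" is an assumption about how `poll()` consumes the flag.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: names other than keysWithBufferedRead and
// madeReadProgressLastPoll are hypothetical and not part of Kafka.
final class BufferedReadSelectorSketch {
    private final Set<String> keysWithBufferedRead = new HashSet<>();
    private boolean madeReadProgressLastPoll = true;

    // Models unmute(): if the channel still holds buffered bytes, remember
    // it and flag progress so that the next poll does not block.
    void unmute(String channelId, boolean hasBytesBuffered) {
        if (hasBytesBuffered) {
            keysWithBufferedRead.add(channelId);
            madeReadProgressLastPoll = true;
        }
    }

    // Models a poll cycle in which nothing could be read.
    void markNoReadProgress() {
        madeReadProgressLastPoll = false;
    }

    // Assumed rule: do not block in select() when buffered data is pending
    // and the last poll made read progress.
    long effectiveTimeoutMs(long requestedTimeoutMs) {
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
        return (madeReadProgressLastPoll && dataInBuffers) ? 0L : requestedTimeoutMs;
    }

    public static void main(String[] args) {
        BufferedReadSelectorSketch selector = new BufferedReadSelectorSketch();
        selector.markNoReadProgress();
        System.out.println(selector.effectiveTimeoutMs(300)); // nothing buffered: full timeout
        selector.unmute("channel-1", true);
        System.out.println(selector.effectiveTimeoutMs(300)); // buffered data pending: no blocking
    }
}
```

Setting the flag at the point where the channel is added to `keysWithBufferedRead` keeps the cause and the effect in one place, which is why no explanatory comment should be needed.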



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in 
Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is 
distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step 
(because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the 
buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block 
if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it 
is meant to calculate elapsed time
+      receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 
1000)
+      val processTimeEnd = System.nanoTime()
+
+      // check the duration of processing the second request
+      val processTimeDuration = (processTimeEnd - processTimeStart) / 1000000.0  // in ms

Review Comment:
   Use `TimeUnit.NANOSECONDS.toMillis` for the conversion; the comment is then unnecessary.
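
A small sketch of what that could look like, written in plain Java so it runs on its own. The class name and the `elapsedMs` helper are hypothetical, and `Thread.sleep` only stands in for the `receiveRequest(...)` call in the test. Note that `toMillis` returns a `long` and truncates, unlike the floating-point division in the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the PR.
final class ElapsedTimeSketch {
    // Whole milliseconds between two System.nanoTime() readings (truncating).
    static long elapsedMs(long startNanos, long endNanos) {
        return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        long processTimeStartNanos = System.nanoTime();
        Thread.sleep(20); // stand-in for the call being timed
        long processTimeEndNanos = System.nanoTime();

        long processTimeDurationMs = elapsedMs(processTimeStartNanos, processTimeEndNanos);
        System.out.println("elapsed ms: " + processTimeDurationMs);
    }
}
```

With the unit carried in the variable names and in the `TimeUnit` call, the trailing `// in ms` comment no longer adds anything.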



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
