splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r926147113


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.

Review Comment:
   This comment is written largely in the context of the JIRA ticket. I think we can write it more in the context of the current code, e.g. replace references to `300 ms` with references to the `poll timeout`, so that the comment stays consistent if the poll timeout is changed.
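   To make the suggestion concrete, the steps in the quoted Javadoc can be phrased against the poll timeout instead of a hard-coded 300 ms. The following is a simplified, hypothetical model, not Kafka's `Selector`: it assumes `select()` gets a zero timeout only when the previous poll made read progress and some keys still have buffered data, and otherwise gets the configured poll timeout. `PollTimeoutSketch` and its methods are illustrative; only the names `keysWithBufferedRead` and `madeReadProgressLastPoll` come from the code under review.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of how a selector might pick its select() timeout.
// Assumes the rule described above; this is not Kafka's Selector implementation.
public class PollTimeoutSketch {
    private final Set<String> keysWithBufferedRead = new HashSet<>();
    private boolean madeReadProgressLastPoll = false;
    private final long pollTimeoutMs;

    public PollTimeoutSketch(long pollTimeoutMs) {
        this.pollTimeoutMs = pollTimeoutMs;
    }

    // Returns the timeout (in ms) that select() would be called with on the next poll.
    public long nextSelectTimeoutMs() {
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
        // Skip blocking only if buffered data exists AND the last poll made read progress.
        return (madeReadProgressLastPoll && dataInBuffers) ? 0L : pollTimeoutMs;
    }

    public void addBufferedKey(String key) {
        keysWithBufferedRead.add(key);
    }

    public void setMadeReadProgressLastPoll(boolean value) {
        madeReadProgressLastPoll = value;
    }

    public static void main(String[] args) {
        PollTimeoutSketch selector = new PollTimeoutSketch(300);
        // Step 3: the second request sits in a buffer, but the last poll made no read progress.
        selector.addBufferedKey("conn-1");
        System.out.println(selector.nextSelectTimeoutMs()); // 300: blocks for the full poll timeout
        // With the flag set, the buffered request is handled without waiting.
        selector.setMadeReadProgressLastPoll(true);
        System.out.println(selector.nextSelectTimeoutMs()); // 0
    }
}
```

   Written this way, the Javadoc only needs to say "blocks for the poll timeout", which stays true whatever value the timeout has.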



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)

Review Comment:
   We can just call `receiveRequest` with a relatively small timeoutOverride (something like 1s). If we don't receive the request within 1s, `receiveRequest` will throw an exception, which roughly captures what we're looking to test.
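   As a rough illustration of why a bounded receive is enough, here is a minimal sketch that assumes the request channel behaves like a blocking queue: a receive that throws when nothing arrives within the timeout turns a blocked `select()` into a test failure, with no elapsed-time bookkeeping. `BoundedReceiveSketch` and `receiveWithin` are hypothetical stand-ins for the test's `receiveRequest(channel, timeoutOverride)`; the real helper's signature and behavior may differ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a bounded receiveRequest(); not Kafka's test helper.
public class BoundedReceiveSketch {
    // Waits up to timeoutMs for a request and throws instead of returning null on timeout.
    static <T> T receiveWithin(BlockingQueue<T> channel, long timeoutMs) {
        final T request;
        try {
            request = channel.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting for a request", e);
        }
        if (request == null) {
            throw new IllegalStateException("No request received within " + timeoutMs + " ms");
        }
        return request;
    }

    public static void main(String[] args) {
        BlockingQueue<String> requestChannel = new LinkedBlockingQueue<>();
        requestChannel.add("request-2");
        // Already queued, so this returns immediately.
        System.out.println(receiveWithin(requestChannel, 1000));
        try {
            receiveWithin(requestChannel, 100); // the queue is now empty
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Since the test overrides the poll timeout to 5000 ms, a 1s bound cleanly separates a request served from the buffer from one that waited for `select()` to time out.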



##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   This is probably not the right place for this logic; we only want to set the flag if we completed a send and updated the mute state for a channel.
   
   @ijuma's comment was probably more about the naming being unintuitive: we were previously setting `madeReadProgressLastPoll` when completing a `Send`.
   
   We probably need a clarifying comment explaining why completing a `Send` counts as making read progress: there is now an eligible channel to read from that previously wasn't available.
   
   Alternatively, we can update the variable name to something that captures the scenario we're trying to cover (or add a new variable). Maybe something like `addedNewBufferedKeysLastPoll`. It's not very succinct, but I've always been terrible at variable naming.
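   One possible shape for this, sketched under assumptions: the flag is set only where completing a `Send` unmutes a channel that still has bytes buffered, and it uses the tentative name `addedNewBufferedKeysLastPoll`. `Channel`, `onSendCompleted`, and `nextSelectTimeoutMs` are hypothetical simplifications, not Kafka's `KafkaChannel` or `Selector` API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; not Kafka's Selector. Names other than keysWithBufferedRead are illustrative.
public class BufferedKeysFlagSketch {
    // Minimal stand-in for a channel.
    static final class Channel {
        final String id;
        final boolean hasBytesBuffered; // e.g. leftover decrypted bytes from an SSL read
        boolean muted = true;           // muted while its previous request is in flight

        Channel(String id, boolean hasBytesBuffered) {
            this.id = id;
            this.hasBytesBuffered = hasBytesBuffered;
        }
    }

    private final Set<String> keysWithBufferedRead = new HashSet<>();
    // Set only when a completed send makes a channel with buffered data readable again.
    private boolean addedNewBufferedKeysLastPoll = false;
    private final long pollTimeoutMs;

    BufferedKeysFlagSketch(long pollTimeoutMs) {
        this.pollTimeoutMs = pollTimeoutMs;
    }

    // Called when the response for a channel's in-flight request has been fully sent.
    void onSendCompleted(Channel channel) {
        channel.muted = false; // the channel may now read its next request
        if (channel.hasBytesBuffered) {
            // A channel that previously could not be read is now eligible, with data already buffered.
            keysWithBufferedRead.add(channel.id);
            addedNewBufferedKeysLastPoll = true;
        }
    }

    long nextSelectTimeoutMs() {
        boolean skipBlocking = addedNewBufferedKeysLastPoll && !keysWithBufferedRead.isEmpty();
        addedNewBufferedKeysLastPoll = false; // reset here for simplicity: the flag describes one poll
        return skipBlocking ? 0L : pollTimeoutMs;
    }

    public static void main(String[] args) {
        BufferedKeysFlagSketch selector = new BufferedKeysFlagSketch(300);
        selector.onSendCompleted(new Channel("conn-1", false));
        System.out.println(selector.nextSelectTimeoutMs()); // 300: nothing buffered, blocking is fine
        selector.onSendCompleted(new Channel("conn-2", true));
        System.out.println(selector.nextSelectTimeoutMs()); // 0: buffered request is read right away
    }
}
```

   Keeping the flag next to the unmute makes the reason for skipping the blocking `select()` explicit, instead of overloading `madeReadProgressLastPoll` from the write path.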



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)

Review Comment:
   There's not really any need to process the request; as long as the request layer receives it within a reasonable amount of time, we are good.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)
+      val processTimeEnd = System.nanoTime()
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // check the duration of processing the second request
+      val processTimeDuration = (processTimeEnd - processTimeStart) / 1000000.0  // in ms
+      assertTrue(processTimeDuration < selectTimeout,
+        "Time to process the second request (" + processTimeDuration + " ms) should be under " + selectTimeout + " ms")
+

Review Comment:
   Likewise, we can probably remove this code.


