Repository: kafka
Updated Branches:
  refs/heads/trunk 6bee1e9e5 -> 13deb84dc


MINOR: Null out all buffer references in RequestChannel.Request

The previous code kept two references to `Buffer` and only nulled
out one of them.

As part of this, I removed the `case` modifier from
`RequestChannel.{Request, Response}`. They don't seem to be good
matches given the types of fields they contain (mutable buffers and
opaque `Send` instances).

Also removed a couple of unused files in `kafka.network`.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Radai Rosenblatt <radai.rosenbl...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>

Closes #3596 from ijuma/release-buffer-in-request


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13deb84d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13deb84d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13deb84d

Branch: refs/heads/trunk
Commit: 13deb84dcf4b3e583997dbd05a0e25840f024375
Parents: 6bee1e9
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Thu Aug 3 14:56:20 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Aug 3 14:56:20 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/network/ConnectionConfig.scala  | 27 ---------------
 core/src/main/scala/kafka/network/Handler.scala | 35 --------------------
 .../scala/kafka/network/RequestChannel.scala    | 31 +++++++++++------
 .../main/scala/kafka/network/SocketServer.scala |  7 ++--
 .../unit/kafka/network/SocketServerTest.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  3 +-
 6 files changed, 26 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/ConnectionConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/ConnectionConfig.scala 
b/core/src/main/scala/kafka/network/ConnectionConfig.scala
deleted file mode 100644
index cde7c09..0000000
--- a/core/src/main/scala/kafka/network/ConnectionConfig.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-trait ConnectionConfig {
-  val host: String
-  val port: Int
-  val sendBufferSize: Int = -1
-  val receiveBufferSize: Int = -1
-  val tcpNoDelay = true
-  val keepAlive = false  
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/Handler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/Handler.scala 
b/core/src/main/scala/kafka/network/Handler.scala
deleted file mode 100644
index 1a7d56e..0000000
--- a/core/src/main/scala/kafka/network/Handler.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import org.apache.kafka.common.network.{NetworkReceive, Send}
-
-private[kafka] object Handler {
-  
-  /**
-   * A request handler is a function that turns an incoming 
-   * transmission into an outgoing transmission
-   */
-  type Handler = NetworkReceive => Option[Send]
-  
-  /**
-   * A handler mapping finds the right Handler function for a given request
-   */
-  type HandlerMapping = (Short, NetworkReceive) => Handler
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7553c10..4ebef7c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,9 +41,9 @@ import org.apache.log4j.Logger
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
-  val AllDone = Request(processor = 1, connectionId = "2", 
Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-    buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 
0, listenerName = new ListenerName(""),
-    securityProtocol = SecurityProtocol.PLAINTEXT)
+  val AllDone = new Request(processor = 1, connectionId = "2", 
Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
+    startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol 
= SecurityProtocol.PLAINTEXT,
+    MemoryPool.NONE, shutdownReceive)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
   private def shutdownReceive: ByteBuffer = {
@@ -57,12 +57,11 @@ object RequestChannel extends Logging {
     val sanitizedUser = QuotaId.sanitize(principal.getName)
   }
 
-  case class Request(processor: Int, connectionId: String, session: Session, 
buffer: ByteBuffer,
-                     private val memoryPool: MemoryPool, startTimeNanos: Long, 
listenerName: ListenerName, 
-                     securityProtocol: SecurityProtocol) {
+  class Request(val processor: Int, val connectionId: String, val session: 
Session, startTimeNanos: Long,
+                val listenerName: ListenerName, val securityProtocol: 
SecurityProtocol, memoryPool: MemoryPool,
+                @volatile private var buffer: ByteBuffer) {
     // These need to be volatile because the readers are in the network thread 
and the writers are in the request
     // handler threads or the purgatory threads
-    @volatile var bufferReference = buffer
     @volatile var requestDequeueTimeNanos = -1L
     @volatile var apiLocalCompleteTimeNanos = -1L
     @volatile var responseCompleteTimeNanos = -1L
@@ -204,11 +203,19 @@ object RequestChannel extends Logging {
     }
 
     def dispose(): Unit = {
-      if (bufferReference != null) {
-        memoryPool.release(bufferReference)
-        bufferReference = null
+      if (buffer != null) {
+        memoryPool.release(buffer)
+        buffer = null
       }
     }
+
+    override def toString = s"Request(processor=$processor, " +
+      s"connectionId=$connectionId, " +
+      s"session=$session, " +
+      s"listenerName=$listenerName, " +
+      s"securityProtocol=$securityProtocol, " +
+      s"buffer=$buffer)"
+
   }
 
   object Response {
@@ -227,11 +234,13 @@ object RequestChannel extends Logging {
 
   }
 
-  case class Response(request: Request, responseSend: Option[Send], 
responseAction: ResponseAction) {
+  class Response(val request: Request, val responseSend: Option[Send], val 
responseAction: ResponseAction) {
     request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
     if (request.apiLocalCompleteTimeNanos == -1L) 
request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
 
     def processor: Int = request.processor
+
+    override def toString = s"Response(request=$request, 
responseSend=$responseSend, responseAction=$responseAction)"
   }
 
   trait ResponseAction

http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index e541015..f418fdd 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -533,10 +533,9 @@ private[kafka] class Processor(val id: Int,
         val openOrClosingChannel = if (openChannel != null) openChannel else 
selector.closingChannel(receive.source)
         val session = RequestChannel.Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
 
-        val req = RequestChannel.Request(processor = id, connectionId = 
receive.source, session = session,
-          buffer = receive.payload, startTimeNanos = time.nanoseconds,
-          listenerName = listenerName, securityProtocol = securityProtocol,
-          memoryPool = memoryPool)
+        val req = new RequestChannel.Request(processor = id, connectionId = 
receive.source, session = session,
+          startTimeNanos = time.nanoseconds, listenerName = listenerName, 
securityProtocol = securityProtocol,
+          memoryPool, receive.payload)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index ed35269..4d969cf 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -180,7 +180,7 @@ class SocketServerTest extends JUnitSuite {
     for (_ <- 0 until 10) {
       val request = server.requestChannel.receiveRequest(2000)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(RequestChannel.Response(request, 
None, RequestChannel.NoOpAction))
+      server.requestChannel.sendResponse(new RequestChannel.Response(request, 
None, RequestChannel.NoOpAction))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 38d4bb3..6cc3ede 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -396,7 +396,8 @@ class KafkaApisTest {
     val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
     val buffer = request.serialize(header)
     val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
-    (request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 
0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
+    (request, new RequestChannel.Request(1, "1", session, 0, new 
ListenerName(""), SecurityProtocol.PLAINTEXT,
+      MemoryPool.NONE, buffer))
   }
 
   private def readResponse(api: ApiKeys, request: AbstractRequest, 
capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {

Reply via email to