Repository: kafka Updated Branches: refs/heads/trunk 6bee1e9e5 -> 13deb84dc
MINOR: Null out all buffer references in RequestChannel.Request The previous code kept two references to `Buffer` and only nulled out one of them. As part of this, I removed the `case` modifier from `RequestChannel.{Request, Response}`. They don't seem to be good matches given the types of fields they contain (mutable buffers and opaque `Send` instances). Also removed a couple of unused files in `kafka.network`. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Radai Rosenblatt <radai.rosenbl...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #3596 from ijuma/release-buffer-in-request Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13deb84d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13deb84d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13deb84d Branch: refs/heads/trunk Commit: 13deb84dcf4b3e583997dbd05a0e25840f024375 Parents: 6bee1e9 Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Aug 3 14:56:20 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Aug 3 14:56:20 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/network/ConnectionConfig.scala | 27 --------------- core/src/main/scala/kafka/network/Handler.scala | 35 -------------------- .../scala/kafka/network/RequestChannel.scala | 31 +++++++++++------ .../main/scala/kafka/network/SocketServer.scala | 7 ++-- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 3 +- 6 files changed, 26 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/ConnectionConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/ConnectionConfig.scala b/core/src/main/scala/kafka/network/ConnectionConfig.scala deleted file mode 100644 index cde7c09..0000000 --- a/core/src/main/scala/kafka/network/ConnectionConfig.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -trait ConnectionConfig { - val host: String - val port: Int - val sendBufferSize: Int = -1 - val receiveBufferSize: Int = -1 - val tcpNoDelay = true - val keepAlive = false -} http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/Handler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala deleted file mode 100644 index 1a7d56e..0000000 --- a/core/src/main/scala/kafka/network/Handler.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import org.apache.kafka.common.network.{NetworkReceive, Send} - -private[kafka] object Handler { - - /** - * A request handler is a function that turns an incoming - * transmission into an outgoing transmission - */ - type Handler = NetworkReceive => Option[Send] - - /** - * A handler mapping finds the right Handler function for a given request - */ - type HandlerMapping = (Short, NetworkReceive) => Handler - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7553c10..4ebef7c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -41,9 +41,9 @@ import org.apache.log4j.Logger import scala.reflect.ClassTag object RequestChannel extends Logging { - val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), - buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 0, listenerName = new ListenerName(""), - securityProtocol = SecurityProtocol.PLAINTEXT) + val AllDone = new Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), + startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol = SecurityProtocol.PLAINTEXT, + MemoryPool.NONE, shutdownReceive) private val requestLogger = Logger.getLogger("kafka.request.logger") private def shutdownReceive: ByteBuffer = { @@ -57,12 +57,11 @@ object RequestChannel extends Logging { val sanitizedUser = QuotaId.sanitize(principal.getName) } - case class Request(processor: Int, connectionId: String, session: Session, buffer: ByteBuffer, - private val memoryPool: MemoryPool, startTimeNanos: Long, listenerName: ListenerName, - securityProtocol: SecurityProtocol) { + class Request(val processor: Int, val connectionId: String, val session: Session, startTimeNanos: Long, + val listenerName: ListenerName, val securityProtocol: SecurityProtocol, memoryPool: MemoryPool, + @volatile private var buffer: ByteBuffer) { // These need to be volatile because the readers are in the network thread and the writers are in the request // handler threads or the purgatory threads - @volatile var bufferReference = buffer @volatile var requestDequeueTimeNanos = -1L @volatile var apiLocalCompleteTimeNanos = -1L @volatile var responseCompleteTimeNanos = -1L @@ -204,11 +203,19 @@ object RequestChannel extends Logging { } def dispose(): Unit = { - if (bufferReference != null) { - memoryPool.release(bufferReference) - bufferReference = null + if (buffer != null) { + memoryPool.release(buffer) + buffer = null } } + + override def toString = s"Request(processor=$processor, " + + s"connectionId=$connectionId, " + + s"session=$session, " + + s"listenerName=$listenerName, " + + s"securityProtocol=$securityProtocol, " + + s"buffer=$buffer)" + } object Response { @@ -227,11 +234,13 @@ object RequestChannel extends Logging { } - case class Response(request: Request, responseSend: Option[Send], responseAction: ResponseAction) { + class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction) { request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds def processor: Int = request.processor + + override def toString = s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction)" } trait ResponseAction http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index e541015..f418fdd 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -533,10 +533,9 @@ private[kafka] class Processor(val id: Int, val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress) - val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, - buffer = receive.payload, startTimeNanos = time.nanoseconds, - listenerName = listenerName, securityProtocol = securityProtocol, - memoryPool = memoryPool) + val req = new RequestChannel.Request(processor = id, connectionId = receive.source, session = session, + startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol, + memoryPool, receive.payload) requestChannel.sendRequest(req) selector.mute(receive.source) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index ed35269..4d969cf 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -180,7 +180,7 @@ class SocketServerTest extends JUnitSuite { for (_ <- 0 until 10) { val request = server.requestChannel.receiveRequest(2000) assertNotNull("receiveRequest timed out", request) - server.requestChannel.sendResponse(RequestChannel.Response(request, None, RequestChannel.NoOpAction)) + server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/13deb84d/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 38d4bb3..6cc3ede 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -396,7 +396,8 @@ class KafkaApisTest { val header = new RequestHeader(builder.apiKey.id, request.version, "", 0) val buffer = request.serialize(header) val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost) - (request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT)) + (request, new RequestChannel.Request(1, "1", session, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT, + MemoryPool.NONE, buffer)) } private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {