Repository: kafka
Updated Branches:
  refs/heads/0.9.0 51f981b5b -> c2f8a53e7
KAFKA-3773; Remove from inflightResponses on client disconnect to prevent memory leak

Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1452 from hachikuji/hotfix-processor-memleak

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2f8a53e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2f8a53e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2f8a53e

Branch: refs/heads/0.9.0
Commit: c2f8a53e708ad8947be6d8320c9261bfadc877ee
Parents: 51f981b
Author: Jason Gustafson <[email protected]>
Authored: Wed Jun 1 01:00:21 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed Jun 1 01:00:21 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/network/SocketServer.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2f8a53e/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 69a9569..2d098b0 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -446,6 +446,8 @@ private[kafka] class Processor(val id: Int,
           val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
             throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
           }.remoteHost
+
+          inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
           // the channel has been closed by the selector but the quotas still need to be updated
           connectionQuotas.dec(InetAddress.getByName(remoteHost))
         }
