Repository: kafka Updated Branches: refs/heads/0.9.0 6b8515817 -> 9059bbfff
KAFKA-3122; Fix memory leak in `Sender.completeBatch` on TOPIC_AUTHORIZATION_FAILED. Also fix missing call to `sensors.recordErrors` on this error. Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson, Guozhang Wang Closes #791 from ijuma/fix-producer-memory-leak-on-authorization-exception (cherry picked from commit e4ef8e6640a266da1d9135282ae41d18f44f8025) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9059bbff Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9059bbff Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9059bbff Branch: refs/heads/0.9.0 Commit: 9059bbfff917b4bebbb06f8c8a01cce3d8230830 Parents: 6b85158 Author: Ismael Juma <[email protected]> Authored: Tue Jan 19 22:50:24 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Jan 19 22:50:33 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/clients/producer/internals/Sender.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9059bbff/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index cada626..1201e94 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -289,11 +289,14 @@ public class Sender implements Runnable { error); this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); - } else if (error == 
Errors.TOPIC_AUTHORIZATION_FAILED) { - batch.done(baseOffset, new TopicAuthorizationException(batch.topicPartition.topic())); } else { + RuntimeException exception; + if (error == Errors.TOPIC_AUTHORIZATION_FAILED) + exception = new TopicAuthorizationException(batch.topicPartition.topic()); + else + exception = error.exception(); // tell the user the result of their request - batch.done(baseOffset, error.exception()); + batch.done(baseOffset, exception); this.accumulator.deallocate(batch); if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
