Repository: kafka Updated Branches: refs/heads/trunk 40948a33c -> f452c426b
kafka-1673; potential java.lang.IllegalStateException in BufferPool.allocate(); patched by Jun Rao; reviewed by Jay Kreps Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f452c426 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f452c426 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f452c426 Branch: refs/heads/trunk Commit: f452c426bb562ffc5cc2db38665591d157d5c80a Parents: 40948a3 Author: Jun Rao <[email protected]> Authored: Mon Oct 6 07:45:23 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Mon Oct 6 07:45:23 2014 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/internals/BufferPool.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f452c426/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 169a656..aa91e14 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -125,14 +125,11 @@ public final class BufferPool { // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { - try { - long startWait = time.nanoseconds(); - moreMemory.await(300, TimeUnit.MILLISECONDS); - long endWait = time.nanoseconds(); - this.waitTime.record(endWait - startWait, time.milliseconds()); - } catch (InterruptedException e) { - // This should never happen. Just let it go. - } + long startWait = time.nanoseconds(); + moreMemory.await(); + long endWait = time.nanoseconds(); + this.waitTime.record(endWait - startWait, time.milliseconds()); + // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
