Repository: kafka
Updated Branches:
  refs/heads/trunk 58f9d7cf8 -> 717eea835
KAFKA-3655; awaitFlushCompletion() in RecordAccumulator should always decrement flushesInProgress count

Author: Chen Zhu <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1315 from zhuchen1018/KAFKA-3655

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/717eea83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/717eea83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/717eea83

Branch: refs/heads/trunk
Commit: 717eea8350feb670e8ba3dd3505c708a8a52de71
Parents: 58f9d7c
Author: Chen Zhu <[email protected]>
Authored: Fri May 6 21:57:53 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Fri May 6 21:57:53 2016 +0100

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 13 +++++---
 .../internals/RecordAccumulatorTest.java        | 31 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/717eea83/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1766609..5339096 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -430,8 +430,10 @@ public final class RecordAccumulator {
 
     /**
      * Are there any threads currently waiting on a flush?
+     *
+     * package private for test
      */
-    private boolean flushInProgress() {
+    boolean flushInProgress() {
         return flushesInProgress.get() > 0;
     }
 
@@ -453,9 +455,12 @@ public final class RecordAccumulator {
      * Mark all partitions as ready to send and block until the send is complete
      */
     public void awaitFlushCompletion() throws InterruptedException {
-        for (RecordBatch batch: this.incomplete.all())
-            batch.produceFuture.await();
-        this.flushesInProgress.decrementAndGet();
+        try {
+            for (RecordBatch batch : this.incomplete.all())
+                batch.produceFuture.await();
+        } finally {
+            this.flushesInProgress.decrementAndGet();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/717eea83/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index a39d2e8..b3a5a04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 
 import org.junit.After;
 import org.junit.Test;
@@ -57,6 +59,7 @@ public class RecordAccumulatorTest {
     private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
     private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
     private MockTime time = new MockTime();
+    private SystemTime systemTime = new SystemTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
@@ -272,6 +275,34 @@ public class RecordAccumulatorTest {
         assertFalse(accum.hasUnsent());
     }
 
+
+    private void delayedInterrupt(final Thread thread, final long delayMs) {
+        Thread t = new Thread() {
+            public void run() {
+                systemTime.sleep(delayMs);
+                thread.interrupt();
+            }
+        };
+        t.start();
+    }
+
+    @Test
+    public void testAwaitFlushComplete() throws Exception {
+        RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time);
+        accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
+
+        accum.beginFlush();
+        assertTrue(accum.flushInProgress());
+        delayedInterrupt(Thread.currentThread(), 1000L);
+        try {
+            accum.awaitFlushCompletion();
+            fail("awaitFlushCompletion should throw InterruptException");
+        } catch (InterruptedException e) {
+            assertFalse("flushInProgress count should be decremented even if thread is interrupted", accum.flushInProgress());
+        }
+    }
+
+
     @Test
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
