Repository: kafka Updated Branches: refs/heads/trunk 78fa20eb5 -> 1fbe445dd
KAFKA-3388; Fix expiration of batches sitting in the accumulator Author: Jiangjie Qin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #1056 from becketqin/KAFKA-3388 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fbe445d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fbe445d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fbe445d Branch: refs/heads/trunk Commit: 1fbe445dde71df0023a978c5e54dd229d3d23e1b Parents: 78fa20e Author: Jiangjie Qin <[email protected]> Authored: Sat Mar 26 09:22:59 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Sat Mar 26 09:22:59 2016 -0700 ---------------------------------------------------------------------- .../producer/internals/RecordAccumulator.java | 35 ++++++---- .../clients/producer/internals/RecordBatch.java | 19 ++++-- .../internals/RecordAccumulatorTest.java | 67 +++++++++++++++++--- 3 files changed, 94 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1fbe445d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index beaa832..915c4d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,6 +13,7 @@ package org.apache.kafka.clients.producer.internals; import java.util.Iterator; + import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; @@ -217,19 +218,27 @@ 
public final class RecordAccumulator { int count = 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { Deque<RecordBatch> dq = entry.getValue(); - synchronized (dq) { - // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut - Iterator<RecordBatch> batchIterator = dq.iterator(); - while (batchIterator.hasNext()) { - RecordBatch batch = batchIterator.next(); - // check if the batch is expired - if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { - expiredBatches.add(batch); - count++; - batchIterator.remove(); - deallocate(batch); - } else { - if (!batch.inRetry()) { + // We only check whether a batch should be expired if the partition does not have a batch in flight. + // This prevents later batches from being expired while an earlier batch is still in progress. + // This protection only takes effect when the user sets max.in.flight.requests.per.connection=1. + // Otherwise the expiration order is not guaranteed. + TopicPartition tp = entry.getKey(); + if (!muted.contains(tp)) { + synchronized (dq) { + // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut + RecordBatch lastBatch = dq.peekLast(); + Iterator<RecordBatch> batchIterator = dq.iterator(); + while (batchIterator.hasNext()) { + RecordBatch batch = batchIterator.next(); + boolean isFull = batch != lastBatch || batch.records.isFull(); + // check if the batch is expired + if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { + expiredBatches.add(batch); + count++; + batchIterator.remove(); + deallocate(batch); + } else { + // Stop at the first batch that has not expired.
break; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1fbe445d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index eb7bbb3..e6cd68f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -134,14 +134,23 @@ public final class RecordBatch { } /** - * Expire the batch that is ready but is sitting in accumulator for more than requestTimeout due to metadata being unavailable. - * We need to explicitly check if the record is full or linger time is met because the accumulator's partition may not be ready - * if the leader is unavailable. + * A batch whose metadata is not available should be expired if one of the following is true: + * <ol> + * <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has been reached). + * <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
+ * </ol> */ - public boolean maybeExpire(int requestTimeout, long now, long lingerMs) { + public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { boolean expire = false; - if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) { + + if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) + expire = true; + else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) expire = true; + else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) + expire = true; + + if (expire) { this.records.close(); this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1fbe445d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 3660272..904aa73 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -297,22 +297,71 @@ public class RecordAccumulatorTest { @Test public void testExpiredBatches() throws InterruptedException { - long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time); + long retryBackoffMs = 100L; + long lingerMs = 3000L; + int requestTimeout = 60; + + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 
CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = 1024 / msgSize; + + // Test batches not in retry for (int i = 0; i < appends; i++) { accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); } - time.sleep(2000); - accum.ready(cluster, now); + // Make the batches ready due to batch full accum.append(tp1, 0L, key, value, null, 0); Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet()); - now = time.milliseconds(); - List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now); - assertEquals(1, expiredBatches.size()); + // Advance the clock to expire the batch. 
+ time.sleep(requestTimeout + 1); + accum.mutePartition(tp1); + List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired", 1, expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + + // Advance the clock to make the next batch ready due to linger.ms + time.sleep(lingerMs); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + time.sleep(requestTimeout + 1); + + accum.mutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + + // Test batches in retry. 
+ // Create a retried batch + accum.append(tp1, 0L, key, value, null, 0); + time.sleep(lingerMs); + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1); + time.sleep(1000L); + accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); + + // test expiration. + time.sleep(requestTimeout + retryBackoffMs); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired.", 0, expiredBatches.size()); + time.sleep(1L); + + accum.mutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); } @Test
