Repository: kafka
Updated Branches:
  refs/heads/trunk 78fa20eb5 -> 1fbe445dd


KAFKA-3388; Fix expiration of batches sitting in the accumulator

Author: Jiangjie Qin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes #1056 from becketqin/KAFKA-3388


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fbe445d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fbe445d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fbe445d

Branch: refs/heads/trunk
Commit: 1fbe445dde71df0023a978c5e54dd229d3d23e1b
Parents: 78fa20e
Author: Jiangjie Qin <[email protected]>
Authored: Sat Mar 26 09:22:59 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Sat Mar 26 09:22:59 2016 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 35 ++++++----
 .../clients/producer/internals/RecordBatch.java | 19 ++++--
 .../internals/RecordAccumulatorTest.java        | 67 +++++++++++++++++---
 3 files changed, 94 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1fbe445d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index beaa832..915c4d3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.util.Iterator;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -217,19 +218,27 @@ public final class RecordAccumulator {
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : 
this.batches.entrySet()) {
             Deque<RecordBatch> dq = entry.getValue();
-            synchronized (dq) {
-                // iterate over the batches and expire them if they have 
stayed in accumulator for more than requestTimeOut
-                Iterator<RecordBatch> batchIterator = dq.iterator();
-                while (batchIterator.hasNext()) {
-                    RecordBatch batch = batchIterator.next();
-                    // check if the batch is expired
-                    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) 
{
-                        expiredBatches.add(batch);
-                        count++;
-                        batchIterator.remove();
-                        deallocate(batch);
-                    } else {
-                        if (!batch.inRetry()) {
+            // We only check whether the batch should be expired if the partition 
does not have a batch in flight.
+            // This is to avoid later batches getting expired when an earlier 
batch is still in progress.
+            // This protection only takes effect when the user sets 
max.in.flight.requests.per.connection=1.
+            // Otherwise the expiration order is not guaranteed.
+            TopicPartition tp = entry.getKey();
+            if (!muted.contains(tp)) {
+                synchronized (dq) {
+                    // iterate over the batches and expire them if they have 
stayed in accumulator for more than requestTimeOut
+                    RecordBatch lastBatch = dq.peekLast();
+                    Iterator<RecordBatch> batchIterator = dq.iterator();
+                    while (batchIterator.hasNext()) {
+                        RecordBatch batch = batchIterator.next();
+                        boolean isFull = batch != lastBatch || 
batch.records.isFull();
+                        // check if the batch is expired
+                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, 
now, this.lingerMs, isFull)) {
+                            expiredBatches.add(batch);
+                            count++;
+                            batchIterator.remove();
+                            deallocate(batch);
+                        } else {
+                            // Stop at the first batch that has not expired.
                             break;
                         }
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fbe445d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index eb7bbb3..e6cd68f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -134,14 +134,23 @@ public final class RecordBatch {
     }
 
     /**
-     * Expire the batch that is ready but is sitting in accumulator for more 
than requestTimeout due to metadata being unavailable.
-     * We need to explicitly check if the record is full or linger time is met 
because the accumulator's partition may not be ready
-     * if the leader is unavailable.
+     * A batch whose metadata is not available should be expired if one of the 
following is true:
+     * <ol>
+     *     <li> the batch is not in retry AND request timeout has elapsed 
after it is ready (full or linger.ms has been reached).
+     *     <li> the batch is in retry AND request timeout has elapsed after 
the backoff period ended.
+     * </ol>
      */
-    public boolean maybeExpire(int requestTimeout, long now, long lingerMs) {
+    public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long 
now, long lingerMs, boolean isFull) {
         boolean expire = false;
-        if ((this.records.isFull() && requestTimeout < (now - 
this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + 
lingerMs))) {
+
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - 
this.lastAppendTime))
+            expire = true;
+        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs 
+ lingerMs)))
             expire = true;
+        else if (this.inRetry() && requestTimeoutMs < (now - 
(this.lastAttemptMs + retryBackoffMs)))
+            expire = true;
+
+        if (expire) {
             this.records.close();
             this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch 
containing " + recordCount + " record(s) expired due to timeout while 
requesting metadata from brokers for " + topicPartition));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fbe445d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3660272..904aa73 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -297,22 +297,71 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testExpiredBatches() throws InterruptedException {
-        long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 
CompressionType.NONE, 10, 100L, metrics, time);
+        long retryBackoffMs = 100L;
+        long lingerMs = 3000L;
+        int requestTimeout = 60;
+
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 
CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
         int appends = 1024 / msgSize;
+
+        // Test batches not in retry
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
-            assertEquals("No partitions should be ready.", 0, 
accum.ready(cluster, now).readyNodes.size());
+            assertEquals("No partitions should be ready.", 0, 
accum.ready(cluster, time.milliseconds()).readyNodes.size());
         }
-        time.sleep(2000);
-        accum.ready(cluster, now);
+        // Make the batches ready due to batch full
         accum.append(tp1, 0L, key, value, null, 0);
         Set<Node> readyNodes = accum.ready(cluster, 
time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), readyNodes);
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new 
ArrayList<PartitionInfo>(), Collections.<String>emptySet());
-        now = time.milliseconds();
-        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, 
cluster, now);
-        assertEquals(1, expiredBatches.size());
+        // Advance the clock to expire the batch.
+        time.sleep(requestTimeout + 1);
+        accum.mutePartition(tp1);
+        List<RecordBatch> expiredBatches = 
accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should not be expired when the partition is 
muted", 0, expiredBatches.size());
+
+        accum.unmutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, 
time.milliseconds());
+        assertEquals("The batch should be expired", 1, expiredBatches.size());
+        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, 
time.milliseconds()).readyNodes.size());
+
+        // Advance the clock to make the next batch ready due to linger.ms
+        time.sleep(lingerMs);
+        assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), readyNodes);
+        time.sleep(requestTimeout + 1);
+
+        accum.mutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, 
time.milliseconds());
+        assertEquals("The batch should not be expired when metadata is still 
available and partition is muted", 0, expiredBatches.size());
+
+        accum.unmutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, 
time.milliseconds());
+        assertEquals("The batch should be expired when the partition is not 
muted", 1, expiredBatches.size());
+        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, 
time.milliseconds()).readyNodes.size());
+
+        // Test batches in retry.
+        // Create a retried batch
+        accum.append(tp1, 0L, key, value, null, 0);
+        time.sleep(lingerMs);
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), readyNodes);
+        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, 
readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("There should be only one batch.", 
drained.get(node1.id()).size(), 1);
+        time.sleep(1000L);
+        accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
+
+        // test expiration.
+        time.sleep(requestTimeout + retryBackoffMs);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, 
time.milliseconds());
+        assertEquals("The batch should not be expired.", 0, 
expiredBatches.size());
+        time.sleep(1L);
+
+        accum.mutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, 
time.milliseconds());
+        assertEquals("The batch should not be expired when the partition is 
muted", 0, expiredBatches.size());
+
+        accum.unmutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, 
time.milliseconds());
+        assertEquals("The batch should be expired when the partition is not 
muted.", 1, expiredBatches.size());
     }
 
     @Test

Reply via email to