Repository: kafka
Updated Branches:
  refs/heads/trunk 1b964396b -> aef6927a4


KAFKA-4557; Handle Producer.send correctly in expiry callbacks

When iterating deque for expiring record batches, delay the
invocation of the callback and deallocation until iteration is
complete since callbacks invoked during expiry may send more
records, modifying the deque, resulting in a
ConcurrentModificationException in the iterator.

Author: Rajini Sivaram <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>

Closes #2449 from rajinisivaram/KAFKA-4557


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aef6927a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aef6927a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aef6927a

Branch: refs/heads/trunk
Commit: aef6927a42cde193dd6acc36cd4d9f32167da622
Parents: 1b96439
Author: Rajini Sivaram <[email protected]>
Authored: Fri Jan 27 23:26:09 2017 +0000
Committer: Ismael Juma <[email protected]>
Committed: Fri Jan 27 23:26:24 2017 +0000

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 13 ++++--
 .../clients/producer/internals/RecordBatch.java | 48 ++++++++++++--------
 .../internals/RecordAccumulatorTest.java        | 45 ++++++++++++++++++
 3 files changed, 85 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aef6927a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 06d39ec..d3ae89e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -243,12 +243,14 @@ public final class RecordAccumulator {
                     while (batchIterator.hasNext()) {
                         RecordBatch batch = batchIterator.next();
                         boolean isFull = batch != lastBatch || batch.isFull();
-                        // check if the batch is expired
+                        // Check if the batch has expired. Expired batches are 
closed by maybeExpire, but callbacks
+                        // are invoked after completing the iterations, since 
sends invoked from callbacks
+                        // may append more batches to the deque being 
iterated. The batch is deallocated after
+                        // callbacks are invoked.
                         if (batch.maybeExpire(requestTimeout, retryBackoffMs, 
now, this.lingerMs, isFull)) {
                             expiredBatches.add(batch);
                             count++;
                             batchIterator.remove();
-                            deallocate(batch);
                         } else {
                             // Stop at the first batch that has not expired.
                             break;
@@ -257,8 +259,13 @@ public final class RecordAccumulator {
                 }
             }
         }
-        if (!expiredBatches.isEmpty())
+        if (!expiredBatches.isEmpty()) {
             log.trace("Expired {} batches in accumulator", count);
+            for (RecordBatch batch : expiredBatches) {
+                batch.expirationDone();
+                deallocate(batch);
+            }
+        }
 
         return expiredBatches;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aef6927a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index c8eddd5..6346fe9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A batch of records that is or will be sent.
@@ -48,6 +49,8 @@ public final class RecordBatch {
     long drainedMs;
     long lastAttemptMs;
     long lastAppendTime;
+    private String expiryErrorMessage;
+    private AtomicBoolean completed;
     private boolean retry;
 
     public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, 
long now) {
@@ -57,6 +60,7 @@ public final class RecordBatch {
         this.topicPartition = tp;
         this.lastAppendTime = createdMs;
         this.produceFuture = new ProduceRequestResult(topicPartition);
+        this.completed = new AtomicBoolean();
     }
 
     /**
@@ -93,6 +97,9 @@ public final class RecordBatch {
         log.trace("Produced messages to topic-partition {} with base offset 
offset {} and error: {}.",
                   topicPartition, baseOffset, exception);
 
+        if (completed.getAndSet(true))
+            throw new IllegalStateException("Batch has already been 
completed");
+
         // Set the future before invoking the callbacks as we rely on its 
state for the `onCompletion` call
         produceFuture.set(baseOffset, logAppendTime, exception);
 
@@ -137,29 +144,34 @@ public final class RecordBatch {
      *     <li> the batch is not in retry AND request timeout has elapsed 
after it is ready (full or linger.ms has reached).
      *     <li> the batch is in retry AND request timeout has elapsed after 
the backoff period ended.
      * </ol>
+     * This method closes this batch and sets {@code expiryErrorMessage} if 
the batch has timed out.
+     * {@link #expirationDone()} must be invoked to complete the produce 
future and invoke callbacks.
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long 
now, long lingerMs, boolean isFull) {
-        boolean expire = false;
-        String errorMessage = null;
-
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - 
this.lastAppendTime)) {
-            expire = true;
-            errorMessage = (now - this.lastAppendTime) + " ms has passed since 
last append";
-        } else if (!this.inRetry() && requestTimeoutMs < (now - 
(this.createdMs + lingerMs))) {
-            expire = true;
-            errorMessage = (now - (this.createdMs + lingerMs)) + " ms has 
passed since batch creation plus linger time";
-        } else if (this.inRetry() && requestTimeoutMs < (now - 
(this.lastAttemptMs + retryBackoffMs))) {
-            expire = true;
-            errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " 
ms has passed since last attempt plus backoff time";
-        }
 
-        if (expire) {
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - 
this.lastAppendTime))
+            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed 
since last append";
+        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs 
+ lingerMs)))
+            expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms 
has passed since batch creation plus linger time";
+        else if (this.inRetry() && requestTimeoutMs < (now - 
(this.lastAttemptMs + retryBackoffMs)))
+            expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) 
+ " ms has passed since last attempt plus backoff time";
+
+        boolean expired = expiryErrorMessage != null;
+        if (expired)
             close();
-            this.done(-1L, Record.NO_TIMESTAMP,
-                      new TimeoutException("Expiring " + recordCount + " 
record(s) for " + topicPartition + " due to " + errorMessage));
-        }
+        return expired;
+    }
 
-        return expire;
+    /**
+     * Completes the produce future with timeout exception and invokes 
callbacks.
+     * This method should be invoked only if {@link #maybeExpire(int, long, 
long, long, boolean)}
+     * returned true.
+     */
+    void expirationDone() {
+        if (expiryErrorMessage == null)
+            throw new IllegalStateException("Batch has not expired");
+        this.done(-1L, Record.NO_TIMESTAMP,
+                  new TimeoutException("Expiring " + recordCount + " record(s) 
for " + topicPartition + ": " + expiryErrorMessage));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/aef6927a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 04e1411..f8bb1e9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -18,6 +18,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LogEntry;
@@ -38,10 +39,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -410,6 +413,48 @@ public class RecordAccumulatorTest {
     }
 
     @Test
+    public void testAppendInExpiryCallback() throws InterruptedException {
+        long retryBackoffMs = 100L;
+        long lingerMs = 3000L;
+        int requestTimeout = 60;
+        int messagesPerBatch = 1024 / msgSize;
+
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 
CompressionType.NONE, lingerMs,
+                retryBackoffMs, metrics, time);
+        final AtomicInteger expiryCallbackCount = new AtomicInteger();
+        final AtomicReference<Exception> unexpectedException = new 
AtomicReference<Exception>();
+        Callback callback = new Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception 
exception) {
+                if (exception instanceof TimeoutException) {
+                    expiryCallbackCount.incrementAndGet();
+                    try {
+                        accum.append(tp1, 0L, key, value, null, 
maxBlockTimeMs);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Unexpected interruption", 
e);
+                    }
+                } else if (exception != null)
+                    unexpectedException.compareAndSet(null, exception);
+            }
+        };
+
+        for (int i = 0; i < messagesPerBatch + 1; i++)
+            accum.append(tp1, 0L, key, value, callback, maxBlockTimeMs);
+
+        assertEquals(2, accum.batches().get(tp1).size());
+        assertTrue("First batch not full", 
accum.batches().get(tp1).peekFirst().isFull());
+
+        // Advance the clock to expire the first batch.
+        time.sleep(requestTimeout + 1);
+        List<RecordBatch> expiredBatches = 
accum.abortExpiredBatches(requestTimeout, time.milliseconds());
+        assertEquals("The batch was not expired", 1, expiredBatches.size());
+        assertEquals("Callbacks not invoked for expiry", messagesPerBatch, 
expiryCallbackCount.get());
+        assertNull("Unexpected exception", unexpectedException.get());
+        assertEquals("Some messages not appended from expiry callbacks", 2, 
accum.batches().get(tp1).size());
+        assertTrue("First batch not full after expiry callbacks with appends", 
accum.batches().get(tp1).peekFirst().isFull());
+    }
+
+    @Test
     public void testMutedPartitions() throws InterruptedException {
         long now = time.milliseconds();
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 
CompressionType.NONE, 10, 100L, metrics, time);

Reply via email to