Repository: kafka Updated Branches: refs/heads/trunk 1b964396b -> aef6927a4
KAFKA-4557; Handle Producer.send correctly in expiry callbacks When iterating deque for expiring record batches, delay the invocation of the callback and deallocation until iteration is complete since callbacks invoked during expiry may send more records, modifying the deque, resulting in a ConcurrentModificationException in the iterator. Author: Rajini Sivaram <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #2449 from rajinisivaram/KAFKA-4557 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aef6927a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aef6927a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aef6927a Branch: refs/heads/trunk Commit: aef6927a42cde193dd6acc36cd4d9f32167da622 Parents: 1b96439 Author: Rajini Sivaram <[email protected]> Authored: Fri Jan 27 23:26:09 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Fri Jan 27 23:26:24 2017 +0000 ---------------------------------------------------------------------- .../producer/internals/RecordAccumulator.java | 13 ++++-- .../clients/producer/internals/RecordBatch.java | 48 ++++++++++++-------- .../internals/RecordAccumulatorTest.java | 45 ++++++++++++++++++ 3 files changed, 85 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aef6927a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 06d39ec..d3ae89e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -243,12 +243,14 @@ public final class RecordAccumulator { while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); boolean isFull = batch != lastBatch || batch.isFull(); - // check if the batch is expired + // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks + // are invoked after completing the iterations, since sends invoked from callbacks + // may append more batches to the deque being iterated. The batch is deallocated after + // callbacks are invoked. if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { expiredBatches.add(batch); count++; batchIterator.remove(); - deallocate(batch); } else { // Stop at the first batch that has not expired. break; @@ -257,8 +259,13 @@ public final class RecordAccumulator { } } } - if (!expiredBatches.isEmpty()) + if (!expiredBatches.isEmpty()) { log.trace("Expired {} batches in accumulator", count); + for (RecordBatch batch : expiredBatches) { + batch.expirationDone(); + deallocate(batch); + } + } return expiredBatches; } http://git-wip-us.apache.org/repos/asf/kafka/blob/aef6927a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index c8eddd5..6346fe9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * A batch of records that is or will be sent. 
@@ -48,6 +49,8 @@ public final class RecordBatch { long drainedMs; long lastAttemptMs; long lastAppendTime; + private String expiryErrorMessage; + private AtomicBoolean completed; private boolean retry; public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) { @@ -57,6 +60,7 @@ public final class RecordBatch { this.topicPartition = tp; this.lastAppendTime = createdMs; this.produceFuture = new ProduceRequestResult(topicPartition); + this.completed = new AtomicBoolean(); } /** @@ -93,6 +97,9 @@ public final class RecordBatch { log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception); + if (completed.getAndSet(true)) + throw new IllegalStateException("Batch has already been completed"); + // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call produceFuture.set(baseOffset, logAppendTime, exception); @@ -137,29 +144,34 @@ public final class RecordBatch { * <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached). * <li> the batch is in retry AND request timeout has elapsed after the backoff period ended. * </ol> + * This method closes this batch and sets {@code expiryErrorMessage} if the batch has timed out. + * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks. 
*/ public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { - boolean expire = false; - String errorMessage = null; - - if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) { - expire = true; - errorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; - } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) { - expire = true; - errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time"; - } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) { - expire = true; - errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time"; - } - if (expire) { + if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) + expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; + else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) + expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time"; + else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) + expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time"; + + boolean expired = expiryErrorMessage != null; + if (expired) close(); - this.done(-1L, Record.NO_TIMESTAMP, - new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage)); - } + return expired; + } - return expire; + /** + * Completes the produce future with timeout exception and invokes callbacks. + * This method should be invoked only if {@link #maybeExpire(int, long, long, long, boolean)} + * returned true. 
+ */ + void expirationDone() { + if (expiryErrorMessage == null) + throw new IllegalStateException("Batch has not expired"); + this.done(-1L, Record.NO_TIMESTAMP, + new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage)); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/aef6927a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 04e1411..f8bb1e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.LogEntry; @@ -38,10 +39,12 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -410,6 +413,48 @@ public class RecordAccumulatorTest { } @Test + public void testAppendInExpiryCallback() throws InterruptedException { + long retryBackoffMs = 100L; + long lingerMs = 3000L; + int requestTimeout = 60; + 
int messagesPerBatch = 1024 / msgSize; + + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, + retryBackoffMs, metrics, time); + final AtomicInteger expiryCallbackCount = new AtomicInteger(); + final AtomicReference<Exception> unexpectedException = new AtomicReference<Exception>(); + Callback callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception instanceof TimeoutException) { + expiryCallbackCount.incrementAndGet(); + try { + accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); + } catch (InterruptedException e) { + throw new RuntimeException("Unexpected interruption", e); + } + } else if (exception != null) + unexpectedException.compareAndSet(null, exception); + } + }; + + for (int i = 0; i < messagesPerBatch + 1; i++) + accum.append(tp1, 0L, key, value, callback, maxBlockTimeMs); + + assertEquals(2, accum.batches().get(tp1).size()); + assertTrue("First batch not full", accum.batches().get(tp1).peekFirst().isFull()); + + // Advance the clock to expire the first batch. + time.sleep(requestTimeout + 1); + List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + assertEquals("The batch was not expired", 1, expiredBatches.size()); + assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get()); + assertNull("Unexpected exception", unexpectedException.get()); + assertEquals("Some messages not appended from expiry callbacks", 2, accum.batches().get(tp1).size()); + assertTrue("First batch not full after expiry callbacks with appends", accum.batches().get(tp1).peekFirst().isFull()); + } + + @Test public void testMutedPartitions() throws InterruptedException { long now = time.milliseconds(); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
