Repository: kafka
Updated Branches:
  refs/heads/0.11.0 076eefcc6 -> 886a41d62

KAFKA-5429; Ignore produce response if batch was previously aborted

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3300 from hachikuji/KAFKA-5429

(cherry picked from commit 6c92fc557682e853a0a3ed5684c13174fff45acf)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/886a41d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/886a41d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/886a41d6

Branch: refs/heads/0.11.0
Commit: 886a41d62453d1cd00b0b1eb575817f7a52fc371
Parents: 076eefc
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 12 16:29:29 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 16:32:34 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       | 53 ++++++++++++----
 .../producer/internals/RecordAccumulator.java   |  8 +--
 .../producer/internals/ProducerBatchTest.java   | 67 ++++++++++++++++++++
 3 files changed, 111 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
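
For context (not part of the commit): a minimal, self-contained sketch of the terminal-state pattern this change introduces. The names FinalStateSketch, Batch, and done(boolean) below are illustrative only; the real implementation is in the ProducerBatch diff that follows. The key point is that a batch reaches exactly one final state via compare-and-set, and a produce response arriving after the batch was aborted is logged and ignored instead of throwing IllegalStateException.

    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative sketch only; see the ProducerBatch diff below for the actual change.
    public class FinalStateSketch {
        enum FinalState { ABORTED, FAILED, SUCCEEDED }

        static class Batch {
            private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

            // Abort may only happen once, and only if the batch has not already completed.
            void abort() {
                if (!finalState.compareAndSet(null, FinalState.ABORTED))
                    throw new IllegalStateException("Already completed in state " + finalState.get());
            }

            // Completion after an abort is a no-op; any other double completion is a bug.
            void done(boolean failed) {
                FinalState target = failed ? FinalState.FAILED : FinalState.SUCCEEDED;
                if (!finalState.compareAndSet(null, target)) {
                    if (finalState.get() == FinalState.ABORTED) {
                        System.out.println("Produce response after abort: ignored");
                        return;
                    }
                    throw new IllegalStateException("Already completed in state " + finalState.get());
                }
                System.out.println("Completed with state " + target);
            }
        }

        public static void main(String[] args) {
            Batch batch = new Batch();
            batch.abort();      // e.g. the producer is closed while the request is in flight
            batch.done(false);  // the late broker response is now ignored instead of throwing
        }
    }
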

http://git-wip-us.apache.org/repos/asf/kafka/blob/886a41d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 7f3ba15..b33e080 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -41,13 +41,12 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
 
-
 /**
  * A batch of records that is or will be sent.
  *
@@ -57,22 +56,24 @@ public final class ProducerBatch {
 
     private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
 
+    private enum FinalState { ABORTED, FAILED, SUCCEEDED };
+
     final long createdMs;
     final TopicPartition topicPartition;
     final ProduceRequestResult produceFuture;
 
     private final List<Thunk> thunks = new ArrayList<>();
     private final MemoryRecordsBuilder recordsBuilder;
-
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
+    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+
     int recordCount;
     int maxRecordSize;
     private long lastAttemptMs;
     private long lastAppendTime;
     private long drainedMs;
     private String expiryErrorMessage;
-    private AtomicBoolean completed;
     private boolean retry;
 
     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
@@ -86,7 +87,6 @@ public final class ProducerBatch {
         this.topicPartition = tp;
         this.lastAppendTime = createdMs;
         this.produceFuture = new ProduceRequestResult(topicPartition);
-        this.completed = new AtomicBoolean();
         this.retry = false;
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
@@ -143,7 +143,20 @@ public final class ProducerBatch {
     }
 
     /**
-     * Complete the request.
+     * Abort the batch and complete the future and callbacks.
+     *
+     * @param exception The exception to use to complete the future and awaiting callbacks.
+     */
+    public void abort(RuntimeException exception) {
+        if (!finalState.compareAndSet(null, FinalState.ABORTED))
+            throw new IllegalStateException("Batch has already been completed in final state " + finalState.get());
+
+        log.trace("Aborting batch for partition {}", topicPartition, exception);
+        completeFutureAndFireCallbacks(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, exception);
+    }
+
+    /**
+     * Complete the request. If the batch was previously aborted, this is a no-op.
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
@@ -152,10 +165,20 @@
     public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
         log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                   topicPartition, baseOffset, exception);
+        FinalState finalState = exception != null ? FinalState.FAILED : FinalState.SUCCEEDED;
+        if (!this.finalState.compareAndSet(null, finalState)) {
+            if (this.finalState.get() == FinalState.ABORTED) {
+                log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
+                return;
+            } else {
+                throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
+            }
+        }
 
-        if (completed.getAndSet(true))
-            throw new IllegalStateException("Batch has already been completed");
+        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+    }
 
+    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
         // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
         produceFuture.set(baseOffset, logAppendTime, exception);
 
@@ -275,7 +298,7 @@
         boolean expired = expiryErrorMessage != null;
 
         if (expired)
-            abort();
+            abortRecordAppends();
 
         return expired;
     }
@@ -366,7 +389,14 @@ public final class ProducerBatch {
         }
     }
 
-    public void abort() {
+    /**
+     * Abort the record builder and reset the state of the underlying buffer. This is used prior to aborting
+     * the batch with {@link #abort(RuntimeException)} and ensures that no record previously appended can be
+     * read. This is used in scenarios where we want to ensure a batch ultimately gets aborted, but in which
+     * it is not safe to invoke the completion callbacks (e.g. because we are holding a lock,
+     * {@link RecordAccumulator#abortBatches()}).
+     */
+    public void abortRecordAppends() {
         recordsBuilder.abort();
     }
 
@@ -390,9 +420,6 @@ public final class ProducerBatch {
         return recordsBuilder.magic();
     }
 
-    /**
-     * Return the ProducerId (Pid) of the current batch.
-     */
     public long producerId() {
         return recordsBuilder.producerId();
     }


http://git-wip-us.apache.org/repos/asf/kafka/blob/886a41d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7237b6d..76cc6bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -614,10 +614,10 @@ public final class RecordAccumulator {
         for (ProducerBatch batch : incomplete.all()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             synchronized (dq) {
-                batch.abort();
+                batch.abortRecordAppends();
                 dq.remove(batch);
             }
-            batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
+            batch.abort(reason);
             deallocate(batch);
         }
     }
@@ -629,12 +629,12 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 if (!batch.isClosed()) {
                     aborted = true;
-                    batch.abort();
+                    batch.abortRecordAppends();
                     dq.remove(batch);
                 }
             }
             if (aborted) {
-                batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
+                batch.abort(reason);
                 deallocate(batch);
             }
         }
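
Also for illustration (not part of the commit; TwoPhaseAbortSketch and its Batch type are placeholders): the RecordAccumulator hunks above split the old single abort() call into two phases, because the caller holds the deque lock when it makes the batch unreadable but should not fire user callbacks while holding that lock. The real methods are ProducerBatch.abortRecordAppends() and ProducerBatch.abort(RuntimeException).

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class TwoPhaseAbortSketch {
        static class Batch {
            void abortRecordAppends() { /* phase 1: close/reset the buffer, no callbacks fired */ }
            void abort(RuntimeException reason) { /* phase 2: complete the future and fire callbacks */ }
        }

        public static void main(String[] args) {
            Deque<Batch> dq = new ArrayDeque<>();
            Batch batch = new Batch();
            dq.add(batch);

            // Phase 1 runs under the deque lock: make the batch unreadable and unlink it.
            synchronized (dq) {
                batch.abortRecordAppends();
                dq.remove(batch);
            }

            // Phase 2 runs outside the lock: user callbacks may execute arbitrary code,
            // so the future is completed only after the lock has been released.
            batch.abort(new RuntimeException("batch aborted"));
        }
    }
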

http://git-wip-us.apache.org/repos/asf/kafka/blob/886a41d6/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 6d2d2f7..02989f0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LegacyRecord;
@@ -29,6 +31,7 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
@@ -38,6 +41,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ProducerBatchTest {
 
@@ -55,6 +59,69 @@ public class ProducerBatchTest {
     }
 
     @Test
+    public void testBatchAbort() throws Exception {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+
+        KafkaException exception = new KafkaException();
+        batch.abort(exception);
+        assertTrue(future.isDone());
+
+        // subsequent completion should be ignored
+        batch.done(500L, 2342342341L, null);
+        batch.done(-1, -1, new KafkaException());
+
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have thrown");
+        } catch (ExecutionException e) {
+            assertEquals(exception, e.getCause());
+        }
+    }
+
+    @Test
+    public void testBatchCannotAbortTwice() throws Exception {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        KafkaException exception = new KafkaException();
+        batch.abort(exception);
+
+        try {
+            batch.abort(new KafkaException());
+            fail("Expected exception from abort");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have thrown");
+        } catch (ExecutionException e) {
+            assertEquals(exception, e.getCause());
+        }
+    }
+
+    @Test
+    public void testBatchCannotCompleteTwice() throws Exception {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        batch.done(500L, 10L, null);
+
+        try {
+            batch.done(1000L, 20L, null);
+            fail("Expected exception from done");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        RecordMetadata recordMetadata = future.get();
+        assertEquals(500L, recordMetadata.offset());
+        assertEquals(10L, recordMetadata.timestamp());
+    }
+
+    @Test
     public void testAppendedChecksumMagicV0AndV1() {
         for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
             MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,