Repository: kafka Updated Branches: refs/heads/trunk 021d8a8e9 -> a86873be5
KAFKA-5957; Prevent second deallocate if response for aborted batch returns Author: Jason Gustafson <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #3942 from hachikuji/KAFKA-5957 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a86873be Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a86873be Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a86873be Branch: refs/heads/trunk Commit: a86873be50b3bf1585098a13b2ce710a717c0321 Parents: 021d8a8 Author: Jason Gustafson <[email protected]> Authored: Fri Sep 29 03:20:36 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Fri Sep 29 03:20:36 2017 +0100 ---------------------------------------------------------------------- .../producer/internals/ProducerBatch.java | 6 +- .../clients/producer/internals/Sender.java | 9 +- .../producer/internals/ProducerBatchTest.java | 44 +++-- .../clients/producer/internals/SenderTest.java | 161 ++++++++----------- 4 files changed, 112 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 93c843b..ea0f0f7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -163,8 +163,9 @@ public final class ProducerBatch { * @param baseOffset The base offset of the messages assigned by the server * @param logAppendTime The log append time or -1 if CreateTime is being used * @param exception The exception that occurred (or null if the request was successful) + * @return true if the batch was completed successfully and false if the batch was previously aborted */ - public void done(long baseOffset, long logAppendTime, RuntimeException exception) { + public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) { final FinalState finalState; if (exception == null) { log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset); @@ -177,13 +178,14 @@ public final class ProducerBatch { if (!this.finalState.compareAndSet(null, finalState)) { if (this.finalState.get() == FinalState.ABORTED) { log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition); - return; + return false; } else { throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get()); } } completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception); + return true; } private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index a15a250..8b5780b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -181,6 +181,7 @@ public class Sender implements Runnable { if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. + log.debug("Aborting incomplete batches due to forced shutdown"); this.accumulator.abortIncompleteBatches(); } try { @@ -587,8 +588,8 @@ public class Sender implements Runnable { transactionManager.removeInFlightBatch(batch); } - batch.done(response.baseOffset, response.logAppendTime, null); - this.accumulator.deallocate(batch); + if (batch.done(response.baseOffset, response.logAppendTime, null)) + this.accumulator.deallocate(batch); } private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) { @@ -623,8 +624,8 @@ public class Sender implements Runnable { this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); - batch.done(baseOffset, logAppendTime, exception); - this.accumulator.deallocate(batch); + if (batch.done(baseOffset, logAppendTime, exception)) + this.accumulator.deallocate(batch); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 41aa5c6..2f89d79 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -33,7 +34,6 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Deque; -import java.util.Iterator; import java.util.concurrent.ExecutionException; import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0; @@ -64,15 +64,20 @@ public class ProducerBatchTest { @Test public void testBatchAbort() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + MockCallback callback = new MockCallback(); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now); KafkaException exception = new KafkaException(); batch.abort(exception); assertTrue(future.isDone()); + assertEquals(1, callback.invocations); + assertEquals(exception, callback.exception); + assertNull(callback.metadata); // subsequent completion should be ignored - batch.done(500L, 2342342341L, null); - batch.done(-1, -1, new KafkaException()); + assertFalse(batch.done(500L, 2342342341L, null)); + assertFalse(batch.done(-1, -1, new KafkaException())); + assertEquals(1, callback.invocations); assertTrue(future.isDone()); try { @@ -86,9 +91,13 @@ public class ProducerBatchTest { @Test public void testBatchCannotAbortTwice() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + MockCallback callback = new MockCallback(); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now); KafkaException exception = new KafkaException(); batch.abort(exception); + assertEquals(1, callback.invocations); + assertEquals(exception, callback.exception); + assertNull(callback.metadata); try { batch.abort(new KafkaException()); @@ -97,6 +106,7 @@ public class ProducerBatchTest { // expected } + assertEquals(1, callback.invocations); assertTrue(future.isDone()); try { future.get(); @@ -109,8 +119,12 @@ public class ProducerBatchTest { @Test public void testBatchCannotCompleteTwice() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + MockCallback callback = new MockCallback(); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now); batch.done(500L, 10L, null); + assertEquals(1, callback.invocations); + assertNull(callback.exception); + assertNotNull(callback.metadata); try { batch.done(1000L, 20L, null); @@ -166,9 +180,7 @@ public class ProducerBatchTest { for (ProducerBatch splitProducerBatch : batches) { for (RecordBatch splitBatch : splitProducerBatch.records().batches()) { - Iterator<Record> iter = splitBatch.iterator(); - while (iter.hasNext()) { - Record record = iter.next(); + for (Record record : splitBatch) { assertTrue("Header size should be 1.", record.headers().length == 1); assertTrue("Header key should be 'header-key'.", record.headers()[0].key().equals("header-key")); assertTrue("Header value should be 'header-value'.", new String(record.headers()[0].value()).equals("header-value")); @@ -260,4 +272,18 @@ public class ProducerBatchTest { assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10], Record.EMPTY_HEADERS)); assertEquals(null, batch.tryAppend(now + 1, null, new byte[10], Record.EMPTY_HEADERS, null, now + 1)); } + + private static class MockCallback implements Callback { + private int invocations = 0; + private RecordMetadata metadata; + private Exception exception; + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + invocations++; + this.metadata = metadata; + this.exception = exception; + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 3a92e63..a32688b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; @@ -93,7 +94,6 @@ public class SenderTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; private static final short ACKS_ALL = -1; - private static final int MAX_RETRIES = 0; private static final String CLIENT_ID = "clientId"; private static final double EPS = 0.0001; private static final int MAX_BLOCK_TIMEOUT = 1000; @@ -349,7 +349,7 @@ public class SenderTest { sender.run(time.milliseconds()); // resend } sender.run(time.milliseconds()); - completedWithError(future, Errors.NETWORK_EXCEPTION); + assertFutureFailure(future, NetworkException.class); } finally { m.close(); } @@ -713,14 +713,7 @@ public class SenderTest { sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L); sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches. - - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised an error"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof RecordTooLargeException); - } + assertFutureFailure(request1, RecordTooLargeException.class); assertEquals(1, client.inFlightRequestCount()); assertEquals(-1, transactionManager.lastAckedSequence(tp0)); @@ -787,14 +780,7 @@ public class SenderTest { sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L); sender.run(time.milliseconds()); - assertTrue(request2.isDone()); - - try { - request2.get(); - fail("Expected an OutOfOrderSequenceException"); - } catch (ExecutionException e) { - assert e.getCause() instanceof OutOfOrderSequenceException; - } + assertFutureFailure(request2, OutOfOrderSequenceException.class); } @Test @@ -966,13 +952,7 @@ public class SenderTest { sender.run(time.milliseconds()); - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertFalse(transactionManager.hasUnresolvedSequence(tp0)); } @@ -1004,13 +984,7 @@ public class SenderTest { client.blackout(node, 10); sender.run(time.milliseconds()); // now expire the first batch. - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -1068,13 +1042,7 @@ public class SenderTest { client.blackout(node, 10); sender.run(time.milliseconds()); // now expire the first batch. - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -1086,14 +1054,7 @@ public class SenderTest { sender.run(time.milliseconds()); // send second request sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1); sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state. - assertTrue(request2.isDone()); - - try { - request2.get(); - fail("should have failed with an exception"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof OutOfOrderSequenceException); - } + assertFutureFailure(request2, OutOfOrderSequenceException.class); Deque<ProducerBatch> batches = accumulator.batches().get(tp0); @@ -1132,13 +1093,7 @@ public class SenderTest { sender.run(time.milliseconds()); // now expire the batch. - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); assertFalse(client.hasInFlightRequests()); Deque<ProducerBatch> batches = accumulator.batches().get(tp0); @@ -1444,14 +1399,7 @@ public class SenderTest { sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L); sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset - - assertTrue(request2.isDone()); - try { - request2.get(); - fail("Should have raised an OutOfOrderSequenceException"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof OutOfOrderSequenceException); - } + assertFutureFailure(request2, OutOfOrderSequenceException.class); } void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) { @@ -1497,13 +1445,7 @@ public class SenderTest { }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); sender.run(time.milliseconds()); - assertTrue(future.isDone()); - try { - future.get(); - fail("Future should have raised ClusterAuthorizationException"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof ClusterAuthorizationException); - } + assertFutureFailure(future, ClusterAuthorizationException.class); // cluster authorization errors are fatal, so we should continue seeing it on future sends assertTrue(transactionManager.hasFatalError()); @@ -1511,6 +1453,49 @@ public class SenderTest { } @Test + public void testCancelInFlightRequestAfterFatalError() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + + client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + // cluster authorization is a fatal error for the producer + Future<RecordMetadata> future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + Future<RecordMetadata> future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + client.respond(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); + } + }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); + + sender.run(time.milliseconds()); + assertTrue(transactionManager.hasFatalError()); + assertFutureFailure(future1, ClusterAuthorizationException.class); + + sender.run(time.milliseconds()); + assertFutureFailure(future2, ClusterAuthorizationException.class); + + // Should be fine if the second response eventually returns + client.respond(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); + } + }, produceResponse(tp1, 0, Errors.NONE, 0)); + sender.run(time.milliseconds()); + } + + @Test public void testUnsupportedForMessageFormatInProduceRequest() throws Exception { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); @@ -1530,13 +1515,7 @@ public class SenderTest { }, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0)); sender.run(time.milliseconds()); - assertTrue(future.isDone()); - try { - future.get(); - fail("Future should have raised UnsupportedForMessageFormat"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof UnsupportedForMessageFormatException); - } + assertFutureFailure(future, UnsupportedForMessageFormatException.class); // unsupported for message format is not a fatal error assertFalse(transactionManager.hasError()); @@ -1562,13 +1541,7 @@ public class SenderTest { }); sender.run(time.milliseconds()); - assertTrue(future.isDone()); - try { - future.get(); - fail("Future should have raised UnsupportedVersionException"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof UnsupportedVersionException); - } + assertFutureFailure(future, UnsupportedVersionException.class); // unsupported version errors are fatal, so we should continue seeing it on future sends assertTrue(transactionManager.hasFatalError()); @@ -1826,16 +1799,6 @@ public class SenderTest { }; } - private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception { - assertTrue("Request should be completed", future.isDone()); - try { - future.get(); - fail("Should have thrown an exception."); - } catch (ExecutionException e) { - assertEquals(error.exception().getClass(), e.getCause().getClass()); - } - } - private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp); @@ -1906,4 +1869,16 @@ public class SenderTest { client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch)); } + private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType) + throws InterruptedException { + assertTrue(future.isDone()); + try { + future.get(); + fail("Future should have raised " + expectedExceptionType.getName()); + } catch (ExecutionException e) { + Class<? extends Throwable> causeType = e.getCause().getClass(); + assertTrue("Unexpected cause " + causeType.getName(), expectedExceptionType.isAssignableFrom(causeType)); + } + } + }
