Repository: kafka Updated Branches: refs/heads/trunk d60f011d7 -> 8f90fd653
KAFKA-5959; Fix NPE in Sender.canRetry when idempotence is not enabled Author: Apurva Mehta <apu...@confluent.io> Reviewers: tedyu <yuzhih...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #3947 from apurvam/KAFKA-5959-npe-in-sender Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f90fd65 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f90fd65 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f90fd65 Branch: refs/heads/trunk Commit: 8f90fd6530ce2c4f7e2fdfe3541c61d0178289d5 Parents: d60f011 Author: Apurva Mehta <apu...@confluent.io> Authored: Fri Sep 22 13:07:28 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Sep 22 13:07:59 2017 -0700 ---------------------------------------------------------------------- .../clients/producer/internals/Sender.java | 3 +- .../clients/producer/internals/SenderTest.java | 30 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8f90fd65/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d71046a..45a2919 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -636,7 +636,8 @@ public class Sender implements Runnable { */ private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) { return batch.attempts() < this.retries && - ((response.error.exception() instanceof RetriableException) || transactionManager.canRetry(response, batch)); + ((response.error.exception() instanceof RetriableException) || + (transactionManager != null && transactionManager.canRetry(response, batch))); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/8f90fd65/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index e1ea10a..ecf77aa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -500,6 +501,35 @@ public class SenderTest { assertSendFailure(ClusterAuthorizationException.class); } + @Test + public void testCanRetryWithoutIdempotence() throws Exception { + // do a successful retry + Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + String id = client.requests().peek().destination(); + Node node = new Node(Integer.parseInt(id), "localhost", 0); + assertEquals(1, client.inFlightRequestCount()); + assertTrue(client.hasInFlightRequests()); + assertTrue("Client ready status should be true", client.isReady(node, 0L)); + assertFalse(future.isDone()); + + client.respond(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + ProduceRequest request = (ProduceRequest) body; + assertFalse(request.isIdempotent()); + return true; + } + }, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0)); + sender.run(time.milliseconds()); + assertTrue(future.isDone()); + try { + future.get(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof TopicAuthorizationException); + } + } @Test public void testIdempotenceWithMultipleInflights() throws Exception {