Repository: kafka
Updated Branches:
  refs/heads/trunk d60f011d7 -> 8f90fd653


KAFKA-5959; Fix NPE in Sender.canRetry when idempotence is not enabled

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: tedyu <yuzhih...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3947 from apurvam/KAFKA-5959-npe-in-sender


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f90fd65
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f90fd65
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f90fd65

Branch: refs/heads/trunk
Commit: 8f90fd6530ce2c4f7e2fdfe3541c61d0178289d5
Parents: d60f011
Author: Apurva Mehta <apu...@confluent.io>
Authored: Fri Sep 22 13:07:28 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Sep 22 13:07:59 2017 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |  3 +-
 .../clients/producer/internals/SenderTest.java  | 30 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f90fd65/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d71046a..45a2919 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -636,7 +636,8 @@ public class Sender implements Runnable {
      */
     private boolean canRetry(ProducerBatch batch, 
ProduceResponse.PartitionResponse response) {
         return batch.attempts() < this.retries &&
-                ((response.error.exception() instanceof RetriableException) || 
transactionManager.canRetry(response, batch));
+                ((response.error.exception() instanceof RetriableException) ||
+                        (transactionManager != null && 
transactionManager.canRetry(response, batch)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f90fd65/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e1ea10a..ecf77aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -500,6 +501,35 @@ public class SenderTest {
         assertSendFailure(ClusterAuthorizationException.class);
     }
 
+    @Test
+    public void testCanRetryWithoutIdempotence() throws Exception {
+        // do a successful retry
+        Future<RecordMetadata> future = accumulator.append(tp0, 0L, 
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+        String id = client.requests().peek().destination();
+        Node node = new Node(Integer.parseInt(id), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertTrue(client.hasInFlightRequests());
+        assertTrue("Client ready status should be true", client.isReady(node, 
0L));
+        assertFalse(future.isDone());
+
+        client.respond(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                ProduceRequest request = (ProduceRequest) body;
+                assertFalse(request.isIdempotent());
+                return true;
+            }
+        }, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
+        sender.run(time.milliseconds());
+        assertTrue(future.isDone());
+        try {
+            future.get();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof TopicAuthorizationException);
+        }
+    }
 
     @Test
     public void testIdempotenceWithMultipleInflights() throws Exception {

Reply via email to