Repository: kafka
Updated Branches:
  refs/heads/1.0 ebeee3c8d -> aa1e4c235


KAFKA-6015; Fix NPE in RecordAccumulator after ProducerId reset

It is possible for batches with assigned sequence numbers to remain in the 
`deque` while, at the same time, the in-flight batches tracked by the 
`TransactionManager` are removed due to a producerId reset.

In this case, when the batches in the `deque` are drained, we will get a 
`NullPointerException` in the background thread due to this line:

```java
if (first.hasSequence() && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
```

Specifically, `transactionManager.nextBatchBySequence` will return null 
because there are no in-flight batches being tracked for the partition.
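
For illustration, here is a minimal, self-contained sketch of the failure 
mode, assuming only the behavior described above (the `Batch` class, the 
`inflightBySequence` map, and `NpeSketch` itself are hypothetical stand-ins 
for the real producer internals):

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for ProducerBatch and the TransactionManager's
// per-partition tracking of in-flight batches.
public class NpeSketch {
    static class Batch {
        final int baseSequence;
        Batch(int baseSequence) { this.baseSequence = baseSequence; }
        boolean hasSequence() { return baseSequence >= 0; }
        int baseSequence() { return baseSequence; }
    }

    // Emptied on a producerId reset, even though batches with assigned
    // sequence numbers may still be sitting in the accumulator's deque.
    static final Map<String, Deque<Batch>> inflightBySequence = new HashMap<>();

    static Batch nextBatchBySequence(String partition) {
        Deque<Batch> queue = inflightBySequence.get(partition);
        return queue == null ? null : queue.peek(); // null after a reset
    }

    public static void main(String[] args) {
        Batch first = new Batch(5); // a retried batch still in the deque
        // nextBatchBySequence returns null because nothing is tracked, so
        // calling baseSequence() on the result throws NullPointerException.
        if (first.hasSequence()
                && first.baseSequence() != nextBatchBySequence("tp-0").baseSequence())
            System.out.println("would wait for the next in-line batch");
    }
}
```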

In this patch, we simply allow the batches in the `deque` to be drained when 
there are no in-flight batches being tracked in the `TransactionManager`. If 
they succeed, well and good. If a response comes back with an error, the 
batches will ultimately be failed in the producer with an 
`OutOfOrderSequenceException`.
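
As a rough, runnable sketch of the patched guard (the `DrainGuardSketch` 
class and the `shouldWait` helper are hypothetical; the three-clause check 
itself mirrors the `RecordAccumulator` change in the diff below):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DrainGuardSketch {
    static final int NO_SEQUENCE = -1; // mirrors RecordBatch.NO_SEQUENCE

    // In-flight sequence numbers for one partition; empty after a reset.
    static final Deque<Integer> inflightSequences = new ArrayDeque<>();

    // Mirrors TransactionManager.firstInFlightSequence: returns NO_SEQUENCE
    // when nothing is tracked, instead of a null batch to dereference.
    static int firstInFlightSequence() {
        Integer head = inflightSequences.peek();
        return head == null ? NO_SEQUENCE : head;
    }

    // Hold back a queued batch only when something is actually in flight
    // and the queued batch is not the next one in sequence order.
    static boolean shouldWait(int queuedBaseSequence) {
        int firstInFlight = firstInFlightSequence();
        return firstInFlight != NO_SEQUENCE
                && queuedBaseSequence != NO_SEQUENCE
                && queuedBaseSequence != firstInFlight;
    }

    public static void main(String[] args) {
        // After a reset nothing is in flight, so the batch drains (no NPE).
        System.out.println(shouldWait(5)); // false -> drain
        inflightSequences.offer(3);
        System.out.println(shouldWait(5)); // true -> wait for sequence 3
    }
}
```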

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4022 from apurvam/KAFKA-6015-npe-in-record-accumulator

(cherry picked from commit 105ab47ed90c8a0e83c159c97a8f2294c5582657)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa1e4c23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa1e4c23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa1e4c23

Branch: refs/heads/1.0
Commit: aa1e4c235b882ae98a457cf9176f687951473cee
Parents: ebeee3c
Author: Apurva Mehta <apu...@confluent.io>
Authored: Fri Oct 6 15:01:22 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 6 15:04:39 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |   5 +-
 .../clients/producer/internals/Sender.java      |   4 +-
 .../producer/internals/TransactionManager.java  |  19 +++
 .../clients/producer/internals/SenderTest.java  | 116 +++++++++++++++++++
 .../scala/kafka/log/ProducerStateManager.scala  |  20 +++-
 5 files changed, 157 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1e4c23/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 46cf6c4..ba8c28e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -538,8 +538,9 @@ public final class RecordAccumulator {
                                                 // on the client after being sent to the broker at least once.
                                                 break;
 
-                                            if (first.hasSequence()
-                                                    && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
+                                            int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
+                                            if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
+                                                    && first.baseSequence() != firstInFlightSequence)
                                                 // If the queued batch already has an assigned sequence, then it is being
                                                 // retried. In this case, we wait until the next immediate batch is ready
                                                 // and drain that. We only move on when the next in line batch is complete (either successfully
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1e4c23/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8b5780b..7eea499 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -528,8 +528,8 @@ public class Sender implements Runnable {
                 } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                     // If idempotence is enabled only retry the request if the current producer id is the same as
                     // the producer id of the batch.
-                    log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
-                            batch.baseSequence());
+                    log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
+                            batch.topicPartition, batch.producerId(), batch.baseSequence());
                     reenqueueBatch(batch, now);
                 } else {
                     failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1e4c23/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index a0b45cd..006a12b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
@@ -435,6 +436,24 @@ public class TransactionManager {
         inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
     }
 
+    /**
+     * Returns the first inflight sequence for a given partition. This is the base sequence of an inflight batch with
+     * the lowest sequence number.
+     * @return the lowest inflight sequence if the transaction manager is tracking inflight requests for this partition.
+     *         If there are no inflight requests being tracked for this partition, this method will return
+     *         RecordBatch.NO_SEQUENCE.
+     */
+    synchronized int firstInFlightSequence(TopicPartition topicPartition) {
+        PriorityQueue<ProducerBatch> inFlightBatches = inflightBatchesBySequence.get(topicPartition);
+        if (inFlightBatches == null)
+            return RecordBatch.NO_SEQUENCE;
+
+        ProducerBatch firstInFlightBatch = inFlightBatches.peek();
+        if (firstInFlightBatch == null)
+            return RecordBatch.NO_SEQUENCE;
+
+        return firstInFlightBatch.baseSequence();
+    }
 
     synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
         PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1e4c23/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index a32688b..1ce8e5a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -1105,6 +1105,103 @@ public class SenderTest {
     }
 
     @Test
+    public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
+        sender.run(time.milliseconds());  // send request to tp1
+
+        assertFalse(successfulResponse.isDone());
+        client.respond(produceResponse(tp1, 10, Errors.NONE, -1));
+        sender.run(time.milliseconds());
+
+        assertTrue(successfulResponse.isDone());
+        assertEquals(10, successfulResponse.get().offset());
+
+        // Since the response came back for the old producer id, we shouldn't update the next sequence.
+        assertEquals(0, transactionManager.sequenceNumber(tp1).longValue());
+    }
+
+    @Test
+    public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
+        sender.run(time.milliseconds());  // send request to tp1 with the old producerId
+
+        assertFalse(successfulResponse.isDone());
+        // The response comes back with a retriable error.
+        client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_FOR_PARTITION, -1));
+        sender.run(time.milliseconds());
+
+        assertTrue(successfulResponse.isDone());
+        // Since the batch has an old producerId, it will not be retried yet again, but will be failed with a Fatal
+        // exception.
+        try {
+            successfulResponse.get();
+            fail("Should have raised an OutOfOrderSequenceException");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+    }
+
+    @Test
     public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
@@ -1799,12 +1896,31 @@ public class SenderTest {
         };
     }
 
+    class OffsetAndError {
+        long offset;
+        Errors error;
+        OffsetAndError(long offset, Errors error) {
+            this.offset = offset;
+            this.error = error;
+        }
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+    private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> responses) {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> partResponses = new LinkedHashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndError> entry : responses.entrySet()) {
+            ProduceResponse.PartitionResponse response = new ProduceResponse.PartitionResponse(entry.getValue().error,
+                    entry.getValue().offset, RecordBatch.NO_TIMESTAMP, -1);
+            partResponses.put(entry.getKey(), response);
+        }
+        return new ProduceResponse(partResponses);
+
+    }
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
         return produceResponse(tp, offset, error, throttleTimeMs, -1L);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1e4c23/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 81726c1..7c0a3da 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -40,8 +40,22 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 // ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance
 private[log] sealed trait ValidationType
 private[log] object ValidationType {
+
+  /**
+    * This indicates no validation should be performed on the incoming append. This is the case for all appends on
+    * a replica, as well as appends when the producer state is being built from the log.
+    */
   case object None extends ValidationType
+
+  /**
+    * We only validate the epoch (and not the sequence numbers) for offset commit requests coming from the transactional
+    * producer. These appends will not have sequence numbers, so we can't validate them.
+    */
   case object EpochOnly extends ValidationType
+
+  /**
+    * Perform the full validation. This should be used for regular produce requests coming to the leader.
+    */
   case object Full extends ValidationType
 }
 
@@ -148,9 +162,9 @@ private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata: muta
  *                      be made against the latest append in the current entry. New appends will replace older appends
  *                      in the current entry so that the space overhead is constant.
  * @param validationType Indicates the extent of validation to perform on the appends on this instance. Offset commits
- *                       coming from the producer should have EpochOnlyValidation. Appends which aren't from a client
- *                       will not be validated at all, and should be set to NoValidation. All other appends should
- *                       have FullValidation.
+ *                       coming from the producer should have ValidationType.EpochOnly. Appends which aren't from a client
+ *                       should have ValidationType.None. Appends coming from a client for produce requests should have
+ *                       ValidationType.Full.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       currentEntry: ProducerIdEntry,
