GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/4022

    KAFKA-6015: Fix NullPointerExceptionInRecordAccumulator

    It is possible for batches with sequence numbers to be in the `deque` while, at the same time, the in-flight batches in the `TransactionManager` are removed due to a producerId reset.
    
    In this case, when the batches in the `deque` are drained, we will get a `NullPointerException` in the background thread due to this line:
    
    ```java
    if (first.hasSequence() && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
    ```
    
    In particular, `transactionManager.nextBatchBySequence` will return null, because there are no in-flight batches being tracked.
    
    In this patch, we simply allow the batches in the `deque` to be drained if there are no in-flight batches being tracked in the `TransactionManager`. If they succeed, well and good. If the responses come back with an error, the batches will ultimately be failed in the producer with an `OutOfOrderSequenceException`.
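    
    A minimal sketch of the guard described above, not the actual patch. The `Batch` and `InFlightTracker` classes and the `mustWait` helper are hypothetical stand-ins for the producer batch, the `TransactionManager` tracking state, and the drain check; only the null handling is meant to mirror the description.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class DrainCheckSketch {
        /** Hypothetical stand-in for a queued producer batch. */
        static final class Batch {
            final String topicPartition;
            final int baseSequence; // negative means no sequence assigned

            Batch(String topicPartition, int baseSequence) {
                this.topicPartition = topicPartition;
                this.baseSequence = baseSequence;
            }

            boolean hasSequence() {
                return baseSequence >= 0;
            }
        }

        /** Hypothetical stand-in for the in-flight tracking in TransactionManager. */
        static final class InFlightTracker {
            private final Map<String, Batch> nextBySequence = new HashMap<>();

            void setNext(Batch batch) {
                nextBySequence.put(batch.topicPartition, batch);
            }

            /** Simulates a producerId reset: all tracked in-flight batches are dropped. */
            void reset() {
                nextBySequence.clear();
            }

            /** Returns null when nothing is tracked for the partition. */
            Batch nextBatchBySequence(String topicPartition) {
                return nextBySequence.get(topicPartition);
            }
        }

        /** True when the batch at the head of the deque must wait its turn. */
        static boolean mustWait(Batch first, InFlightTracker tracker) {
            if (!first.hasSequence()) {
                return false;
            }
            Batch next = tracker.nextBatchBySequence(first.topicPartition);
            if (next == null) {
                // Nothing is tracked (e.g. after a reset): let the batch drain
                // instead of dereferencing null.
                return false;
            }
            return first.baseSequence != next.baseSequence;
        }

        public static void main(String[] args) {
            InFlightTracker tracker = new InFlightTracker();
            Batch queued = new Batch("topic-0", 5);

            tracker.setNext(new Batch("topic-0", 3));
            System.out.println("tracked, different sequence: " + mustWait(queued, tracker));

            tracker.reset();
            System.out.println("after reset: " + mustWait(queued, tracker));
        }
    }
    ```
    
    With the null check in place, a batch left over from a previous producer id is drained rather than crashing the background thread; whether it then succeeds is decided by the broker response.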

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-6015-npe-in-record-accumulator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4022
    
----
commit 8d4ced9f42afb16113ed9464697b4d52394e1304
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-10-05T05:56:44Z

    Fix NPE in Accumulator

commit 18053f749d8b80bb878f7781f95cb02b58933f9f
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-10-05T06:34:47Z

    Add test case to ensure that we can send queued batches from a previous producer id after the producer state is reset

----

