Sebastiaan created KAFKA-8412:
---------------------------------

             Summary: Still a nullpointer exception thrown on shutdown while 
flushing before closing producers
                 Key: KAFKA-8412
                 URL: https://issues.apache.org/jira/browse/KAFKA-8412
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.1.1
            Reporter: Sebastiaan


I found a closed issue and replied there, but decided to open a new one because 
the two problems are related yet slightly different. The original issue is at 
https://issues.apache.org/jira/browse/KAFKA-7678

The fix there was to add a null check around closing the producer, because in 
some cases the producer is already null at that point (it has already been 
closed).

In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
method that is called pre-close. This is in the log:
{code:java}
message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks

java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}

Followed by:
 
{code:java}
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask

java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}

If I look at the source code at this point, I see a nice null check in the 
close method, but not in the flush method that is called just before that:
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    this.producer.flush();
    this.checkForException();
}

public void close() {
    this.log.debug("Closing producer");
    if (this.producer != null) {
        this.producer.close();
        this.producer = null;
    }

    this.checkForException();
}{code}

It seems to my (ignorant) eye that the flush method should also be wrapped in a 
null check, in the same way as was done for close.
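
To make the suggestion concrete, here is a minimal sketch of what I mean. It is 
not the actual Kafka code: SketchProducer and GuardedCollector are hypothetical 
stand-ins I made up so the example compiles on its own, and the exception check 
is left out. It also only shows the guard itself; whether silently skipping the 
flush is the right behaviour, or whether the producer should not be null at 
this point in the first place, is something I can't judge.

```java
// Hypothetical stand-in for the producer; NOT the real Kafka Producer interface.
interface SketchProducer {
    void flush();
    void close();
}

// Hypothetical stand-in for the collector, reduced to flush() and close().
class GuardedCollector {
    private SketchProducer producer;

    GuardedCollector(SketchProducer producer) {
        this.producer = producer;
    }

    public void flush() {
        // Same guard that close() already has: skip when the producer is gone.
        if (this.producer != null) {
            this.producer.flush();
        }
    }

    public void close() {
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
    }
}
```

With this guard, calling flush() after close() becomes a no-op instead of 
throwing a NullPointerException.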



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
