[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905459#comment-16905459
 ] 

Chris Pettitt commented on KAFKA-8412:
--------------------------------------

I'm able to repro this and [~mjsax]'s solution should fix this.

One other observation while I was in this code is that we essentially have 
state spread out across classes. As a dev new to the code, I would expect to 
push most of the state for the task down into StreamTask and only if we need 
fast lookup by state keep an index of these in AssignedTask, but treat this as 
an index and not as the authoritative state. In other words, I would prefer to 
see close on StreamTask do the right thing for the state it is in instead of 
having AssignedTasks be responsible for different types of close. This should 
also make it easier to test without involving mocks, as we are in AssignedTasks.

So I see a few options:
 # Use [~mjsax]'s solution and keep state as is.
 # Push state down into StreamTask, AssignedTasks just calls close as it does 
today.
 # Do #1 in first patch and follow up with #2 in a second patch focused on 
refactoring state.

[~mjsax] [~guozhang] thoughts?

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8412
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8412
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Sebastiaan
>            Assignee: Chris Pettitt
>            Priority: Minor
>
> I found a closed issue and replied there but decided to open one myself 
> because although they're related they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there has been to implement a null check around closing a producer 
> because in some cases the producer is already null there (has been closed 
> already)
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>         this.producer.close();
>         this.producer = null;
>     }
>     this.checkForException();
> }{code}
> Seems to my (ignorant) eye that the flush method should also be wrapped in a 
> null check in the same way as has been done for close.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to