[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906525#comment-16906525 ]

ASF GitHub Bot commented on KAFKA-8412:
---------------------------------------

cpettitt-confluent commented on pull request #7207: KAFKA-8412[WIP]: Still a
nullpointer exception thrown on shutdown while flushing before closing producers
URL: https://github.com/apache/kafka/pull/7207
   
   Prior to this change an NPE is raised when calling AssignedTasks.close
   under the following conditions:
   
   1. EOS is enabled
   2. The task was in a suspended state
   
   The cause of the NPE is that when a clean close is requested for a
   StreamTask, the task tries to commit. In the suspended state, however,
   there is no producer, so the contained RecordCollector ultimately
   throws an NPE in flush.
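
   The failing path can be sketched as follows. Method names follow the
   stack trace in the report, but the classes here are simplified
   stand-ins, not the actual Kafka source: with EOS enabled and the task
   suspended, the collector's producer reference is null, so flush
   dereferences null.

```java
// Sketch of the reported failure path (simplified stand-ins, not Kafka source).
class NpePathSketch {
    interface Producer { void flush(); }

    static class Collector {
        Producer producer; // null once the EOS producer was torn down on suspend

        void flush() {
            producer.flush(); // <-- NPE when producer == null
        }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        try {
            collector.flush(); // commit -> flushState -> flush
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE");
        }
    }
}
```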
   
   It is my opinion that in the long term, this (and probably other
   surprising state interactions) could be cleaned up by consolidating
   state into one place instead of spreading it across AssignedTasks,
   StreamTask, and AbstractTask. However, that is a much larger, more risky
   change, and this issue is currently considered minor.
   
   The fix put forth in this commit is to have AssignedTasks call
   closeSuspended when it knows the underlying StreamTask is suspended.
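
   A minimal sketch of that dispatch is below. The method names mirror
   the description above, but this is not the actual Kafka source; the
   task class is a simplified stand-in.

```java
// Sketch of routing suspended tasks to closeSuspended (stand-ins, not Kafka source).
import java.util.ArrayList;
import java.util.List;

class CloseDispatchSketch {
    static class Task {
        final boolean suspended;
        final List<String> calls = new ArrayList<>();
        Task(boolean suspended) { this.suspended = suspended; }

        void close() {
            // a full clean close would commit(), which flushes a producer
            // that a suspended EOS task no longer has -> NPE
            calls.add("close");
        }

        void closeSuspended() {
            // skips the commit/flush path entirely
            calls.add("closeSuspended");
        }
    }

    // AssignedTasks knows which tasks it suspended, so it can route them
    // away from the commit-then-close path.
    static void closeTask(Task task) {
        if (task.suspended) {
            task.closeSuspended();
        } else {
            task.close();
        }
    }

    public static void main(String[] args) {
        Task suspended = new Task(true);
        Task running = new Task(false);
        closeTask(suspended);
        closeTask(running);
        System.out.println(suspended.calls); // [closeSuspended]
        System.out.println(running.calls);   // [close]
    }
}
```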
   
   Currently the only externally visible way to detect this problem in a
   test seems to be via logging. This is because the NPE is logged but
   then suppressed under the following sequence:
   
   RecordCollectorImpl.flush:266
       - throws NPE (producer is null)
   
   StreamTask.suspend:578
       - goes through the finally block and then reraises the NPE
   
   StreamTask.close:706
       - catches the NPE, calls closeSuspended with the NPE
   
   StreamTask.closeSuspended:676
       - rethrows the NPE after some cleanup
   
   AssignedTasks.close:341
       - catches and logs the exception
       - tries a "dirty" close (clean = true) which succeeds
       - firstException is NOT set because the test `!closeUnclean(task)`
         does not hold.
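
   The swallow at the last step can be sketched like this. The names
   (closeUnclean, firstException) follow the description above, but this
   is a simplified stand-in, not the actual AssignedTasks source.

```java
// Sketch of the exception swallow described above (stand-in, not Kafka source).
class SwallowSketch {
    // returns true because the dirty close succeeds
    static boolean closeUnclean(String task) {
        return true;
    }

    public static void main(String[] args) {
        RuntimeException firstException = null;
        try {
            throw new NullPointerException("producer is null"); // the clean close fails
        } catch (RuntimeException e) {
            // the exception is only recorded when the dirty close ALSO
            // fails -- which it does not, so the test below is false
            if (!closeUnclean("1_26")) {
                firstException = e;
            }
        }
        // the NPE was logged earlier but never propagated to the caller
        System.out.println(firstException == null);
    }
}
```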
   
   It seems this is not the intended behavior. If that is the case, I
   will happily correct it and stop using logging as a way to detect
   failure.
   
   Separately, this commit does not currently pass checkstyle because it
   uses blacklisted imports: `LogCaptureAppender` and its various
   dependencies from `log4j`. I would appreciate guidance on whether we
   should whitelist these or use another technique for detection.
   
   Note also that this test is quite involved. I could have just tested
   that AssignedTasks calls closeSuspended when appropriate, but that
   would test an implementation detail without actually verifying that we
   reproduced the original problem as it was described. Testing across
   AssignedTasks and StreamTask gives much more confidence that we are
   reproducing the behavior, and lets us test exactly the conditions that
   lead to it. I believe this further supports the argument for
   eventually consolidating the state that is split across these classes.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8412
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8412
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Sebastiaan
>            Assignee: Chris Pettitt
>            Priority: Minor
>
> I found a closed issue and replied there, but decided to open a new one 
> because, although the two are related, they are slightly different. The 
> original issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there was to add a null check around closing the producer, 
> because in some cases the producer is already null at that point (it 
> has already been closed).
> In version 2.1.1 we are getting a very similar exception, but in the 
> 'flush' method that is called just before close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>         this.producer.close();
>         this.producer = null;
>     }
>     this.checkForException();
> }{code}
> To my (admittedly ignorant) eye, it seems that the flush method should 
> also be wrapped in a null check, in the same way as has been done for 
> close.
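
A guarded flush along the lines the reporter suggests would mirror the
existing check in close. This is a sketch only, with a stand-in Producer
interface rather than the Kafka client class, and it is not necessarily
the fix the PR ultimately takes:

```java
// Sketch of the proposed null guard in flush (stand-in types, not Kafka source).
class GuardedFlushSketch {
    interface Producer {
        void flush();
    }

    private Producer producer; // may already be null after close/suspend

    void flush() {
        if (producer != null) { // guard prevents the reported NPE
            producer.flush();
        }
    }

    public static void main(String[] args) {
        GuardedFlushSketch collector = new GuardedFlushSketch();
        collector.flush(); // producer is null, but no NullPointerException
        System.out.println("ok");
    }
}
```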



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)