[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845824#comment-16845824 ] Sebastiaan commented on KAFKA-7678: --- Hi guys! I think this fix is not complete yet. In version 2.1.1 we are getting a very similar exception, but in the 'flush' method that is called pre-close. This is the full stacktrace: {code:java} message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error: logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758) {code} Followed by: {code:java} message: task [1_26] Could not close task due to the following error: logger_name: org.apache.kafka.streams.processor.internals.StreamTask java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before that: {code:java} public void flush() { this.log.debug("Flushing producer"); this.producer.flush(); this.checkForException(); } public void close() { this.log.debug("Closing producer"); if (this.producer != null) { this.producer.close(); this.producer = null; } this.checkForException(); }{code} {color:#80} {color} Seems to my (ignorant) eye that the flush method should also be wrapped in a null check. > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.0.1, 2.1.0 >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2 > > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdo
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701400#comment-16701400 ] Matthias J. Sax commented on KAFKA-7678: Are you saying that the shutdown still finished successfully and gracefully and the only issue if, that we get the NPE logged? If yes, I would consider this a minor bug and agree that a `null` check sounds like an appropriate fix. > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jonathan Santilli >Priority: Major > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701647#comment-16701647 ] Jonathan Santilli commented on KAFKA-7678: -- Yes [~mjsax] , is like that, still finished successfully and gracefully. Now that we can say is a bug, I have changed to *minor* and assigned to myself. Hope to send the PR soon. Cheers! -- Jonathan > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jonathan Santilli >Priority: Minor > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702239#comment-16702239 ] Matthias J. Sax commented on KAFKA-7678: Thank you! > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702652#comment-16702652 ] Guozhang Wang commented on KAFKA-7678: -- Looked at the source code, and I think it is indeed a bug: we may close a suspended task, and hence causing recordCollector.close() to be called multiple times, and hence that function should be implemented in an idempotent way. A more general thing is that internally the call trace from task-manager to tasks.close / suspend calls etc are quite messy, maybe some code cleanup would be worthy in the future. > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707879#comment-16707879 ] Jonathan Santilli commented on KAFKA-7678: -- PR submitted [~mjsax] [https://github.com/apache/kafka/pull/5993] Cheers! -- Jonathan > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > Labels: bug > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710556#comment-16710556 ] ASF GitHub Bot commented on KAFKA-7678: --- mjsax closed pull request #5993: KAFKA-7678: Avoid NPE when closing the RecordCollector URL: https://github.com/apache/kafka/pull/5993 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 5df14ee2815..d3a00301d7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -249,8 +249,10 @@ public void flush() { @Override public void close() { log.debug("Closing producer"); -producer.close(); -producer = null; +if (producer != null) { +producer.close(); +producer = null; +} checkForException(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index e63751899f2..0bc65ccbe10 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -387,6 +387,18 @@ public void testRecordHeaderPassThroughSerializer() { } } +@Test +public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() { +final RecordCollectorImpl collector = new RecordCollectorImpl( +"NoNPE", +logContext, +new DefaultProductionExceptionHandler(), +new Metrics().sensor("skipped-records") +); + +collector.close(); +} + private static class CustomStringSerializer extends StringSerializer { private boolean isKey; This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > Labels: bug > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consid
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714439#comment-16714439 ] Jonathan Santilli commented on KAFKA-7678: -- Hello [~guozhang] I can confirm the issue about calling the *.close()* method out of a null have been solved. However, I do not know the reason why that was happening (I did not dig enough into the code to understand the reason). Maybe we can create another Jira to explore it, what do you think? > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.0.1, 2.1.0 >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2 > > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715220#comment-16715220 ] Guozhang Wang commented on KAFKA-7678: -- There are one or two edge cases which can cause record collector to be closed multiple times, we have noticed them recently and are thinking about cleanup the classes along the calling hierarchy (i.e. from Task Manager -> Task -> RecordCollector) for it. One example is: 1) a task is *suspended*, with EOS turned on (like your case), the record collector is closed(). 2) then the instance got killed (SIGTERM) , which causes all threads to be closed, which will then cause all their owned tasks to be *closed*. The same record collector close() call will be triggered again. > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.0.1, 2.1.0 >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2 > > > This occurs when the group is rebalancing in a Kafka Stream application and > the process (the Kafka Stream application) receives a *SIGTERM* to stop it > gracefully. > > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > Although I have checked the code and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class is expecting any kind of error to happen since is catching > `*Throwable*`. > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug? > In my opinion, we could check for the `*null*` possibility at > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it for: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Stream and client 2.1.0 > OpenJDK 8 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)