[ 
https://issues.apache.org/jira/browse/NIFI-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206593#comment-15206593
 ] 

Joseph Witt commented on NIFI-1665:
-----------------------------------

This looks quite important to have in 0.6.0 for Kafka users.

> GetKafka continually throws NullPointerException if it ever fails to write 
> out message due to Kafka timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-1665
>                 URL: https://issues.apache.org/jira/browse/NIFI-1665
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>            Priority: Critical
>             Fix For: 0.6.0
>
>
> If an Exception is thrown in GetKafka's consumeFromKafka method, it enters 
> the following block:
> {code}
>         catch (final Exception e) {
>             this.shutdownConsumer();
>             getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
>             if (flowFile != null) {
>                 session.remove(flowFile);
>             }
> {code}
> This call to shutdownConsumer performs the following:
> {code}
> if (this.executor != null) {
>             this.executor.shutdown();
>             try {
>                 if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
>                     this.executor.shutdownNow();
>                     getLogger().warn("Executor did not stop in 30 sec. Terminated.");
>                 }
>                 this.executor = null;
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>             }
>         }
> {code}
> Now that this.executor is set to null, the onTrigger method will continually 
> throw NullPointerException because it attempts to call executor.submit:
> {code}
>         synchronized (this.consumerStreamsReady) {
>             if (!this.consumerStreamsReady.get()) {
>                 Future<Void> f = this.executor.submit(new Callable<Void>() {
> ...
> {code}
> and also
> {code}
>         if (this.consumerStreamsReady.get()) {
>             Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
>                 @Override
>                 public Void call() throws Exception {
> ...
> {code}
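
The failure mode described above can be reproduced outside NiFi with a small stand-alone sketch. This is not the GetKafka source: the class `ConsumerSketch` and the methods `unguardedTrigger` and `guardedTrigger` are hypothetical names made up for illustration. The guard shown (lazily re-creating the executor) is only one possible way to avoid the NullPointerException and is not necessarily the fix applied for NIFI-1665.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the processor; not NiFi code.
class ConsumerSketch {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Callable<String> task = () -> "consumed";

    // Mirrors the reported shutdownConsumer(): stops the executor, then nulls the field.
    synchronized void shutdownConsumer() {
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
                    executor.shutdownNow();
                }
                executor = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Mirrors the reported onTrigger(): submits without checking the field,
    // so it throws NullPointerException once shutdownConsumer() has run.
    String unguardedTrigger() {
        return await(executor.submit(task));
    }

    // One possible guard (an assumption, not the project's fix):
    // re-create the executor if a previous shutdown cleared it.
    synchronized String guardedTrigger() {
        if (executor == null) {
            executor = Executors.newSingleThreadExecutor();
        }
        return await(executor.submit(task));
    }

    // Waits for the task result, converting checked exceptions to unchecked ones.
    private static String await(Future<String> f) {
        try {
            return f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsumerSketch c = new ConsumerSketch();
        c.shutdownConsumer(); // simulate the catch block after a Kafka timeout
        try {
            c.unguardedTrigger();
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        System.out.println("guarded: " + c.guardedTrigger());
        c.shutdownConsumer(); // release the worker thread so the JVM can exit
    }
}
```

After one call to `shutdownConsumer()`, every later `unguardedTrigger()` call fails the same way, which matches the "continually throws" behavior in the report; the guarded variant recovers on the next trigger.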



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
