[ https://issues.apache.org/jira/browse/NIFI-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206593#comment-15206593 ]
Joseph Witt commented on NIFI-1665: ----------------------------------- This looks to me to be quite important to have in 0.6.0 for kafka users. > GetKafka continually throws NullPointerException if it ever fails to write > out message due to Kafka timeout > ----------------------------------------------------------------------------------------------------------- > > Key: NIFI-1665 > URL: https://issues.apache.org/jira/browse/NIFI-1665 > Project: Apache NiFi > Issue Type: Bug > Reporter: Mark Payne > Assignee: Oleg Zhurakousky > Priority: Critical > Fix For: 0.6.0 > > > If an Exception is thrown in GetKafka's consumeFromKafka method, it enters > the following block: > {code} > catch (final Exception e) { > this.shutdownConsumer(); > getLogger().error("Failed to receive FlowFile from Kafka due to > {}", new Object[]{e}); > if (flowFile != null) { > session.remove(flowFile); > } > {code} > This call to shutdownConsumer performs the following: > {code} > if (this.executor != null) { > this.executor.shutdown(); > try { > if (!this.executor.awaitTermination(30000, > TimeUnit.MILLISECONDS)) { > this.executor.shutdownNow(); > getLogger().warn("Executor did not stop in 30 sec. > Terminated."); > } > this.executor = null; > } catch (InterruptedException e) { > Thread.currentThread().interrupt(); > } > } > {code} > Now that this.executor is set to null, the onTrigger method will continually > throw NullPointerException because it attempts to call executor.submit: > {code} > synchronized (this.consumerStreamsReady) { > if (!this.consumerStreamsReady.get()) { > Future<Void> f = this.executor.submit(new Callable<Void>() { > ... > {code} > and also > {code} > if (this.consumerStreamsReady.get()) { > Future<Void> consumptionFuture = this.executor.submit(new > Callable<Void>() { > @Override > public Void call() throws Exception { > ... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)