[
https://issues.apache.org/jira/browse/CAMEL-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360638#comment-15360638
]
ASF GitHub Bot commented on CAMEL-10115:
----------------------------------------
GitHub user anoordover opened a pull request:
https://github.com/apache/camel/pull/1057
CAMEL-10115: introduced pollTimeOutMs with default 30000
Please comment on two things:
- the default value of 30000 ms for pollTimeOutMs;
- the assumption that the configuration of the endpoint (see line 50 in
KafkaConsumer) is never null.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/anoordover/camel CAMEL-10115
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/camel/pull/1057.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1057
----
commit 2ee1ed1f2ef86310e750d7ce86f786add0b4535a
Author: Arno Noordover <[email protected]>
Date: 2016-07-03T18:30:03Z
CAMEL-10115: introduced pollTimeOutMs with default 30000
----
> Kafka consumer stays running if no messages were received after shutdown start
> ------------------------------------------------------------------------------
>
> Key: CAMEL-10115
> URL: https://issues.apache.org/jira/browse/CAMEL-10115
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.17.1
> Environment: Spring Boot 1.4.0.M3 with Tomcat
> Java 8
> Camel 2.17.1
> Reporter: Vadym Chekrii
>
> After {{CamelContext#close()}} is called, execution reaches
> {{org.apache.camel.component.kafka.KafkaConsumer#doStop}}, which triggers the
> shutdown of the executor instance. That in turn should interrupt the threads
> submitted to the executor (by reaching the native implementation of
> {{Thread#interrupt()}}).
> According to
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt--
> the interrupt method only sets the corresponding status on the thread; it
> does not terminate it.
> The problem is at line {{KafkaConsumer.java:108}}:
> {{ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);}}
> In the Kafka implementation of the poll method this leads to an almost
> infinite {{while}} loop that does not check the thread's interrupted status
> and exits only when a message is received from a broker. Only after exiting
> the loop is the interrupted status of the thread discovered and the thread
> terminated.
> This leads to a couple of problems:
> 1. The KafkaConsumers remain alive until receiving at least one more message
> from the broker.
> 2. As the CamelContext is most likely already shut down at this point, the
> received message will not be processed but will still be acknowledged to the
> broker. So effectively the message gets lost.
> A potential fix would be to make the poll timeout either reasonably small or
> configurable.
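
The suggested fix (a bounded, configurable poll timeout) can be illustrated with a minimal, self-contained sketch. It is not the camel-kafka code: a BlockingQueue stands in for the broker so the example runs without the Kafka client, and the class name BoundedPollDemo, the running flag, and the helper stopLatencyMs are hypothetical names used only for illustration. The stop path deliberately does not interrupt the thread, to mirror the reported behaviour where the blocking poll does not notice the interrupt; the loop ends only because each poll returns after at most pollTimeoutMs and the flag is checked again.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustration only: a BlockingQueue plays the role of the Kafka broker.
class BoundedPollDemo {
    private final BlockingQueue<String> broker = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true); // hypothetical stop flag
    private final long pollTimeoutMs;

    BoundedPollDemo(long pollTimeoutMs) {
        this.pollTimeoutMs = pollTimeoutMs;
    }

    // Each poll blocks for at most pollTimeoutMs, then the stop flag is re-checked.
    void pollLoop() {
        while (running.get()) {
            try {
                String record = broker.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
                // Skip records that arrive after shutdown has started.
                if (record != null && running.get()) {
                    System.out.println("processing " + record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupted status
                return;
            }
        }
    }

    // Requests shutdown without interrupting the polling thread.
    void stop() {
        running.set(false);
    }

    // Starts the loop, requests a stop, and returns how long the loop took to exit.
    static long stopLatencyMs(long pollTimeoutMs) {
        BoundedPollDemo demo = new BoundedPollDemo(pollTimeoutMs);
        Thread worker = new Thread(demo::pollLoop, "poll-loop");
        worker.start();
        try {
            Thread.sleep(50); // let the worker enter its first poll
            long start = System.nanoTime();
            demo.stop();
            worker.join(10 * pollTimeoutMs + 1000);
            if (worker.isAlive()) {
                throw new IllegalStateException("poll loop did not stop");
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped after ~" + stopLatencyMs(200) + " ms");
    }
}
```

With this shape the worst-case shutdown delay is roughly one poll timeout, so a 30000 ms default would still allow a consumer thread to linger for up to about 30 seconds after shutdown starts; a smaller value shortens that window at the cost of more frequent wake-ups.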