[ https://issues.apache.org/jira/browse/CAMEL-18327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
rupam updated CAMEL-18327:
--------------------------
Component/s: (was: camel-health)
Description:
In the startPolling() method, when the consumer is suspending, the polling
loop exits and the unsubscribe method (safeUnsubscribe() in the excerpt
below) is called, which permanently closes the consumer.
{code:java}
// Excerpt: the enclosing try block is opened earlier in startPolling().
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) {
    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
    if (consumerListener != null) {
        if (!consumerListener.afterConsume(consumer)) {
            continue;
        }
    }

    ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
    if (result.isBreakOnErrorHit()) {
        LOG.debug("We hit an error ... setting flags to force reconnect");
        // force re-connect
        setReconnect(true);
        setConnected(false);
    }

    updateTaskState();
}

if (!isConnected()) {
    LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
    commitManager.commit();
}

// Reached whenever the loop above exits, including when the consumer is only suspending.
safeUnsubscribe();
} catch (InterruptException e) {
{code}
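One possible direction, sketched below under stated assumptions, is to keep the polling loop alive while suspended and unsubscribe only on a real stop. This is not the camel-kafka implementation: SuspendAwarePolling, State and FakeConsumer are hypothetical names, and FakeConsumer only records calls instead of talking to Kafka. With the real client, a paused consumer returns no records from paused partitions until it is resumed, so continuing to poll should be harmless.

```java
import java.util.concurrent.atomic.AtomicReference;

class SuspendAwarePolling {
    enum State { RUNNING, SUSPENDED, STOPPED }

    /** Hypothetical stand-in for the Kafka client; it only records calls. */
    static class FakeConsumer {
        int polls;
        boolean paused;
        boolean unsubscribed;
        void poll() { polls++; }
        void pause() { paused = true; }
        void resume() { paused = false; }
        void unsubscribe() { unsubscribed = true; }
    }

    final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    final FakeConsumer consumer = new FakeConsumer();

    /** Bounded by maxIterations so that the sketch always terminates. */
    void pollLoop(int maxIterations) {
        for (int i = 0; i < maxIterations && state.get() != State.STOPPED; i++) {
            boolean suspended = state.get() == State.SUSPENDED;
            if (suspended && !consumer.paused) {
                consumer.pause();   // suspend: stop fetching, keep the subscription
            } else if (!suspended && consumer.paused) {
                consumer.resume();  // resume: fetch again with the same consumer
            }
            consumer.poll();
        }
        // Unsubscribe only on a real stop, never on suspend.
        if (state.get() == State.STOPPED) {
            consumer.unsubscribe();
        }
    }

    public static void main(String[] args) {
        SuspendAwarePolling p = new SuspendAwarePolling();
        p.state.set(State.SUSPENDED);
        p.pollLoop(3);
        System.out.println("unsubscribed while suspended: " + p.consumer.unsubscribed);
        p.state.set(State.STOPPED);
        p.pollLoop(1);
        System.out.println("unsubscribed after stop: " + p.consumer.unsubscribed);
    }
}
```

The pause and resume calls are made by the polling thread itself, which also avoids touching the consumer from the thread that requested the suspend.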
was:
Investigate and fix the issue below:
{code}
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2469) ~[kafka-clients-3.1.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2453) ~[kafka-clients-3.1.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:891) ~[kafka-clients-3.1.0.jar:?]
    at org.apache.camel.component.kafka.KafkaFetchRecords.pause(KafkaFetchRecords.java:518) ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.component.kafka.KafkaConsumer.doSuspend(KafkaConsumer.java:221) ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.support.service.BaseService.suspend(BaseService.java:189) ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.support.service.ServiceHelper.suspendService(ServiceHelper.java:404) ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.impl.engine.DefaultShutdownStrategy.suspendNow(DefaultShutdownStrategy.java:447) ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.impl.engine.DefaultShutdownStrategy$ShutdownTask.run(DefaultShutdownStrategy.java:620) ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
{code}
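In the trace above, KafkaFetchRecords.pause() runs on a shutdown-strategy worker thread and calls consumer.assignment(), presumably while the polling thread is inside the consumer. A common way to avoid this kind of error is to hand the request to the polling thread rather than calling the consumer directly. The following is a minimal sketch of that idea, not the camel-kafka fix: ConfinedConsumerDemo, OwnedConsumer, requestPause and drainPending are hypothetical names, and OwnedConsumer simplifies the real check by rejecting every call made off its owner thread.

```java
import java.util.ConcurrentModificationException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ConfinedConsumerDemo {
    /** Hypothetical stand-in: rejects any call made off its owner thread. */
    static class OwnedConsumer {
        private final Thread owner;
        volatile boolean paused;

        OwnedConsumer(Thread owner) { this.owner = owner; }

        void pause() {
            if (Thread.currentThread() != owner) {
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            }
            paused = true;
        }
    }

    // Requests from other threads wait here until the owner thread runs them.
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    /** Safe from any thread: it only enqueues the request. */
    void requestPause(OwnedConsumer consumer) {
        pending.add(consumer::pause);
    }

    /** Called by the polling (owner) thread between polls. */
    void drainPending() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    /** Helper for the demo: runs r on a new thread and waits for it to finish. */
    static void runOnOtherThread(Runnable r) {
        Thread t = new Thread(r);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        OwnedConsumer consumer = new OwnedConsumer(Thread.currentThread());
        ConfinedConsumerDemo demo = new ConfinedConsumerDemo();
        runOnOtherThread(() -> demo.requestPause(consumer)); // e.g. the shutdown thread
        demo.drainPending();                                 // the polling thread applies it
        System.out.println("paused = " + consumer.paused);
    }
}
```

The suspending thread never touches the consumer in this design. The cost is that the pause takes effect only when the polling thread next drains the queue.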
> camel-kafka: Kafka consumer closes when it is paused
> ----------------------------------------------------
>
> Key: CAMEL-18327
> URL: https://issues.apache.org/jira/browse/CAMEL-18327
> Project: Camel
> Issue Type: Task
> Components: camel-kafka
> Affects Versions: 3.17.0
> Reporter: rupam
> Priority: Major
>