[ 
https://issues.apache.org/jira/browse/CAMEL-18327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

rupam updated CAMEL-18327:
--------------------------
    Component/s:     (was: camel-health)
    Description: 
 

In the startPolling() method, if the consumer is suspending, then the 
unsubscribe method is called which permanently closes the consumer

 
{code:java}
    Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
    while (isKafkaConsumerRunnable() && isConnected() && 
pollExceptionStrategy.canContinue()) {
        ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
        if (consumerListener != null) {
            if (!consumerListener.afterConsume(consumer)) {
                continue;
            }
        }

        ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);

        if (result.isBreakOnErrorHit()) {
            LOG.debug("We hit an error ... setting flags to force reconnect");
            // force re-connect
            setReconnect(true);
            setConnected(false);
        }

        updateTaskState();
    }

    if (!isConnected()) {
        LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
        commitManager.commit();
    }

    safeUnsubscribe();
} catch (InterruptException e) {{code}
 

 

  was:
Investigate and fix the issue below: 

{code}
1496   │ java.util.ConcurrentModificationException: KafkaConsumer is not safe 
for multi-threaded access
1497   │     at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2469)
 ~[kafka-clients-3.1.0.jar:?]
1498   │     at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2453)
 ~[kafka-clients-3.1.0.jar:?]
1499   │     at 
org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:891)
 ~[kafka-clients-3.1.0.jar:?]
1500   │     at 
org.apache.camel.component.kafka.KafkaFetchRecords.pause(KafkaFetchRecords.java:518)
 ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
1501   │     at 
org.apache.camel.component.kafka.KafkaConsumer.doSuspend(KafkaConsumer.java:221)
 ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
1502   │     at 
org.apache.camel.support.service.BaseService.suspend(BaseService.java:189) 
~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
1503   │     at 
org.apache.camel.support.service.ServiceHelper.suspendService(ServiceHelper.java:404)
 ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
1504   │     at 
org.apache.camel.impl.engine.DefaultShutdownStrategy.suspendNow(DefaultShutdownStrategy.java:447)
 ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
1505   │     at 
org.apache.camel.impl.engine.DefaultShutdownStrategy$ShutdownTask.run(DefaultShutdownStrategy.java:620)
 ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
1506   │     at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
1507   │     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
1508   │     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
~[?:?]
1509   │     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
~[?:?]
1510   │     at java.lang.Thread.run(Thread.java:833) ~[?:?]
{code}




> camel-kafka: Kafka consumer closes when it is paused
> ----------------------------------------------------
>
>                 Key: CAMEL-18327
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18327
>             Project: Camel
>          Issue Type: Task
>          Components: camel-kafka
>    Affects Versions: 3.17.0
>            Reporter: rupam
>            Priority: Major
>
>  
> In the startPolling() method, if the consumer is suspending, then the 
> unsubscribe method is called which permanently closes the consumer
>  
> {code:java}
>     Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
>     while (isKafkaConsumerRunnable() && isConnected() && 
> pollExceptionStrategy.canContinue()) {
>         ConsumerRecords<Object, Object> allRecords = 
> consumer.poll(pollDuration);
>         if (consumerListener != null) {
>             if (!consumerListener.afterConsume(consumer)) {
>                 continue;
>             }
>         }
>         ProcessingResult result = 
> recordProcessorFacade.processPolledRecords(allRecords);
>         if (result.isBreakOnErrorHit()) {
>             LOG.debug("We hit an error ... setting flags to force reconnect");
>             // force re-connect
>             setReconnect(true);
>             setConnected(false);
>         }
>         updateTaskState();
>     }
>     if (!isConnected()) {
>         LOG.debug("Not reconnecting, check whether to auto-commit or not 
> ...");
>         commitManager.commit();
>     }
>     safeUnsubscribe();
> } catch (InterruptException e) {{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to