This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new d29fb108e46 camel-kafka: prevent exceptions in close from leaking (CAMEL-18796)
d29fb108e46 is described below

commit d29fb108e469c6eac77a5919ecbfef6475f1a205
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Dec 12 15:25:12 2022 +0100

    camel-kafka: prevent exceptions in close from leaking (CAMEL-18796)
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 660b7f278d8..0130f72fda4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -218,7 +218,11 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, Camel
         } finally {
             if (consumer != null) {
                 consumer.unsubscribe();
-                consumer.close(Duration.ofSeconds(5));
+                try {
+                    consumer.close(Duration.ofSeconds(5));
+                } catch (Exception e) {
+                    LOG.warn("Error closing the consumer: {} (this error will be ignored)", e.getMessage(), e);
+                }
             }
         }
     }
@@ -396,6 +400,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, Camel
             IOHelper.close(producer, "Kafka producer", LOG);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.warn("Error closing the Kafka producer: {} (this error will be ignored)", e.getMessage(), e);
         } finally {
             writeLock.unlock();
         }

Reply via email to