This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch 
CAMEL-20199/remove-synchronized-blocks-from-j2m-components
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e954831d6f8e485890638b6803d67105701362ee
Author: Nicolas Filotto <nicolas.filo...@qlik.com>
AuthorDate: Thu Oct 31 15:23:55 2024 +0100

    CAMEL-20199: Remove synchronized block from components J to M
---
 .../camel/component/milvus/MilvusEndpoint.java     |  11 +-
 .../component/github/consumer/CommitConsumer.java  |  16 +-
 .../jackson/converter/JacksonTypeConverters.java   |  11 +-
 .../component/jasypt/JasyptPropertiesParser.java   |  49 +--
 .../converter/jaxb/FallbackTypeConverter.java      |  28 +-
 .../camel/component/jcache/JCacheManager.java      |  45 ++-
 .../apache/camel/component/jcr/JcrConsumer.java    | 132 ++++---
 .../camel/component/jetty/JettyHttpComponent.java  | 259 +++++++------
 .../apache/camel/component/jira/JiraEndpoint.java  |  76 ++--
 .../component/jms/EndpointMessageListener.java     |  17 +-
 .../apache/camel/component/jms/JmsComponent.java   |  19 +-
 .../apache/camel/component/jms/JmsProducer.java    |   5 +-
 .../component/jms/JmsTemporaryQueueEndpoint.java   |  13 +-
 .../component/jms/JmsTemporaryTopicEndpoint.java   |  13 +-
 .../component/jms/StreamMessageInputStream.java    |   8 +-
 .../jms/reply/MessageSelectorCreator.java          |  14 +-
 .../component/jms/reply/QueueReplyManager.java     |   5 +-
 .../jmx/JMXConsumerNotificationFilter.java         |  43 ++-
 .../apache/camel/component/jolt/JoltEndpoint.java  |  63 ++--
 .../apache/camel/component/jpa/JpaComponent.java   |  17 +-
 .../apache/camel/component/jslt/JsltEndpoint.java  |  92 ++---
 .../jsonvalidator/JsonValidatorEndpoint.java       |   5 +-
 .../java/org/apache/camel/jsonpath/JsonStream.java |   2 +-
 .../camel/component/jt400/Jt400Component.java      |  15 +-
 .../component/jt400/Jt400MsgQueueConsumer.java     |  73 ++--
 .../jta/JtaTransactionErrorHandlerReifier.java     |  43 ++-
 .../camel/component/kafka/KafkaFetchRecords.java   |   2 +-
 .../camel/component/kamelet/KameletComponent.java  |  27 +-
 .../camel/component/knative/KnativeComponent.java  |  26 +-
 .../camel/component/master/MasterConsumer.java     | 110 +++---
 .../component/mllp/MllpTcpClientProducer.java      | 413 +++++++++++----------
 .../mongodb/MongoDbTailTrackingManager.java        |  54 ++-
 .../camel/component/mybatis/MyBatisComponent.java  |  23 +-
 33 files changed, 982 insertions(+), 747 deletions(-)

diff --git 
a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
 
b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
index 0957ab82adb..c1f8146f63a 100644
--- 
a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
+++ 
b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
@@ -56,8 +56,6 @@ public class MilvusEndpoint extends DefaultEndpoint 
implements EndpointServiceLo
     @UriParam
     private MilvusConfiguration configuration;
 
-    private final Object lock;
-
     private volatile boolean closeClient;
     private volatile MilvusClient client;
 
@@ -71,8 +69,6 @@ public class MilvusEndpoint extends DefaultEndpoint 
implements EndpointServiceLo
 
         this.collection = collection;
         this.configuration = configuration;
-
-        this.lock = new Object();
     }
 
     @Override
@@ -93,9 +89,10 @@ public class MilvusEndpoint extends DefaultEndpoint 
implements EndpointServiceLo
         return collection;
     }
 
-    public synchronized MilvusClient getClient() {
+    public MilvusClient getClient() {
         if (this.client == null) {
-            synchronized (this.lock) {
+            lock.lock();
+            try {
                 if (this.client == null) {
                     this.client = this.configuration.getClient();
                     this.closeClient = false;
@@ -105,6 +102,8 @@ public class MilvusEndpoint extends DefaultEndpoint 
implements EndpointServiceLo
                         this.closeClient = true;
                     }
                 }
+            } finally {
+                lock.unlock();
             }
         }
 
diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index 03bda8756e3..ec03b96ee36 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -75,7 +75,8 @@ public class CommitConsumer extends AbstractGitHubConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        synchronized (this) {
+        lock.lock();
+        try {
             super.doStart();
 
             // ensure we start from clean
@@ -97,24 +98,29 @@ public class CommitConsumer extends AbstractGitHubConsumer {
                 LOG.info("Starting from beginning");
             }
             started = true;
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        synchronized (this) {
+        lock.lock();
+        try {
             super.doStop();
 
             commitHashes.clear();
             lastSha = null;
             started = false;
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     protected int poll() throws Exception {
-        synchronized (this) {
-
+        lock.lock();
+        try {
             if (!started) {
                 return 0;
             }
@@ -169,6 +175,8 @@ public class CommitConsumer extends AbstractGitHubConsumer {
             }
             LOG.debug("Last sha: {}", lastSha);
             return counter;
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
 
b/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
index 4bd45abb1d1..3e419ecacc0 100644
--- 
a/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
+++ 
b/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -57,7 +59,7 @@ import org.slf4j.LoggerFactory;
 public final class JacksonTypeConverters {
     private static final Logger LOG = 
LoggerFactory.getLogger(JacksonTypeConverters.class);
 
-    private final Object lock;
+    private final Lock lock;
     private volatile ObjectMapper defaultMapper;
 
     private boolean init;
@@ -66,7 +68,7 @@ public final class JacksonTypeConverters {
     private String moduleClassNames;
 
     public JacksonTypeConverters() {
-        this.lock = new Object();
+        this.lock = new ReentrantLock();
     }
 
     @Converter
@@ -306,7 +308,8 @@ public final class JacksonTypeConverters {
         }
 
         if (defaultMapper == null) {
-            synchronized (lock) {
+            lock.lock();
+            try {
                 if (defaultMapper == null) {
                     ObjectMapper mapper = new ObjectMapper();
                     if (moduleClassNames != null) {
@@ -322,6 +325,8 @@ public final class JacksonTypeConverters {
 
                     defaultMapper = mapper;
                 }
+            } finally {
+                lock.unlock();
             }
         }
 
diff --git 
a/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
 
b/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
index 0dc7ddbec66..b95b9300822 100644
--- 
a/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
+++ 
b/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.jasypt;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -43,15 +45,13 @@ public class JasyptPropertiesParser extends 
DefaultPropertiesParser {
             = JASYPT_PREFIX_TOKEN.replace("(", "\\(") + "(.+?)" + 
JASYPT_SUFFIX_TOKEN.replace(")", "\\)");
     private static final Pattern PATTERN = Pattern.compile(JASYPT_REGEX);
 
+    private final Lock lock = new ReentrantLock();
     private StringEncryptor encryptor;
     private String password;
     private String algorithm;
     private String randomSaltGeneratorAlgorithm;
     private String randomIvGeneratorAlgorithm;
 
-    public JasyptPropertiesParser() {
-    }
-
     @Override
     public String parseProperty(String key, String value, PropertiesLookup 
properties) {
         log.trace("Parsing property '{}={}'", key, value);
@@ -69,27 +69,32 @@ public class JasyptPropertiesParser extends 
DefaultPropertiesParser {
         return value;
     }
 
-    private synchronized void initEncryptor() {
-        if (encryptor == null) {
-            StringHelper.notEmpty("password", password);
-            StandardPBEStringEncryptor pbeStringEncryptor = new 
StandardPBEStringEncryptor();
-
-            pbeStringEncryptor.setPassword(password);
-            if (algorithm != null) {
-                pbeStringEncryptor.setAlgorithm(algorithm);
-                log.debug("Initialized encryptor using {} algorithm and 
provided password", algorithm);
-            } else {
-                log.debug("Initialized encryptor using default algorithm and 
provided password");
-            }
+    private void initEncryptor() {
+        lock.lock();
+        try {
+            if (encryptor == null) {
+                StringHelper.notEmpty("password", password);
+                StandardPBEStringEncryptor pbeStringEncryptor = new 
StandardPBEStringEncryptor();
+
+                pbeStringEncryptor.setPassword(password);
+                if (algorithm != null) {
+                    pbeStringEncryptor.setAlgorithm(algorithm);
+                    log.debug("Initialized encryptor using {} algorithm and 
provided password", algorithm);
+                } else {
+                    log.debug("Initialized encryptor using default algorithm 
and provided password");
+                }
 
-            if (randomSaltGeneratorAlgorithm != null) {
-                pbeStringEncryptor.setSaltGenerator(new 
RandomSaltGenerator(randomSaltGeneratorAlgorithm));
-            }
-            if (randomIvGeneratorAlgorithm != null) {
-                pbeStringEncryptor.setIvGenerator(new 
RandomIvGenerator(randomIvGeneratorAlgorithm));
-            }
+                if (randomSaltGeneratorAlgorithm != null) {
+                    pbeStringEncryptor.setSaltGenerator(new 
RandomSaltGenerator(randomSaltGeneratorAlgorithm));
+                }
+                if (randomIvGeneratorAlgorithm != null) {
+                    pbeStringEncryptor.setIvGenerator(new 
RandomIvGenerator(randomIvGeneratorAlgorithm));
+                }
 
-            encryptor = pbeStringEncryptor;
+                encryptor = pbeStringEncryptor;
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
 
b/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
index e6dba9c4e90..1b8622e701d 100644
--- 
a/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
+++ 
b/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
@@ -28,6 +28,8 @@ import java.lang.reflect.AnnotatedElement;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import jakarta.xml.bind.JAXBContext;
 import jakarta.xml.bind.JAXBElement;
@@ -63,6 +65,7 @@ public class FallbackTypeConverter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FallbackTypeConverter.class);
 
+    private final Lock lock = new ReentrantLock();
     private final Map<AnnotatedElement, JAXBContext> contexts = new 
HashMap<>();
     private final StaxConverter staxConverter = new StaxConverter();
     private boolean defaultPrettyPrint = true;
@@ -318,19 +321,24 @@ public class FallbackTypeConverter {
         return exchange != null && 
exchange.getProperty(Exchange.FILTER_NON_XML_CHARS, Boolean.FALSE, 
Boolean.class);
     }
 
-    protected synchronized <T> JAXBContext createContext(Class<T> type) throws 
JAXBException {
+    protected <T> JAXBContext createContext(Class<T> type) throws 
JAXBException {
         AnnotatedElement ae = hasXmlRootElement(type) ? type : 
type.getPackage();
-        JAXBContext context = contexts.get(ae);
-        if (context == null) {
-            if (hasXmlRootElement(type)) {
-                context = JAXBContext.newInstance(type);
-                contexts.put(type, context);
-            } else {
-                context = JAXBContext.newInstance(type.getPackage().getName());
-                contexts.put(type.getPackage(), context);
+        lock.lock();
+        try {
+            JAXBContext context = contexts.get(ae);
+            if (context == null) {
+                if (hasXmlRootElement(type)) {
+                    context = JAXBContext.newInstance(type);
+                    contexts.put(type, context);
+                } else {
+                    context = 
JAXBContext.newInstance(type.getPackage().getName());
+                    contexts.put(type.getPackage(), context);
+                }
             }
+            return context;
+        } finally {
+            lock.unlock();
         }
-        return context;
     }
 
     protected <T> Unmarshaller getUnmarshaller(Class<T> type) throws 
JAXBException {
diff --git 
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
 
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
index 0b490cdcbbc..ebde348720a 100644
--- 
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
+++ 
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.cache.Cache;
 import javax.cache.CacheManager;
@@ -38,6 +40,7 @@ public class JCacheManager<K, V> implements Closeable {
     private final JCacheConfiguration configuration;
     private final String cacheName;
     private final CamelContext camelContext;
+    private final Lock lock = new ReentrantLock();
     private CachingProvider provider;
     private CacheManager manager;
     private Cache<K, V> cache;
@@ -68,29 +71,39 @@ public class JCacheManager<K, V> implements Closeable {
         return this.configuration;
     }
 
-    public synchronized Cache<K, V> getCache() throws Exception {
-        if (cache == null) {
-            JCacheProvider provider = 
JCacheProviders.lookup(configuration.getCachingProvider());
-            cache = doGetCache(provider);
-        }
+    public Cache<K, V> getCache() throws Exception {
+        lock.lock();
+        try {
+            if (cache == null) {
+                JCacheProvider provider = 
JCacheProviders.lookup(configuration.getCachingProvider());
+                cache = doGetCache(provider);
+            }
 
-        return cache;
+            return cache;
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
-    public synchronized void close() throws IOException {
-        if (configuration != null) {
-            if (cache != null) {
-                cache.close();
-            }
+    public void close() throws IOException {
+        lock.lock();
+        try {
+            if (configuration != null) {
+                if (cache != null) {
+                    cache.close();
+                }
 
-            if (manager != null) {
-                manager.close();
-            }
+                if (manager != null) {
+                    manager.close();
+                }
 
-            if (provider != null) {
-                provider.close();
+                if (provider != null) {
+                    provider.close();
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
 
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
index 2723004465e..63d628337de 100644
--- 
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
+++ 
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
@@ -63,86 +63,97 @@ public class JcrConsumer extends DefaultConsumer {
         return (JcrEndpoint) getEndpoint();
     }
 
-    private synchronized void createSessionAndRegisterListener() throws 
RepositoryException {
-        LOG.trace("createSessionAndRegisterListener START");
-
-        if (ObjectHelper.isEmpty(getJcrEndpoint().getWorkspaceName())) {
-            session = 
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials());
-        } else {
-            session = 
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials(),
-                    getJcrEndpoint().getWorkspaceName());
-        }
+    private void createSessionAndRegisterListener() throws RepositoryException 
{
+        lock.lock();
+        try {
+            LOG.trace("createSessionAndRegisterListener START");
+
+            if (ObjectHelper.isEmpty(getJcrEndpoint().getWorkspaceName())) {
+                session = 
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials());
+            } else {
+                session = 
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials(),
+                        getJcrEndpoint().getWorkspaceName());
+            }
 
-        int eventTypes = getJcrEndpoint().getEventTypes();
-        String absPath = getJcrEndpoint().getBase();
+            int eventTypes = getJcrEndpoint().getEventTypes();
+            String absPath = getJcrEndpoint().getBase();
 
-        if (absPath == null) {
-            absPath = "/";
-        } else if (!absPath.startsWith("/")) {
-            absPath = "/" + absPath;
-        }
+            if (absPath == null) {
+                absPath = "/";
+            } else if (!absPath.startsWith("/")) {
+                absPath = "/" + absPath;
+            }
 
-        boolean isDeep = getJcrEndpoint().isDeep();
-        String[] uuid = null;
-        String uuids = getJcrEndpoint().getUuids();
+            boolean isDeep = getJcrEndpoint().isDeep();
+            String[] uuid = null;
+            String uuids = getJcrEndpoint().getUuids();
 
-        if (uuids != null) {
-            uuids = uuids.trim();
+            if (uuids != null) {
+                uuids = uuids.trim();
 
-            if (!uuids.isEmpty()) {
-                uuid = uuids.split(",");
+                if (!uuids.isEmpty()) {
+                    uuid = uuids.split(",");
+                }
             }
-        }
 
-        String[] nodeTypeName = null;
-        String nodeTypeNames = getJcrEndpoint().getNodeTypeNames();
+            String[] nodeTypeName = null;
+            String nodeTypeNames = getJcrEndpoint().getNodeTypeNames();
 
-        if (nodeTypeNames != null) {
-            nodeTypeNames = nodeTypeNames.trim();
+            if (nodeTypeNames != null) {
+                nodeTypeNames = nodeTypeNames.trim();
 
-            if (!nodeTypeNames.isEmpty()) {
-                nodeTypeName = nodeTypeNames.split(",");
+                if (!nodeTypeNames.isEmpty()) {
+                    nodeTypeName = nodeTypeNames.split(",");
+                }
             }
-        }
 
-        boolean noLocal = getJcrEndpoint().isNoLocal();
+            boolean noLocal = getJcrEndpoint().isNoLocal();
 
-        eventListener = new EndpointEventListener(this, getJcrEndpoint(), 
getProcessor());
+            eventListener = new EndpointEventListener(this, getJcrEndpoint(), 
getProcessor());
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={}, 
isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
-                    eventListener, absPath, eventTypes, isDeep, 
Arrays.toString(uuid), Arrays.toString(nodeTypeName),
-                    noLocal);
-        }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Adding JCR Event Listener, {}, on {}. eventTypes={}, 
isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
+                        eventListener, absPath, eventTypes, isDeep, 
Arrays.toString(uuid), Arrays.toString(nodeTypeName),
+                        noLocal);
+            }
 
-        session.getWorkspace().getObservationManager()
-                .addEventListener(eventListener, eventTypes, absPath, isDeep, 
uuid, nodeTypeName, noLocal);
+            session.getWorkspace().getObservationManager()
+                    .addEventListener(eventListener, eventTypes, absPath, 
isDeep, uuid, nodeTypeName, noLocal);
 
-        LOG.trace("createSessionAndRegisterListener END");
+            LOG.trace("createSessionAndRegisterListener END");
+        } finally {
+            lock.unlock();
+        }
     }
 
-    private synchronized void unregisterListenerAndLogoutSession() throws 
RepositoryException {
-        LOG.trace("unregisterListenerAndLogoutSession START");
+    private void unregisterListenerAndLogoutSession() throws 
RepositoryException {
+        lock.lock();
+        try {
+            LOG.trace("unregisterListenerAndLogoutSession START");
 
-        if (session != null) {
-            try {
-                if (!session.isLive()) {
-                    LOG.info("Session was is no more live.");
-                } else {
-                    if (eventListener != null) {
-                        
session.getWorkspace().getObservationManager().removeEventListener(eventListener);
-                        eventListener = null;
+            if (session != null) {
+                try {
+                    if (!session.isLive()) {
+                        LOG.info("Session was is no more live.");
+                    } else {
+                        if (eventListener != null) {
+                            
session.getWorkspace().getObservationManager().removeEventListener(eventListener);
+                            eventListener = null;
+                        }
+
+                        session.logout();
                     }
-
-                    session.logout();
+                } finally {
+                    eventListener = null;
+                    session = null;
                 }
-            } finally {
-                eventListener = null;
-                session = null;
             }
-        }
 
-        LOG.trace("unregisterListenerAndLogoutSession END");
+            LOG.trace("unregisterListenerAndLogoutSession END");
+        } finally {
+            lock.unlock();
+        }
     }
 
     private void cancelSessionListenerChecker() {
@@ -170,7 +181,8 @@ public class JcrConsumer extends DefaultConsumer {
 
             boolean isSessionLive = false;
 
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (JcrConsumer.this.session != null) {
                     try {
                         isSessionLive = JcrConsumer.this.session.isLive();
@@ -178,6 +190,8 @@ public class JcrConsumer extends DefaultConsumer {
                         LOG.debug("Exception while checking jcr session", e);
                     }
                 }
+            } finally {
+                lock.unlock();
             }
 
             if (!isSessionLive) {
diff --git 
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
 
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
index 4bf8293d7db..baa13924e82 100644
--- 
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
+++ 
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
@@ -28,6 +28,7 @@ import java.util.EventListener;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import jakarta.servlet.Filter;
 import jakarta.servlet.MultipartConfigElement;
@@ -90,7 +91,7 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
         implements RestConsumerFactory, RestApiConsumerFactory, 
SSLContextParametersAware {
     public static final String TMP_DIR = "CamelJettyTempDir";
 
-    protected static final HashMap<String, ConnectorRef> CONNECTORS = new 
HashMap<>();
+    protected static final Map<String, ConnectorRef> CONNECTORS = new 
ConcurrentHashMap<>();
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JettyHttpComponent.class);
     private static final String JETTY_SSL_KEYSTORE = 
"org.eclipse.jetty.ssl.keystore";
@@ -282,20 +283,18 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
         JettyHttpEndpoint endpoint = (JettyHttpEndpoint) 
consumer.getEndpoint();
         String connectorKey = getConnectorKey(endpoint);
 
-        synchronized (CONNECTORS) {
-            ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
-
-            // check if there are already another consumer on the same 
context-path and if so fail
-            if (connectorRef != null) {
-                for (Map.Entry<String, HttpConsumer> entry : 
connectorRef.servlet.getConsumers().entrySet()) {
-                    String path = entry.getValue().getPath();
-                    CamelContext camelContext = 
entry.getValue().getEndpoint().getCamelContext();
-                    if (consumer.getPath().equals(path)) {
-                        // its allowed if they are from the same camel context
-                        boolean sameContext = 
consumer.getEndpoint().getCamelContext() == camelContext;
-                        if (!sameContext) {
-                            return false;
-                        }
+        ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
+
+        // check if there are already another consumer on the same 
context-path and if so fail
+        if (connectorRef != null) {
+            for (Map.Entry<String, HttpConsumer> entry : 
connectorRef.servlet.getConsumers().entrySet()) {
+                String path = entry.getValue().getPath();
+                CamelContext camelContext = 
entry.getValue().getEndpoint().getCamelContext();
+                if (consumer.getPath().equals(path)) {
+                    // its allowed if they are from the same camel context
+                    boolean sameContext = 
consumer.getEndpoint().getCamelContext() == camelContext;
+                    if (!sameContext) {
+                        return false;
                     }
                 }
             }
@@ -313,68 +312,77 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
         JettyHttpEndpoint endpoint = (JettyHttpEndpoint) 
consumer.getEndpoint();
         String connectorKey = getConnectorKey(endpoint);
 
-        synchronized (CONNECTORS) {
-            ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
-            if (connectorRef == null) {
-                Server server = createServer();
-                Connector connector = getConnector(server, endpoint);
-                if 
("localhost".equalsIgnoreCase(endpoint.getHttpUri().getHost())) {
-                    LOG.warn("You use localhost interface! It means that no 
external connections will be available. "
-                             + "Don't you want to use 0.0.0.0 instead (all 
network interfaces)? {}",
-                            endpoint);
-                }
-                if (endpoint.isEnableJmx()) {
-                    enableJmx(server);
-                }
-                server.addConnector(connector);
-
-                connectorRef = new ConnectorRef(
-                        server, connector,
-                        createServletForConnector(server, connector, 
endpoint.getHandlers(), endpoint));
-                // must enable session before we start
-                if (endpoint.isSessionSupport()) {
-                    enableSessionSupport(connectorRef.server, connectorKey);
-                }
-                connectorRef.server.start();
+        CONNECTORS.compute(connectorKey, (cKey, connectorRef) -> {
+            try {
+                return connect(consumer, endpoint, cKey, connectorRef);
+            } catch (Exception e) {
+                throw new RuntimeCamelException(e);
+            }
+        });
+    }
 
-                LOG.debug("Adding connector key: {} -> {}", connectorKey, 
connectorRef);
-                CONNECTORS.put(connectorKey, connectorRef);
+    private ConnectorRef connect(
+            HttpConsumer consumer, JettyHttpEndpoint endpoint, String 
connectorKey, ConnectorRef connectorRef)
+            throws Exception {
+        if (connectorRef == null) {
+            Server server = createServer();
+            Connector connector = getConnector(server, endpoint);
+            if ("localhost".equalsIgnoreCase(endpoint.getHttpUri().getHost())) 
{
+                LOG.warn("You use localhost interface! It means that no 
external connections will be available. "
+                         + "Don't you want to use 0.0.0.0 instead (all network 
interfaces)? {}",
+                        endpoint);
+            }
+            if (endpoint.isEnableJmx()) {
+                enableJmx(server);
+            }
+            server.addConnector(connector);
+
+            connectorRef = new ConnectorRef(
+                    server, connector,
+                    createServletForConnector(server, connector, 
endpoint.getHandlers(), endpoint));
+            // must enable session before we start
+            if (endpoint.isSessionSupport()) {
+                enableSessionSupport(connectorRef.server, connectorKey);
+            }
+            connectorRef.server.start();
 
-            } else {
-                LOG.debug("Using existing connector key: {} -> {}", 
connectorKey, connectorRef);
+            LOG.debug("Adding connector key: {} -> {}", connectorKey, 
connectorRef);
+        } else {
+            LOG.debug("Using existing connector key: {} -> {}", connectorKey, 
connectorRef);
 
-                // check if there are any new handlers, and if so then we need 
to re-start the server
-                if (endpoint.getHandlers() != null && 
!endpoint.getHandlers().isEmpty()) {
-                    List<Handler> existingHandlers = new ArrayList<>();
-                    if (connectorRef.server.getHandlers() != null && 
!connectorRef.server.getHandlers().isEmpty()) {
-                        existingHandlers = connectorRef.server.getHandlers();
-                    }
-                    List<Handler> newHandlers = new 
ArrayList<>(endpoint.getHandlers());
-                    boolean changed = 
!existingHandlers.containsAll(newHandlers) && 
!newHandlers.containsAll(existingHandlers);
-                    if (changed) {
-                        LOG.debug("Restarting Jetty server due to adding new 
Jetty Handlers: {}", newHandlers);
-                        connectorRef.server.stop();
-                        addJettyHandlers(connectorRef.server, 
endpoint.getHandlers());
-                        connectorRef.server.start();
-                    }
+            // check if there are any new handlers, and if so then we need to 
re-start the server
+            if (endpoint.getHandlers() != null && 
!endpoint.getHandlers().isEmpty()) {
+                List<Handler> existingHandlers = new ArrayList<>();
+                if (connectorRef.server.getHandlers() != null && 
!connectorRef.server.getHandlers().isEmpty()) {
+                    existingHandlers = connectorRef.server.getHandlers();
                 }
-                // check the session support
-                if (endpoint.isSessionSupport()) {
-                    enableSessionSupport(connectorRef.server, connectorKey);
+                List<Handler> newHandlers = new 
ArrayList<>(endpoint.getHandlers());
+                boolean changed = !existingHandlers.containsAll(newHandlers) 
&& !newHandlers.containsAll(existingHandlers);
+                if (changed) {
+                    LOG.debug("Restarting Jetty server due to adding new Jetty 
Handlers: {}", newHandlers);
+                    connectorRef.server.stop();
+                    addJettyHandlers(connectorRef.server, 
endpoint.getHandlers());
+                    connectorRef.server.start();
                 }
-                // ref track the connector
-                connectorRef.increment();
             }
-
-            if (endpoint.isEnableMultipartFilter()) {
-                enableMultipartFilter(endpoint, connectorRef.server);
+            // check the session support
+            if (endpoint.isSessionSupport()) {
+                enableSessionSupport(connectorRef.server, connectorKey);
             }
+            // ref track the connector
+            connectorRef.increment();
+        }
 
-            if (endpoint.getFilters() != null && 
!endpoint.getFilters().isEmpty()) {
-                setFilters(endpoint, connectorRef.server);
-            }
-            connectorRef.servlet.connect(consumer);
+        if (endpoint.isEnableMultipartFilter()) {
+            enableMultipartFilter(endpoint, connectorRef.server);
         }
+
+        if (endpoint.getFilters() != null && !endpoint.getFilters().isEmpty()) 
{
+            setFilters(endpoint, connectorRef.server);
+        }
+        connectorRef.servlet.connect(consumer);
+
+        return connectorRef;
     }
 
     private void enableJmx(Server server) {
@@ -388,7 +396,7 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
         }
     }
 
-    private void enableSessionSupport(Server server, String connectorKey) 
throws Exception {
+    private void enableSessionSupport(Server server, String connectorKey) {
         ServletContextHandler context = 
server.getDescendant(ServletContextHandler.class);
         if (context.getSessionHandler() == null) {
             SessionHandler sessionHandler = new SessionHandler();
@@ -469,33 +477,39 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
         HttpCommonEndpoint endpoint = consumer.getEndpoint();
         String connectorKey = getConnectorKey(endpoint);
 
-        synchronized (CONNECTORS) {
-            ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
-            if (connectorRef != null) {
-                connectorRef.servlet.disconnect(consumer);
-                if (connectorRef.decrement() == 0) {
-                    
connectorRef.server.removeConnector(connectorRef.connector);
-                    connectorRef.connector.stop();
-                    connectorRef.server.stop();
-                    CONNECTORS.remove(connectorKey);
-                    // Camel controls the lifecycle of these entities so 
remove the
-                    // registered MBeans when Camel is done with the managed 
objects.
-                    if (mbContainer != null) {
-                        this.removeServerMBean(connectorRef.server);
-                        //mbContainer.removeBean(connectorRef.connector);
-                    }
-                    if (defaultQueuedThreadPool != null) {
-                        try {
-                            defaultQueuedThreadPool.stop();
-                        } catch (Exception t) {
-                            defaultQueuedThreadPool.destroy();
-                        } finally {
-                            defaultQueuedThreadPool = null;
-                        }
-                    }
+        CONNECTORS.computeIfPresent(connectorKey, (cKey, connectorRef) -> {
+            try {
+                return disconnect(consumer, connectorRef);
+            } catch (Exception e) {
+                throw new RuntimeCamelException(e);
+            }
+        });
+    }
+
+    private ConnectorRef disconnect(HttpConsumer consumer, ConnectorRef 
connectorRef) throws Exception {
+        connectorRef.servlet.disconnect(consumer);
+        if (connectorRef.decrement() == 0) {
+            connectorRef.server.removeConnector(connectorRef.connector);
+            connectorRef.connector.stop();
+            connectorRef.server.stop();
+            // Camel controls the lifecycle of these entities so remove the
+            // registered MBeans when Camel is done with the managed objects.
+            if (mbContainer != null) {
+                this.removeServerMBean(connectorRef.server);
+                //mbContainer.removeBean(connectorRef.connector);
+            }
+            if (defaultQueuedThreadPool != null) {
+                try {
+                    defaultQueuedThreadPool.stop();
+                } catch (Exception t) {
+                    defaultQueuedThreadPool.destroy();
+                } finally {
+                    defaultQueuedThreadPool = null;
                 }
             }
+            return null;
         }
+        return connectorRef;
     }
 
     private String getConnectorKey(HttpCommonEndpoint endpoint) {
@@ -808,25 +822,30 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
         throw new IllegalArgumentException("Jetty component does not use 
HttpConfiguration.");
     }
 
-    public synchronized MBeanContainer getMbContainer() {
-        // If null, provide the default implementation.
-        if (mbContainer == null) {
-            MBeanServer mbs = null;
+    public MBeanContainer getMbContainer() {
+        lock.lock();
+        try {
+            // If null, provide the default implementation.
+            if (mbContainer == null) {
+                MBeanServer mbs = null;
+
+                final ManagementStrategy mStrategy = 
this.getCamelContext().getManagementStrategy();
+                final ManagementAgent mAgent = mStrategy.getManagementAgent();
+                if (mAgent != null) {
+                    mbs = mAgent.getMBeanServer();
+                }
 
-            final ManagementStrategy mStrategy = 
this.getCamelContext().getManagementStrategy();
-            final ManagementAgent mAgent = mStrategy.getManagementAgent();
-            if (mAgent != null) {
-                mbs = mAgent.getMBeanServer();
+                if (mbs != null) {
+                    mbContainer = new MBeanContainer(mbs);
+                } else {
+                    LOG.warn("JMX disabled in CamelContext. Jetty JMX 
extensions will remain disabled.");
+                }
             }
 
-            if (mbs != null) {
-                mbContainer = new MBeanContainer(mbs);
-            } else {
-                LOG.warn("JMX disabled in CamelContext. Jetty JMX extensions 
will remain disabled.");
-            }
+            return this.mbContainer;
+        } finally {
+            lock.unlock();
         }
-
-        return this.mbContainer;
     }
 
     /**
@@ -1344,19 +1363,17 @@ public abstract class JettyHttpComponent extends 
HttpCommonComponent
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        if (!CONNECTORS.isEmpty()) {
-            for (Map.Entry<String, ConnectorRef> connectorEntry : 
CONNECTORS.entrySet()) {
-                ConnectorRef connectorRef = connectorEntry.getValue();
-                if (connectorRef != null && connectorRef.getRefCount() == 0) {
-                    
connectorRef.server.removeConnector(connectorRef.connector);
-                    connectorRef.connector.stop();
-                    // Camel controls the lifecycle of these entities so 
remove the
-                    // registered MBeans when Camel is done with the managed 
objects.
-                    removeServerMBean(connectorRef.server);
-                    connectorRef.server.stop();
-                    //removeServerMBean(connectorRef.connector);
-                    CONNECTORS.remove(connectorEntry.getKey());
-                }
+        for (Map.Entry<String, ConnectorRef> connectorEntry : 
CONNECTORS.entrySet()) {
+            ConnectorRef connectorRef = connectorEntry.getValue();
+            if (connectorRef != null && connectorRef.getRefCount() == 0) {
+                connectorRef.server.removeConnector(connectorRef.connector);
+                connectorRef.connector.stop();
+                // Camel controls the lifecycle of these entities so remove the
+                // registered MBeans when Camel is done with the managed 
objects.
+                removeServerMBean(connectorRef.server);
+                connectorRef.server.stop();
+                //removeServerMBean(connectorRef.connector);
+                CONNECTORS.remove(connectorEntry.getKey());
             }
         }
         if (mbContainer != null) {
diff --git 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
index 9d2dab5496f..aeb9797330d 100644
--- 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
+++ 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
@@ -137,43 +137,53 @@ public class JiraEndpoint extends ScheduledPollEndpoint 
implements EndpointServi
         disconnect();
     }
 
-    public synchronized void connect() {
-        if (client == null) {
-            Registry registry = getCamelContext().getRegistry();
-            JiraRestClientFactory factory
-                    = registry.lookupByNameAndType(JIRA_REST_CLIENT_FACTORY, 
JiraRestClientFactory.class);
-            if (factory == null) {
-                factory = new OAuthAsynchronousJiraRestClientFactory();
-            }
-            final URI jiraServerUri = URI.create(configuration.getJiraUrl());
-            if (configuration.getUsername() != null) {
-                LOG.debug("Connecting to JIRA with Basic authentication with 
username/password");
-                client = 
factory.createWithBasicHttpAuthentication(jiraServerUri, 
configuration.getUsername(),
-                        configuration.getPassword());
-            } else if (configuration.getAccessToken() != null
-                    && configuration.getVerificationCode() == null
-                    && configuration.getPrivateKey() == null
-                    && configuration.getConsumerKey() == null) {
-                client = factory.create(jiraServerUri, builder -> {
-                    builder.setHeader("Authorization", "Bearer " + 
configuration.getAccessToken());
-                });
-            } else {
-                LOG.debug("Connecting to JIRA with OAuth authentication");
-                JiraOAuthAuthenticationHandler oAuthHandler = new 
JiraOAuthAuthenticationHandler(
-                        configuration.getConsumerKey(),
-                        configuration.getVerificationCode(), 
configuration.getPrivateKey(),
-                        configuration.getAccessToken(),
-                        configuration.getJiraUrl());
-                client = factory.create(jiraServerUri, oAuthHandler);
+    public void connect() {
+        lock.lock();
+        try {
+            if (client == null) {
+                Registry registry = getCamelContext().getRegistry();
+                JiraRestClientFactory factory
+                        = 
registry.lookupByNameAndType(JIRA_REST_CLIENT_FACTORY, 
JiraRestClientFactory.class);
+                if (factory == null) {
+                    factory = new OAuthAsynchronousJiraRestClientFactory();
+                }
+                final URI jiraServerUri = 
URI.create(configuration.getJiraUrl());
+                if (configuration.getUsername() != null) {
+                    LOG.debug("Connecting to JIRA with Basic authentication 
with username/password");
+                    client = 
factory.createWithBasicHttpAuthentication(jiraServerUri, 
configuration.getUsername(),
+                            configuration.getPassword());
+                } else if (configuration.getAccessToken() != null
+                        && configuration.getVerificationCode() == null
+                        && configuration.getPrivateKey() == null
+                        && configuration.getConsumerKey() == null) {
+                    client = factory.create(jiraServerUri, builder -> {
+                        builder.setHeader("Authorization", "Bearer " + 
configuration.getAccessToken());
+                    });
+                } else {
+                    LOG.debug("Connecting to JIRA with OAuth authentication");
+                    JiraOAuthAuthenticationHandler oAuthHandler = new 
JiraOAuthAuthenticationHandler(
+                            configuration.getConsumerKey(),
+                            configuration.getVerificationCode(), 
configuration.getPrivateKey(),
+                            configuration.getAccessToken(),
+                            configuration.getJiraUrl());
+                    client = factory.create(jiraServerUri, oAuthHandler);
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void disconnect() throws Exception {
-        if (client != null) {
-            LOG.debug("Disconnecting from JIRA");
-            client.close();
-            client = null;
+    public void disconnect() throws Exception {
+        lock.lock();
+        try {
+            if (client != null) {
+                LOG.debug("Disconnecting from JIRA");
+                client.close();
+                client = null;
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index a10d5c7814e..7fd77b11aea 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import jakarta.jms.Destination;
 import jakarta.jms.JMSException;
 import jakarta.jms.Message;
@@ -45,6 +48,7 @@ import static 
org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException;
  */
 public class EndpointMessageListener implements SessionAwareMessageListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(EndpointMessageListener.class);
+    private final Lock lock = new ReentrantLock();
     private final JmsConsumer consumer;
     private final JmsEndpoint endpoint;
     private final AsyncProcessor processor;
@@ -315,11 +319,16 @@ public class EndpointMessageListener implements 
SessionAwareMessageListener {
         this.eagerPoisonBody = eagerPoisonBody;
     }
 
-    public synchronized JmsOperations getTemplate() {
-        if (template == null) {
-            template = endpoint.createInOnlyTemplate();
+    public JmsOperations getTemplate() {
+        lock.lock();
+        try {
+            if (template == null) {
+                template = endpoint.createInOnlyTemplate();
+            }
+            return template;
+        } finally {
+            lock.unlock();
         }
-        return template;
     }
 
     public void setTemplate(JmsOperations template) {
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index e6ac7d7a008..31a718d09d3 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -1157,14 +1157,19 @@ public class JmsComponent extends 
HeaderFilterStrategyComponent {
         super.doShutdown();
     }
 
-    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
-        if (asyncStartStopExecutorService == null) {
-            // use a cached thread pool for async start tasks as they can run 
for a while, and we need a dedicated thread
-            // for each task, and the thread pool will shrink when no more 
tasks running
-            asyncStartStopExecutorService
-                    = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, 
"AsyncStartStopListener");
+    protected ExecutorService getAsyncStartStopExecutorService() {
+        lock.lock();
+        try {
+            if (asyncStartStopExecutorService == null) {
+                // use a cached thread pool for async start tasks as they can 
run for a while, and we need a dedicated thread
+                // for each task, and the thread pool will shrink when no more 
tasks running
+                asyncStartStopExecutorService
+                        = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, 
"AsyncStartStopListener");
+            }
+            return asyncStartStopExecutorService;
+        } finally {
+            lock.unlock();
         }
-        return asyncStartStopExecutorService;
     }
 
     @Override
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index f0b38161cbd..b486bb7b341 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -78,7 +78,8 @@ public class JmsProducer extends DefaultAsyncProducer {
 
     protected void initReplyManager() {
         if (!started.get()) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (started.get()) {
                     return;
                 }
@@ -119,6 +120,8 @@ public class JmsProducer extends DefaultAsyncProducer {
                     
Thread.currentThread().setContextClassLoader(oldClassLoader);
                 }
                 started.set(true);
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
index c8f80a0000d..d02c0b75f0a 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
@@ -57,11 +57,16 @@ public class JmsTemporaryQueueEndpoint extends 
JmsQueueEndpoint implements Desti
     }
 
     @Override
-    public synchronized Destination getJmsDestination(Session session) throws 
JMSException {
-        if (jmsDestination == null) {
-            jmsDestination = createJmsDestination(session);
+    public Destination getJmsDestination(Session session) throws JMSException {
+        lock.lock();
+        try {
+            if (jmsDestination == null) {
+                jmsDestination = createJmsDestination(session);
+            }
+            return jmsDestination;
+        } finally {
+            lock.unlock();
         }
-        return jmsDestination;
     }
 
     protected Destination createJmsDestination(Session session) throws 
JMSException {
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
index 2bd5cc30775..68a2b031c5d 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
@@ -50,11 +50,16 @@ public class JmsTemporaryTopicEndpoint extends JmsEndpoint 
implements Destinatio
     }
 
     @Override
-    public synchronized Destination getJmsDestination(Session session) throws 
JMSException {
-        if (jmsDestination == null) {
-            jmsDestination = createJmsDestination(session);
+    public Destination getJmsDestination(Session session) throws JMSException {
+        lock.lock();
+        try {
+            if (jmsDestination == null) {
+                jmsDestination = createJmsDestination(session);
+            }
+            return jmsDestination;
+        } finally {
+            lock.unlock();
         }
-        return jmsDestination;
     }
 
     protected Destination createJmsDestination(Session session) throws 
JMSException {
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
index 72295b24abc..400e96dd2db 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.jms;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import jakarta.jms.JMSException;
 import jakarta.jms.MessageEOFException;
@@ -25,6 +27,7 @@ import jakarta.jms.StreamMessage;
 
 public class StreamMessageInputStream extends InputStream {
 
+    private final Lock lock = new ReentrantLock();
     private final StreamMessage message;
     private volatile boolean eof;
 
@@ -74,11 +77,14 @@ public class StreamMessageInputStream extends InputStream {
     }
 
     @Override
-    public synchronized void reset() throws IOException {
+    public void reset() throws IOException {
+        lock.lock();
         try {
             message.reset();
         } catch (JMSException e) {
             throw new IOException(e);
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
index 9c7f93d2d1e..19c3c67341e 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.jms.reply;
 
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.TimeoutMap;
 import org.slf4j.Logger;
@@ -34,7 +36,7 @@ public class MessageSelectorCreator {
     protected final ConcurrentSkipListSet<String> correlationIds;
     protected volatile boolean dirty = true;
     protected StringBuilder expression;
-    private final Object lock = new Object();
+    private final Lock lock = new ReentrantLock();
 
     public MessageSelectorCreator(CorrelationTimeoutMap timeoutMap) {
         this.timeoutMap = timeoutMap;
@@ -46,7 +48,8 @@ public class MessageSelectorCreator {
     }
 
     public String get() {
-        synchronized (lock) {
+        lock.lock();
+        try {
             if (!dirty) {
                 return expression.toString();
             }
@@ -74,18 +77,23 @@ public class MessageSelectorCreator {
 
             dirty = false;
             return answer;
+        } finally {
+            lock.unlock();
         }
     }
 
     // Changes to live correlation-ids invalidate existing message selector
     private void timeoutEvent(TimeoutMap.Listener.Type type, String cid) {
-        synchronized (lock) {
+        lock.lock();
+        try {
             if (type == Put) {
                 correlationIds.add(cid);
             } else if (type == Remove || type == Evict) {
                 correlationIds.remove(cid);
             }
             dirty = true;
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index 9d95ff14643..8eb17d17506 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -109,12 +109,15 @@ public class QueueReplyManager extends 
ReplyManagerSupport {
                 Session session, String destinationName,
                 boolean pubSubDomain)
                 throws JMSException {
-            synchronized (QueueReplyManager.this) {
+            QueueReplyManager.this.lock.lock();
+            try {
                 // resolve the reply to destination
                 if (destination == null) {
                     destination = delegate.resolveDestinationName(session, 
destinationName, pubSubDomain);
                     setReplyTo(destination);
                 }
+            } finally {
+                QueueReplyManager.this.lock.unlock();
             }
             return destination;
         }
diff --git 
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
 
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
index 52e39bb957f..2b3ecb843e7 100644
--- 
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
+++ 
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.jmx;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import javax.management.AttributeChangeNotification;
 import javax.management.AttributeChangeNotificationFilter;
 import javax.management.Notification;
@@ -26,6 +29,7 @@ import javax.management.Notification;
  */
 public class JMXConsumerNotificationFilter extends 
AttributeChangeNotificationFilter {
 
+    private final Lock lock = new ReentrantLock();
     private final String stringToCompare;
     private final boolean notifyMatch;
 
@@ -36,25 +40,30 @@ public class JMXConsumerNotificationFilter extends 
AttributeChangeNotificationFi
     }
 
     @Override
-    public synchronized boolean isNotificationEnabled(Notification 
notification) {
-        boolean enabled = super.isNotificationEnabled(notification);
-        if (!enabled) {
-            return false;
-        }
+    public boolean isNotificationEnabled(Notification notification) {
+        lock.lock();
+        try {
+            boolean enabled = super.isNotificationEnabled(notification);
+            if (!enabled) {
+                return false;
+            }
 
-        boolean match = false;
-        if (stringToCompare != null) {
-            AttributeChangeNotification acn = (AttributeChangeNotification) 
notification;
-            Object newValue = acn.getNewValue();
-            // special for null
-            if ("null".equals(stringToCompare) && newValue == null) {
-                match = true;
-            } else if (newValue != null) {
-                match = stringToCompare.equals(newValue.toString());
+            boolean match = false;
+            if (stringToCompare != null) {
+                AttributeChangeNotification acn = 
(AttributeChangeNotification) notification;
+                Object newValue = acn.getNewValue();
+                // special for null
+                if ("null".equals(stringToCompare) && newValue == null) {
+                    match = true;
+                } else if (newValue != null) {
+                    match = stringToCompare.equals(newValue.toString());
+                }
+                return notifyMatch == match;
+            } else {
+                return true;
             }
-            return notifyMatch == match;
-        } else {
-            return true;
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
 
b/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
index a9981fa385c..532b5959a7a 100644
--- 
a/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
+++ 
b/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
@@ -82,39 +82,44 @@ public class JoltEndpoint extends ResourceEndpoint {
         return "jolt:" + getResourceUri();
     }
 
-    private synchronized JoltTransform getTransform() throws Exception {
-        if (transform == null) {
-            if (log.isDebugEnabled()) {
-                String path = getResourceUri();
-                log.debug("Jolt content read from resource {} with 
resourceUri: {} for endpoint {}", getResourceUri(), path,
-                        getEndpointUri());
-            }
+    private JoltTransform getTransform() throws Exception {
+        getInternalLock().lock();
+        try {
+            if (transform == null) {
+                if (log.isDebugEnabled()) {
+                    String path = getResourceUri();
+                    log.debug("Jolt content read from resource {} with 
resourceUri: {} for endpoint {}", getResourceUri(), path,
+                            getEndpointUri());
+                }
 
-            // Sortr does not require a spec
-            if (this.transformDsl == JoltTransformType.Sortr) {
-                this.transform = new Sortr();
-            } else {
-                // getResourceAsInputStream also considers the content cache
-                Object spec = 
JsonUtils.jsonToObject(getResourceAsInputStream());
-                switch (this.transformDsl) {
-                    case Shiftr:
-                        this.transform = new Shiftr(spec);
-                        break;
-                    case Defaultr:
-                        this.transform = new Defaultr(spec);
-                        break;
-                    case Removr:
-                        this.transform = new Removr(spec);
-                        break;
-                    case Chainr:
-                    default:
-                        this.transform = Chainr.fromSpec(spec);
-                        break;
+                // Sortr does not require a spec
+                if (this.transformDsl == JoltTransformType.Sortr) {
+                    this.transform = new Sortr();
+                } else {
+                    // getResourceAsInputStream also considers the content 
cache
+                    Object spec = 
JsonUtils.jsonToObject(getResourceAsInputStream());
+                    switch (this.transformDsl) {
+                        case Shiftr:
+                            this.transform = new Shiftr(spec);
+                            break;
+                        case Defaultr:
+                            this.transform = new Defaultr(spec);
+                            break;
+                        case Removr:
+                            this.transform = new Removr(spec);
+                            break;
+                        case Chainr:
+                        default:
+                            this.transform = Chainr.fromSpec(spec);
+                            break;
+                    }
                 }
-            }
 
+            }
+            return transform;
+        } finally {
+            getInternalLock().unlock();
         }
-        return transform;
     }
 
     /**
diff --git 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
index cc55c69c08d..284808c7129 100644
--- 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
+++ 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
@@ -121,13 +121,18 @@ public class JpaComponent extends HealthCheckComponent {
         return aliases;
     }
 
-    synchronized ExecutorService getOrCreatePollingConsumerExecutorService() {
-        if (pollingConsumerExecutorService == null) {
-            LOG.debug("Creating thread pool for JpaPollingConsumer to support 
polling using timeout");
-            pollingConsumerExecutorService
-                    = 
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, 
"JpaPollingConsumer");
+    ExecutorService getOrCreatePollingConsumerExecutorService() {
+        lock.lock();
+        try {
+            if (pollingConsumerExecutorService == null) {
+                LOG.debug("Creating thread pool for JpaPollingConsumer to 
support polling using timeout");
+                pollingConsumerExecutorService
+                        = 
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, 
"JpaPollingConsumer");
+            }
+            return pollingConsumerExecutorService;
+        } finally {
+            lock.unlock();
         }
-        return pollingConsumerExecutorService;
     }
 
     // Implementation methods
diff --git 
a/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
 
b/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
index d77208f267c..4e8025bf5c2 100644
--- 
a/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
+++ 
b/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
@@ -107,62 +107,68 @@ public class JsltEndpoint extends ResourceEndpoint {
         return "jslt:" + getResourceUri();
     }
 
-    private synchronized Expression getTransform(Message msg) throws Exception 
{
-        final String jsltStringFromHeader
-                = allowTemplateFromHeader ? 
msg.getHeader(JsltConstants.HEADER_JSLT_STRING, String.class) : null;
+    private Expression getTransform(Message msg) throws Exception {
+        getInternalLock().lock();
+        try {
 
-        final boolean useTemplateFromUri = jsltStringFromHeader == null;
+            final String jsltStringFromHeader
+                    = allowTemplateFromHeader ? 
msg.getHeader(JsltConstants.HEADER_JSLT_STRING, String.class) : null;
 
-        if (useTemplateFromUri && transform != null) {
-            return transform;
-        }
+            final boolean useTemplateFromUri = jsltStringFromHeader == null;
 
-        final Collection<Function> functions = Objects.requireNonNullElse(
-                ((JsltComponent) getComponent()).getFunctions(),
-                Collections.emptyList());
+            if (useTemplateFromUri && transform != null) {
+                return transform;
+            }
 
-        final JsonFilter objectFilter = Objects.requireNonNullElse(
-                ((JsltComponent) getComponent()).getObjectFilter(),
-                DEFAULT_JSON_FILTER);
+            final Collection<Function> functions = Objects.requireNonNullElse(
+                    ((JsltComponent) getComponent()).getFunctions(),
+                    Collections.emptyList());
 
-        final String transformSource;
-        final InputStream stream;
+            final JsonFilter objectFilter = Objects.requireNonNullElse(
+                    ((JsltComponent) getComponent()).getObjectFilter(),
+                    DEFAULT_JSON_FILTER);
 
-        if (useTemplateFromUri) {
-            transformSource = getResourceUri();
+            final String transformSource;
+            final InputStream stream;
+
+            if (useTemplateFromUri) {
+                transformSource = getResourceUri();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Jslt content read from resource {} with 
resourceUri: {} for endpoint {}",
+                            transformSource,
+                            transformSource,
+                            getEndpointUri());
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("Jslt content read from resource {} with 
resourceUri: {} for endpoint {}",
-                        transformSource,
-                        transformSource,
-                        getEndpointUri());
+                stream = 
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), 
transformSource);
+                if (stream == null) {
+                    throw new JsltException("Cannot load resource '" + 
transformSource + "': not found");
+                }
+            } else { // use template from header
+                stream = new 
ByteArrayInputStream(jsltStringFromHeader.getBytes(StandardCharsets.UTF_8));
+                transformSource = "<inline>";
             }
 
-            stream = 
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), 
transformSource);
-            if (stream == null) {
-                throw new JsltException("Cannot load resource '" + 
transformSource + "': not found");
+            final Expression transform;
+            try {
+                transform = new Parser(new InputStreamReader(stream))
+                        .withFunctions(functions)
+                        .withObjectFilter(objectFilter)
+                        .withSource(transformSource)
+                        .compile();
+            } finally {
+                // the stream is consumed only on .compile(), cannot be closed 
before
+                IOHelper.close(stream);
             }
-        } else { // use template from header
-            stream = new 
ByteArrayInputStream(jsltStringFromHeader.getBytes(StandardCharsets.UTF_8));
-            transformSource = "<inline>";
-        }
 
-        final Expression transform;
-        try {
-            transform = new Parser(new InputStreamReader(stream))
-                    .withFunctions(functions)
-                    .withObjectFilter(objectFilter)
-                    .withSource(transformSource)
-                    .compile();
+            if (useTemplateFromUri) {
+                this.transform = transform;
+            }
+            return transform;
         } finally {
-            // the stream is consumed only on .compile(), cannot be closed 
before
-            IOHelper.close(stream);
-        }
-
-        if (useTemplateFromUri) {
-            this.transform = transform;
+            getInternalLock().unlock();
         }
-        return transform;
     }
 
     public JsltEndpoint findOrCreateEndpoint(String uri, String 
newResourceUri) {
diff --git 
a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
 
b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
index 893d118a465..9c921cedefb 100644
--- 
a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
+++ 
b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
@@ -188,10 +188,13 @@ public class JsonValidatorEndpoint extends 
ResourceEndpoint {
      * @return The currently loaded schema
      */
     private JsonSchema getOrCreateSchema() throws Exception {
-        synchronized (this) {
+        getInternalLock().lock();
+        try {
             if (this.schema == null) {
                 this.schema = 
this.uriSchemaLoader.createSchema(getCamelContext(), getResourceUri());
             }
+        } finally {
+            getInternalLock().unlock();
         }
         return this.schema;
     }
diff --git 
a/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
 
b/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
index 24999b525a9..7b05810a99f 100644
--- 
a/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
+++ 
b/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
@@ -275,7 +275,7 @@ public class JsonStream extends FilterInputStream {
     }
 
     @Override
-    public synchronized void reset() throws IOException {
+    public void reset() throws IOException {
         throw new IOException("reset not supported");
     }
 
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
index 10e448b6a54..13ca2d83cbf 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
@@ -86,12 +86,17 @@ public class Jt400Component extends HealthCheckComponent {
      *
      * @return the default connection pool used by this component
      */
-    public synchronized AS400ConnectionPool getConnectionPool() {
-        if (connectionPool == null) {
-            LOG.info("Instantiating the default connection pool ...");
-            connectionPool = new AS400ConnectionPool();
+    public AS400ConnectionPool getConnectionPool() {
+        lock.lock();
+        try {
+            if (connectionPool == null) {
+                LOG.info("Instantiating the default connection pool ...");
+                connectionPool = new AS400ConnectionPool();
+            }
+            return connectionPool;
+        } finally {
+            lock.unlock();
         }
-        return connectionPool;
     }
 
     public void setConnectionPool(AS400ConnectionPool connectionPool) {
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
index d6c2114a6df..2f3b2d4db47 100755
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
@@ -101,43 +101,48 @@ public class Jt400MsgQueueConsumer extends 
ScheduledPollConsumer {
         }
     }
 
-    private synchronized Exchange receive(MessageQueue queue, long timeout) 
throws Exception {
-        QueuedMessage entry;
-        int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1;
-        LOG.trace("Reading from message queue: {} with {} seconds timeout", 
queue.getPath(),
-                -1 == seconds ? "infinite" : seconds);
-
-        Jt400Configuration.MessageAction messageAction = 
getEndpoint().getMessageAction();
-        entry = queue.receive(messageKey, //message key
-                seconds,    //timeout
-                messageAction.getJt400Value(),  // message action
-                null == messageKey ? MessageQueue.ANY : MessageQueue.NEXT); // 
types of messages
-
-        if (null == entry) {
-            return null;
-        }
-        // Need to tuck away the message key if the message action is SAME, 
otherwise
-        // we'll just keep retrieving the same message over and over
-        if (Jt400Configuration.MessageAction.SAME == messageAction) {
-            this.messageKey = entry.getKey();
-        }
+    private Exchange receive(MessageQueue queue, long timeout) throws 
Exception {
+        lock.lock();
+        try {
+            QueuedMessage entry;
+            int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1;
+            LOG.trace("Reading from message queue: {} with {} seconds 
timeout", queue.getPath(),
+                    -1 == seconds ? "infinite" : seconds);
+
+            Jt400Configuration.MessageAction messageAction = 
getEndpoint().getMessageAction();
+            entry = queue.receive(messageKey, //message key
+                    seconds,    //timeout
+                    messageAction.getJt400Value(),  // message action
+                    null == messageKey ? MessageQueue.ANY : 
MessageQueue.NEXT); // types of messages
+
+            if (null == entry) {
+                return null;
+            }
+            // Need to tuck away the message key if the message action is 
SAME, otherwise
+            // we'll just keep retrieving the same message over and over
+            if (Jt400Configuration.MessageAction.SAME == messageAction) {
+                this.messageKey = entry.getKey();
+            }
 
-        Exchange exchange = createExchange(true);
-        exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION,
-                entry.getFromJobNumber() + "/" + entry.getUser() + "/" + 
entry.getFromJobName());
-        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, 
entry.getID());
-        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_FILE, 
entry.getFileName());
-        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_TYPE, 
entry.getType());
-        setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_SEVERITY, entry.getSeverity());
-        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE, 
entry);
-        if (AS400Message.INQUIRY == entry.getType()) {
-            setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply());
-            if (getEndpoint().isSendingReply()) {
-                setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey());
+            Exchange exchange = createExchange(true);
+            exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION,
+                    entry.getFromJobNumber() + "/" + entry.getUser() + "/" + 
entry.getFromJobName());
+            setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_ID, entry.getID());
+            setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_FILE, entry.getFileName());
+            setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_TYPE, entry.getType());
+            setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_SEVERITY, entry.getSeverity());
+            setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE, 
entry);
+            if (AS400Message.INQUIRY == entry.getType()) {
+                setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply());
+                if (getEndpoint().isSendingReply()) {
+                    setHeaderIfValueNotNull(exchange.getIn(), 
Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey());
+                }
             }
+            exchange.getIn().setBody(entry.getText());
+            return exchange;
+        } finally {
+            lock.unlock();
         }
-        exchange.getIn().setBody(entry.getText());
-        return exchange;
     }
 
     private static void setHeaderIfValueNotNull(final Message message, final 
String header, final Object value) {
diff --git 
a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
 
b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
index db1b75288e1..321d4fbf873 100644
--- 
a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
+++ 
b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
@@ -161,29 +161,34 @@ public class JtaTransactionErrorHandlerReifier extends 
ErrorHandlerReifier<JtaTr
         return answer;
     }
 
-    protected synchronized ScheduledExecutorService getExecutorService(
+    protected ScheduledExecutorService getExecutorService(
             ScheduledExecutorService executorService, String 
executorServiceRef) {
-        if (executorService == null || executorService.isShutdown()) {
-            // camel context will shutdown the executor when it shutdown so no
-            // need to shut it down when stopping
-            if (executorServiceRef != null) {
-                executorService = lookupByNameAndType(executorServiceRef, 
ScheduledExecutorService.class);
-                if (executorService == null) {
-                    ExecutorServiceManager manager = 
camelContext.getExecutorServiceManager();
-                    ThreadPoolProfile profile = 
manager.getThreadPoolProfile(executorServiceRef);
-                    executorService = manager.newScheduledThreadPool(this, 
executorServiceRef, profile);
+        lock.lock();
+        try {
+            if (executorService == null || executorService.isShutdown()) {
+                // camel context will shutdown the executor when it shutdown 
so no
+                // need to shut it down when stopping
+                if (executorServiceRef != null) {
+                    executorService = lookupByNameAndType(executorServiceRef, 
ScheduledExecutorService.class);
+                    if (executorService == null) {
+                        ExecutorServiceManager manager = 
camelContext.getExecutorServiceManager();
+                        ThreadPoolProfile profile = 
manager.getThreadPoolProfile(executorServiceRef);
+                        executorService = manager.newScheduledThreadPool(this, 
executorServiceRef, profile);
+                    }
+                    if (executorService == null) {
+                        throw new IllegalArgumentException("ExecutorService " 
+ executorServiceRef + " not found in registry.");
+                    }
+                } else {
+                    // no explicit configured thread pool, so leave it up to 
the
+                    // error handler to decide if it need a default thread 
pool from
+                    // CamelContext#getErrorHandlerExecutorService
+                    executorService = null;
                 }
-                if (executorService == null) {
-                    throw new IllegalArgumentException("ExecutorService " + 
executorServiceRef + " not found in registry.");
-                }
-            } else {
-                // no explicit configured thread pool, so leave it up to the
-                // error handler to decide if it need a default thread pool 
from
-                // CamelContext#getErrorHandlerExecutorService
-                executorService = null;
             }
+            return executorService;
+        } finally {
+            lock.unlock();
         }
-        return executorService;
     }
 
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 3534fccf3f4..f30af79ef82 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -651,7 +651,7 @@ public class KafkaFetchRecords implements Runnable {
         state = State.RESUME_REQUESTED;
     }
 
-    private synchronized void setLastError(Exception lastError) {
+    private void setLastError(Exception lastError) {
         this.lastError = lastError;
     }
 
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 5e3eb6cc9da..e7d493e0d1a 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -23,7 +23,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -63,6 +67,8 @@ public class KameletComponent extends DefaultComponent {
 
     // active consumers
     private final Map<String, KameletConsumer> consumers = new HashMap<>();
+    private final Lock consumersLock = new ReentrantLock();
+    private final Condition consumersCondition = consumersLock.newCondition();
     // active kamelet EIPs
     private final Map<String, Processor> kameletEips = new 
ConcurrentHashMap<>();
 
@@ -344,7 +350,8 @@ public class KameletComponent extends DefaultComponent {
     }
 
     public void addConsumer(String key, KameletConsumer consumer) {
-        synchronized (consumers) {
+        consumersLock.lock();
+        try {
             if (consumers.putIfAbsent(key, consumer) != null) {
                 throw new IllegalArgumentException(
                         "Cannot add a 2nd consumer to the same endpoint: " + 
key
@@ -352,21 +359,27 @@ public class KameletComponent extends DefaultComponent {
             }
             // state changed so inc counter
             stateCounter++;
-            consumers.notifyAll();
+            consumersCondition.signalAll();
+        } finally {
+            consumersLock.unlock();
         }
     }
 
     public void removeConsumer(String key, KameletConsumer consumer) {
-        synchronized (consumers) {
+        consumersLock.lock();
+        try {
             consumers.remove(key, consumer);
             // state changed so inc counter
             stateCounter++;
-            consumers.notifyAll();
+            consumersCondition.signalAll();
+        } finally {
+            consumersLock.unlock();
         }
     }
 
     protected KameletConsumer getConsumer(String key, boolean block, long 
timeout) throws InterruptedException {
-        synchronized (consumers) {
+        consumersLock.lock();
+        try {
             KameletConsumer answer = consumers.get(key);
             if (answer == null && block) {
                 StopWatch watch = new StopWatch();
@@ -379,10 +392,12 @@ public class KameletComponent extends DefaultComponent {
                     if (rem <= 0) {
                         break;
                     }
-                    consumers.wait(rem);
+                    consumersCondition.await(rem, TimeUnit.MILLISECONDS);
                 }
             }
             return answer;
+        } finally {
+            consumersLock.unlock();
         }
     }
 
diff --git 
a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
 
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 2b68cd98efb..7a96ef3e3de 100644
--- 
a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ 
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -132,11 +132,16 @@ public class KnativeComponent extends 
HealthCheckComponent {
         return producerFactory;
     }
 
-    public synchronized KnativeProducerFactory getOrCreateProducerFactory() 
throws Exception {
-        if (producerFactory == null) {
-            producerFactory = setUpProducerFactory();
+    public KnativeProducerFactory getOrCreateProducerFactory() throws 
Exception {
+        lock.lock();
+        try {
+            if (producerFactory == null) {
+                producerFactory = setUpProducerFactory();
+            }
+            return producerFactory;
+        } finally {
+            lock.unlock();
         }
-        return producerFactory;
     }
 
     /**
@@ -150,11 +155,16 @@ public class KnativeComponent extends 
HealthCheckComponent {
         return consumerFactory;
     }
 
-    public synchronized KnativeConsumerFactory getOrCreateConsumerFactory() 
throws Exception {
-        if (consumerFactory == null) {
-            consumerFactory = setUpConsumerFactory();
+    public KnativeConsumerFactory getOrCreateConsumerFactory() throws 
Exception {
+        lock.lock();
+        try {
+            if (consumerFactory == null) {
+                consumerFactory = setUpConsumerFactory();
+            }
+            return consumerFactory;
+        } finally {
+            lock.unlock();
         }
-        return consumerFactory;
     }
 
     /**
diff --git 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 61b2a763001..ded0576fe2b 100644
--- 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -126,69 +126,79 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
     // Helpers
     // **************************************
 
-    private synchronized void onLeadershipTaken() throws Exception {
-        if (!isRunAllowed()) {
-            return;
-        }
+    private void onLeadershipTaken() throws Exception {
+        lock.lock();
+        try {
+            if (!isRunAllowed()) {
+                return;
+            }
 
-        if (delegatedConsumer != null) {
-            return;
-        }
+            if (delegatedConsumer != null) {
+                return;
+            }
 
-        // start consumer using background task up till X attempts
-        long delay = masterEndpoint.getComponent().getBackOffDelay();
-        long max = masterEndpoint.getComponent().getBackOffMaxAttempts();
+            // start consumer using background task up till X attempts
+            long delay = masterEndpoint.getComponent().getBackOffDelay();
+            long max = masterEndpoint.getComponent().getBackOffMaxAttempts();
 
-        BackOffTimer timer = new 
BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool());
-        
timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task -> 
{
-            LOG.info("Leadership taken. Attempt #{} to start consumer: {}", 
task.getCurrentAttempts(),
-                    delegatedEndpoint);
+            BackOffTimer timer = new 
BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool());
+            
timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task -> 
{
+                LOG.info("Leadership taken. Attempt #{} to start consumer: 
{}", task.getCurrentAttempts(),
+                        delegatedEndpoint);
 
-            Exception cause = null;
-            try {
-                if (delegatedConsumer == null) {
-                    delegatedConsumer = 
delegatedEndpoint.createConsumer(processor);
-                    if (delegatedConsumer instanceof StartupListener) {
-                        
getEndpoint().getCamelContext().addStartupListener((StartupListener) 
delegatedConsumer);
-                    }
-                    if (delegatedConsumer instanceof ResumeAware 
resumeAwareConsumer && resumeStrategy != null) {
-                        LOG.debug("Setting up the resume adapter for the 
resume strategy in consumer");
-                        ResumeAdapter resumeAdapter
-                                = 
AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer,
-                                        resumeStrategy);
-                        resumeStrategy.setAdapter(resumeAdapter);
-
-                        LOG.debug("Setting up the resume strategy for 
consumer");
-                        resumeAwareConsumer.setResumeStrategy(resumeStrategy);
+                Exception cause = null;
+                try {
+                    if (delegatedConsumer == null) {
+                        delegatedConsumer = 
delegatedEndpoint.createConsumer(processor);
+                        if (delegatedConsumer instanceof StartupListener) {
+                            
getEndpoint().getCamelContext().addStartupListener((StartupListener) 
delegatedConsumer);
+                        }
+                        if (delegatedConsumer instanceof ResumeAware 
resumeAwareConsumer && resumeStrategy != null) {
+                            LOG.debug("Setting up the resume adapter for the 
resume strategy in consumer");
+                            ResumeAdapter resumeAdapter
+                                    = 
AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer,
+                                            resumeStrategy);
+                            resumeStrategy.setAdapter(resumeAdapter);
+
+                            LOG.debug("Setting up the resume strategy for 
consumer");
+                            
resumeAwareConsumer.setResumeStrategy(resumeStrategy);
+                        }
                     }
-                }
-                ServiceHelper.startService(delegatedEndpoint, 
delegatedConsumer);
+                    ServiceHelper.startService(delegatedEndpoint, 
delegatedConsumer);
 
-            } catch (Exception e) {
-                cause = e;
-            }
+                } catch (Exception e) {
+                    cause = e;
+                }
 
-            if (cause != null) {
-                String message = "Leadership taken. Attempt #" + 
task.getCurrentAttempts()
-                                 + " failed to start consumer due to: " + 
cause.getMessage();
-                getExceptionHandler().handleException(message, cause);
-                return true; // retry
-            }
+                if (cause != null) {
+                    String message = "Leadership taken. Attempt #" + 
task.getCurrentAttempts()
+                                     + " failed to start consumer due to: " + 
cause.getMessage();
+                    getExceptionHandler().handleException(message, cause);
+                    return true; // retry
+                }
 
-            LOG.info("Leadership taken. Attempt #" + task.getCurrentAttempts() 
+ " success. Consumer started: {}",
-                    delegatedEndpoint);
-            return false; // no more attempts
-        });
+                LOG.info("Leadership taken. Attempt #" + 
task.getCurrentAttempts() + " success. Consumer started: {}",
+                        delegatedEndpoint);
+                return false; // no more attempts
+            });
+        } finally {
+            lock.unlock();
+        }
     }
 
-    private synchronized void onLeadershipLost() {
-        LOG.debug("Leadership lost. Stopping consumer: {}", delegatedEndpoint);
+    private void onLeadershipLost() {
+        lock.lock();
         try {
-            ServiceHelper.stopAndShutdownServices(delegatedConsumer, 
delegatedEndpoint);
+            LOG.debug("Leadership lost. Stopping consumer: {}", 
delegatedEndpoint);
+            try {
+                ServiceHelper.stopAndShutdownServices(delegatedConsumer, 
delegatedEndpoint);
+            } finally {
+                delegatedConsumer = null;
+            }
+            LOG.info("Leadership lost. Consumer stopped: {}", 
delegatedEndpoint);
         } finally {
-            delegatedConsumer = null;
+            lock.unlock();
         }
-        LOG.info("Leadership lost. Consumer stopped: {}", delegatedEndpoint);
     }
 
     // **************************************
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 802d7b8914b..cf377835847 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -131,216 +131,226 @@ public class MllpTcpClientProducer extends 
DefaultProducer implements Runnable {
     }
 
     @Override
-    public synchronized void process(Exchange exchange) throws MllpException {
-        log.trace("process({}) [{}] - entering", exchange.getExchangeId(), 
socket);
-        getEndpoint().updateLastConnectionActivityTicks();
-
-        Message message = exchange.getMessage();
-
-        getEndpoint().checkBeforeSendProperties(exchange, socket, log);
-
-        // Establish a connection if needed
+    public void process(Exchange exchange) throws MllpException {
+        lock.lock();
         try {
-            checkConnection();
+            log.trace("process({}) [{}] - entering", exchange.getExchangeId(), 
socket);
+            getEndpoint().updateLastConnectionActivityTicks();
 
-            if (cachedLocalAddress != null) {
-                message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, 
cachedLocalAddress);
-            }
+            Message message = exchange.getMessage();
 
-            if (cachedRemoteAddress != null) {
-                message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, 
cachedRemoteAddress);
-            }
+            getEndpoint().checkBeforeSendProperties(exchange, socket, log);
 
-            // Send the message to the external system
-            byte[] hl7MessageBytes = null;
-            Object messageBody = message.getBody();
-            if (messageBody == null) {
-                String exceptionMessage
-                        = String.format("process(%s) [%s] - message body is 
null", exchange.getExchangeId(), socket);
-                exchange.setException(new 
MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi));
-                return;
-            } else if (messageBody instanceof byte[]) {
-                hl7MessageBytes = (byte[]) messageBody;
-            } else if (messageBody instanceof String) {
-                String stringBody = (String) messageBody;
-                hl7MessageBytes = 
stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset));
-                if (getConfiguration().hasCharsetName()) {
-                    exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, 
getConfiguration().getCharsetName());
+            // Establish a connection if needed
+            try {
+                checkConnection();
+
+                if (cachedLocalAddress != null) {
+                    message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, 
cachedLocalAddress);
                 }
-            }
 
-            log.debug("process({}) [{}] - sending message to external system", 
exchange.getExchangeId(), socket);
+                if (cachedRemoteAddress != null) {
+                    message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, 
cachedRemoteAddress);
+                }
 
-            try {
-                mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
-                mllpBuffer.writeTo(socket);
-            } catch (MllpSocketException writeEx) {
-                // Connection may have been reset - try one more time
-                log.debug("process({}) [{}] - exception encountered writing 
payload - attempting reconnect",
-                        exchange.getExchangeId(), socket, writeEx);
-                try {
-                    checkConnection();
-                    log.trace("process({}) [{}] - reconnected succeeded - 
resending payload", exchange.getExchangeId(), socket);
-                    try {
-                        mllpBuffer.writeTo(socket);
-                    } catch (MllpSocketException retryWriteEx) {
-                        String exceptionMessage = String.format(
-                                "process(%s) [%s] - exception encountered 
attempting to write payload after reconnect",
-                                exchange.getExchangeId(), socket);
-                        log.warn(exceptionMessage, retryWriteEx);
-                        exchange.setException(
-                                new MllpWriteException(
-                                        exceptionMessage, 
mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi));
+                // Send the message to the external system
+                byte[] hl7MessageBytes = null;
+                Object messageBody = message.getBody();
+                if (messageBody == null) {
+                    String exceptionMessage
+                            = String.format("process(%s) [%s] - message body 
is null", exchange.getExchangeId(), socket);
+                    exchange.setException(new 
MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi));
+                    return;
+                } else if (messageBody instanceof byte[]) {
+                    hl7MessageBytes = (byte[]) messageBody;
+                } else if (messageBody instanceof String) {
+                    String stringBody = (String) messageBody;
+                    hl7MessageBytes = 
stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset));
+                    if (getConfiguration().hasCharsetName()) {
+                        exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, 
getConfiguration().getCharsetName());
                     }
-                } catch (IOException reconnectEx) {
-                    String exceptionMessage = String.format("process(%s) [%s] 
- exception encountered attempting to reconnect",
-                            exchange.getExchangeId(), socket);
-                    log.warn(exceptionMessage, reconnectEx);
-                    exchange.setException(
-                            new MllpWriteException(exceptionMessage, 
mllpBuffer.toByteArrayAndReset(), writeEx, logPhi));
-                    mllpBuffer.resetSocket(socket);
                 }
-            }
-            if (getConfiguration().getExchangePattern() == 
ExchangePattern.InOnly) {
-                log.debug("process({}) [{}] - not checking acknowledgement 
from external system",
-                        exchange.getExchangeId(), socket);
-                return;
-            }
-            if (exchange.getException() == null) {
-                log.debug("process({}) [{}] - reading acknowledgement from 
external system", exchange.getExchangeId(), socket);
+
+                log.debug("process({}) [{}] - sending message to external 
system", exchange.getExchangeId(), socket);
+
                 try {
-                    mllpBuffer.reset();
-                    mllpBuffer.readFrom(socket);
-                } catch (MllpSocketException receiveAckEx) {
+                    mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
+                    mllpBuffer.writeTo(socket);
+                } catch (MllpSocketException writeEx) {
                     // Connection may have been reset - try one more time
-                    log.debug("process({}) [{}] - exception encountered 
reading acknowledgement - attempting reconnect",
-                            exchange.getExchangeId(), socket, receiveAckEx);
+                    log.debug("process({}) [{}] - exception encountered 
writing payload - attempting reconnect",
+                            exchange.getExchangeId(), socket, writeEx);
                     try {
                         checkConnection();
+                        log.trace("process({}) [{}] - reconnected succeeded - 
resending payload", exchange.getExchangeId(),
+                                socket);
+                        try {
+                            mllpBuffer.writeTo(socket);
+                        } catch (MllpSocketException retryWriteEx) {
+                            String exceptionMessage = String.format(
+                                    "process(%s) [%s] - exception encountered 
attempting to write payload after reconnect",
+                                    exchange.getExchangeId(), socket);
+                            log.warn(exceptionMessage, retryWriteEx);
+                            exchange.setException(
+                                    new MllpWriteException(
+                                            exceptionMessage, 
mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi));
+                        }
                     } catch (IOException reconnectEx) {
-                        String exceptionMessage = String.format(
-                                "process(%s) [%s] - exception encountered 
attempting to reconnect after acknowledgement read failure",
-                                exchange.getExchangeId(), socket);
+                        String exceptionMessage
+                                = String.format("process(%s) [%s] - exception 
encountered attempting to reconnect",
+                                        exchange.getExchangeId(), socket);
                         log.warn(exceptionMessage, reconnectEx);
                         exchange.setException(
-                                new MllpAcknowledgementReceiveException(
-                                        exceptionMessage, hl7MessageBytes, 
receiveAckEx, logPhi));
+                                new MllpWriteException(exceptionMessage, 
mllpBuffer.toByteArrayAndReset(), writeEx, logPhi));
                         mllpBuffer.resetSocket(socket);
                     }
-
-                    if (exchange.getException() == null) {
-                        log.trace("process({}) [{}] - resending payload after 
successful reconnect", exchange.getExchangeId(),
-                                socket);
+                }
+                if (getConfiguration().getExchangePattern() == 
ExchangePattern.InOnly) {
+                    log.debug("process({}) [{}] - not checking acknowledgement 
from external system",
+                            exchange.getExchangeId(), socket);
+                    return;
+                }
+                if (exchange.getException() == null) {
+                    log.debug("process({}) [{}] - reading acknowledgement from 
external system", exchange.getExchangeId(),
+                            socket);
+                    try {
+                        mllpBuffer.reset();
+                        mllpBuffer.readFrom(socket);
+                    } catch (MllpSocketException receiveAckEx) {
+                        // Connection may have been reset - try one more time
+                        log.debug("process({}) [{}] - exception encountered 
reading acknowledgement - attempting reconnect",
+                                exchange.getExchangeId(), socket, 
receiveAckEx);
                         try {
-                            mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
-                            mllpBuffer.writeTo(socket);
-                        } catch (MllpSocketException writeRetryEx) {
+                            checkConnection();
+                        } catch (IOException reconnectEx) {
                             String exceptionMessage = String.format(
-                                    "process(%s) [%s] - exception encountered 
attempting to write payload after read failure and successful reconnect",
+                                    "process(%s) [%s] - exception encountered 
attempting to reconnect after acknowledgement read failure",
                                     exchange.getExchangeId(), socket);
-                            log.warn(exceptionMessage, writeRetryEx);
+                            log.warn(exceptionMessage, reconnectEx);
                             exchange.setException(
-                                    new MllpWriteException(exceptionMessage, 
hl7MessageBytes, receiveAckEx, logPhi));
+                                    new MllpAcknowledgementReceiveException(
+                                            exceptionMessage, hl7MessageBytes, 
receiveAckEx, logPhi));
+                            mllpBuffer.resetSocket(socket);
                         }
 
                         if (exchange.getException() == null) {
-                            log.trace("process({}) [{}] - resend succeeded - 
reading acknowledgement from external system",
-                                    exchange.getExchangeId(), socket);
+                            log.trace("process({}) [{}] - resending payload 
after successful reconnect",
+                                    exchange.getExchangeId(),
+                                    socket);
                             try {
-                                mllpBuffer.reset();
-                                mllpBuffer.readFrom(socket);
-                            } catch (MllpSocketException secondReceiveEx) {
-                                String exceptionMessageFormat = 
mllpBuffer.isEmpty()
-                                        ? "process(%s) [%s] - exception 
encountered reading MLLP Acknowledgement after successful reconnect and resend"
-                                        : "process(%s) [%s] - exception 
encountered reading complete MLLP Acknowledgement after successful reconnect 
and resend";
-                                String exceptionMessage
-                                        = 
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
-                                log.warn(exceptionMessage, secondReceiveEx);
-                                // Send the original exception to the exchange
-                                exchange.setException(new 
MllpAcknowledgementReceiveException(
-                                        exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
-                                        logPhi));
-                            } catch (SocketTimeoutException 
secondReadTimeoutEx) {
-                                String exceptionMessageFormat = 
mllpBuffer.isEmpty()
-                                        ? "process(%s) [%s] - timeout 
receiving MLLP Acknowledgment after successful reconnect and resend"
-                                        : "process(%s) [%s] - timeout 
receiving complete MLLP Acknowledgment after successful reconnect and resend";
-                                String exceptionMessage
-                                        = 
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
-                                log.warn(exceptionMessage, 
secondReadTimeoutEx);
-                                // Send the original exception to the exchange
-                                exchange.setException(new 
MllpAcknowledgementTimeoutException(
-                                        exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
-                                        logPhi));
-                                mllpBuffer.resetSocket(socket);
+                                
mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
+                                mllpBuffer.writeTo(socket);
+                            } catch (MllpSocketException writeRetryEx) {
+                                String exceptionMessage = String.format(
+                                        "process(%s) [%s] - exception 
encountered attempting to write payload after read failure and successful 
reconnect",
+                                        exchange.getExchangeId(), socket);
+                                log.warn(exceptionMessage, writeRetryEx);
+                                exchange.setException(
+                                        new 
MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi));
+                            }
+
+                            if (exchange.getException() == null) {
+                                log.trace("process({}) [{}] - resend succeeded 
- reading acknowledgement from external system",
+                                        exchange.getExchangeId(), socket);
+                                try {
+                                    mllpBuffer.reset();
+                                    mllpBuffer.readFrom(socket);
+                                } catch (MllpSocketException secondReceiveEx) {
+                                    String exceptionMessageFormat = 
mllpBuffer.isEmpty()
+                                            ? "process(%s) [%s] - exception 
encountered reading MLLP Acknowledgement after successful reconnect and resend"
+                                            : "process(%s) [%s] - exception 
encountered reading complete MLLP Acknowledgement after successful reconnect 
and resend";
+                                    String exceptionMessage
+                                            = 
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+                                    log.warn(exceptionMessage, 
secondReceiveEx);
+                                    // Send the original exception to the 
exchange
+                                    exchange.setException(new 
MllpAcknowledgementReceiveException(
+                                            exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
+                                            logPhi));
+                                } catch (SocketTimeoutException 
secondReadTimeoutEx) {
+                                    String exceptionMessageFormat = 
mllpBuffer.isEmpty()
+                                            ? "process(%s) [%s] - timeout 
receiving MLLP Acknowledgment after successful reconnect and resend"
+                                            : "process(%s) [%s] - timeout 
receiving complete MLLP Acknowledgment after successful reconnect and resend";
+                                    String exceptionMessage
+                                            = 
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+                                    log.warn(exceptionMessage, 
secondReadTimeoutEx);
+                                    // Send the original exception to the 
exchange
+                                    exchange.setException(new 
MllpAcknowledgementTimeoutException(
+                                            exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
+                                            logPhi));
+                                    mllpBuffer.resetSocket(socket);
+                                }
                             }
                         }
+                    } catch (SocketTimeoutException timeoutEx) {
+                        String exceptionMessageFormat = mllpBuffer.isEmpty()
+                                ? "process(%s) [%s] - timeout receiving MLLP 
Acknowledgment"
+                                : "process(%s) [%s] - timeout receiving 
complete MLLP Acknowledgment";
+                        String exceptionMessage = 
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+                        log.warn(exceptionMessage, timeoutEx);
+                        exchange.setException(new 
MllpAcknowledgementTimeoutException(
+                                exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi));
+                        mllpBuffer.resetSocket(socket);
                     }
-                } catch (SocketTimeoutException timeoutEx) {
-                    String exceptionMessageFormat = mllpBuffer.isEmpty()
-                            ? "process(%s) [%s] - timeout receiving MLLP 
Acknowledgment"
-                            : "process(%s) [%s] - timeout receiving complete 
MLLP Acknowledgment";
-                    String exceptionMessage = 
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
-                    log.warn(exceptionMessage, timeoutEx);
-                    exchange.setException(new 
MllpAcknowledgementTimeoutException(
-                            exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi));
-                    mllpBuffer.resetSocket(socket);
-                }
 
-                if (exchange.getException() == null) {
-                    if (mllpBuffer.hasCompleteEnvelope()) {
-                        byte[] acknowledgementBytes = 
mllpBuffer.toMllpPayload();
-
-                        log.debug(
-                                "process({}) [{}] - populating message headers 
with the acknowledgement from the external system",
-                                exchange.getExchangeId(), socket);
-                        message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, 
acknowledgementBytes);
-                        if (acknowledgementBytes != null && 
acknowledgementBytes.length > 0) {
-                            
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(
-                                    acknowledgementBytes,
-                                    MllpCharsetHelper.getCharset(exchange, 
acknowledgementBytes, hl7Util, charset)));
-                        } else {
-                            
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
-                        }
+                    if (exchange.getException() == null) {
+                        if (mllpBuffer.hasCompleteEnvelope()) {
+                            byte[] acknowledgementBytes = 
mllpBuffer.toMllpPayload();
 
-                        if (getConfiguration().isValidatePayload()) {
-                            String exceptionMessage = 
hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
-                            if (exceptionMessage != null) {
-                                exchange.setException(new 
MllpInvalidAcknowledgementException(
-                                        exceptionMessage, hl7MessageBytes, 
acknowledgementBytes, logPhi));
+                            log.debug(
+                                    "process({}) [{}] - populating message 
headers with the acknowledgement from the external system",
+                                    exchange.getExchangeId(), socket);
+                            
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+                            if (acknowledgementBytes != null && 
acknowledgementBytes.length > 0) {
+                                
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(
+                                        acknowledgementBytes,
+                                        MllpCharsetHelper.getCharset(exchange, 
acknowledgementBytes, hl7Util, charset)));
+                            } else {
+                                
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
                             }
-                        }
 
-                        if (exchange.getException() == null) {
-                            log.debug("process({}) [{}] - processing the 
acknowledgement from the external system",
-                                    exchange.getExchangeId(), socket);
-                            try {
-                                
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
-                                        processAcknowledgment(hl7MessageBytes, 
acknowledgementBytes));
-                            } catch (MllpNegativeAcknowledgementException 
nackEx) {
-                                
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, 
nackEx.getAcknowledgmentType());
-                                exchange.setException(nackEx);
+                            if (getConfiguration().isValidatePayload()) {
+                                String exceptionMessage = 
hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
+                                if (exceptionMessage != null) {
+                                    exchange.setException(new 
MllpInvalidAcknowledgementException(
+                                            exceptionMessage, hl7MessageBytes, 
acknowledgementBytes, logPhi));
+                                }
                             }
 
-                            getEndpoint().checkAfterSendProperties(exchange, 
socket, log);
+                            if (exchange.getException() == null) {
+                                log.debug("process({}) [{}] - processing the 
acknowledgement from the external system",
+                                        exchange.getExchangeId(), socket);
+                                try {
+                                    
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
+                                            
processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
+                                } catch (MllpNegativeAcknowledgementException 
nackEx) {
+                                    
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, 
nackEx.getAcknowledgmentType());
+                                    exchange.setException(nackEx);
+                                }
+
+                                
getEndpoint().checkAfterSendProperties(exchange, socket, log);
+                            }
+                        } else {
+                            String exceptionMessage = 
String.format("process(%s) [%s] - invalid acknowledgement received",
+                                    exchange.getExchangeId(), socket);
+                            exchange.setException(new 
MllpInvalidAcknowledgementException(
+                                    exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), logPhi));
                         }
-                    } else {
-                        String exceptionMessage = String.format("process(%s) 
[%s] - invalid acknowledgement received",
-                                exchange.getExchangeId(), socket);
-                        exchange.setException(new 
MllpInvalidAcknowledgementException(
-                                exceptionMessage, hl7MessageBytes, 
mllpBuffer.toByteArrayAndReset(), logPhi));
                     }
                 }
+
+            } catch (IOException ioEx) {
+                log.debug("process({}) [{}] - IOException encountered checking 
connection", exchange.getExchangeId(), socket,
+                        ioEx);
+                exchange.setException(ioEx);
+                mllpBuffer.resetSocket(socket);
+            } finally {
+                mllpBuffer.reset();
             }
 
-        } catch (IOException ioEx) {
-            log.debug("process({}) [{}] - IOException encountered checking 
connection", exchange.getExchangeId(), socket, ioEx);
-            exchange.setException(ioEx);
-            mllpBuffer.resetSocket(socket);
+            log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), 
socket);
         } finally {
-            mllpBuffer.reset();
+            lock.unlock();
         }
-
-        log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), 
socket);
     }
 
     private String processAcknowledgment(byte[] hl7MessageBytes, byte[] 
hl7AcknowledgementBytes) throws MllpException {
@@ -519,44 +529,49 @@ public class MllpTcpClientProducer extends 
DefaultProducer implements Runnable {
      * Check for idle connection
      */
     @Override
-    public synchronized void run() {
-        if (getConfiguration().hasIdleTimeout()) {
-            if (null != socket && !socket.isClosed() && socket.isConnected()) {
-                if (getEndpoint().hasLastConnectionActivityTicks()) {
-                    long idleTime = System.currentTimeMillis() - 
getEndpoint().getLastConnectionActivityTicks();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Checking {} for idle connection: {} - {}", 
getConnectionAddress(), idleTime,
-                                getConfiguration().getIdleTimeout());
-                    }
-                    if (idleTime >= getConfiguration().getIdleTimeout()) {
-                        if (MllpIdleTimeoutStrategy.CLOSE == 
getConfiguration().getIdleTimeoutStrategy()) {
-                            log.info(
-                                    "MLLP Connection idle time of '{}' 
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - 
closing connection",
-                                    idleTime, 
getConfiguration().getIdleTimeout());
-                            mllpBuffer.closeSocket(socket);
+    public void run() {
+        lock.lock();
+        try {
+            if (getConfiguration().hasIdleTimeout()) {
+                if (null != socket && !socket.isClosed() && 
socket.isConnected()) {
+                    if (getEndpoint().hasLastConnectionActivityTicks()) {
+                        long idleTime = System.currentTimeMillis() - 
getEndpoint().getLastConnectionActivityTicks();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Checking {} for idle connection: {} - 
{}", getConnectionAddress(), idleTime,
+                                    getConfiguration().getIdleTimeout());
+                        }
+                        if (idleTime >= getConfiguration().getIdleTimeout()) {
+                            if (MllpIdleTimeoutStrategy.CLOSE == 
getConfiguration().getIdleTimeoutStrategy()) {
+                                log.info(
+                                        "MLLP Connection idle time of '{}' 
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - 
closing connection",
+                                        idleTime, 
getConfiguration().getIdleTimeout());
+                                mllpBuffer.closeSocket(socket);
+                            } else {
+                                log.info(
+                                        "MLLP Connection idle time of '{}' 
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - 
resetting connection",
+                                        idleTime, 
getConfiguration().getIdleTimeout());
+                                mllpBuffer.resetSocket(socket);
+                            }
                         } else {
-                            log.info(
-                                    "MLLP Connection idle time of '{}' 
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - 
resetting connection",
-                                    idleTime, 
getConfiguration().getIdleTimeout());
-                            mllpBuffer.resetSocket(socket);
+                            long minDelay = 100;
+                            long delay = Long.min(Long.max(minDelay, 
getConfiguration().getIdleTimeout() - idleTime),
+                                    getConfiguration().getIdleTimeout());
+                            if (log.isDebugEnabled()) {
+                                log.debug("Scheduling idle producer connection 
check of {} in {} milliseconds",
+                                        getConnectionAddress(), delay);
+                            }
+                            idleTimeoutExecutor.schedule(this, delay, 
TimeUnit.MILLISECONDS);
                         }
                     } else {
-                        long minDelay = 100;
-                        long delay = Long.min(Long.max(minDelay, 
getConfiguration().getIdleTimeout() - idleTime),
+                        log.debug(
+                                "No activity detected since initial connection 
- scheduling idle producer connection check in {} milliseconds",
                                 getConfiguration().getIdleTimeout());
-                        if (log.isDebugEnabled()) {
-                            log.debug("Scheduling idle producer connection 
check of {} in {} milliseconds",
-                                    getConnectionAddress(), delay);
-                        }
-                        idleTimeoutExecutor.schedule(this, delay, 
TimeUnit.MILLISECONDS);
+                        idleTimeoutExecutor.schedule(this, 
getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
                     }
-                } else {
-                    log.debug(
-                            "No activity detected since initial connection - 
scheduling idle producer connection check in {} milliseconds",
-                            getConfiguration().getIdleTimeout());
-                    idleTimeoutExecutor.schedule(this, 
getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
                 }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
index d4675fc586b..653d32b03ea 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.mongodb;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.model.FindOneAndUpdateOptions;
@@ -36,6 +39,7 @@ public class MongoDbTailTrackingManager {
 
     private final MongoClient connection;
     private final MongoDbTailTrackingConfig config;
+    private final Lock lock = new ReentrantLock();
     private MongoCollection<Document> dbCol;
     private Document trackingObj;
 
@@ -60,32 +64,42 @@ public class MongoDbTailTrackingManager {
         trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID));
     }
 
-    public synchronized void persistToStore() {
-        if (!config.persistent || lastVal == null) {
-            return;
+    public void persistToStore() {
+        lock.lock();
+        try {
+            if (!config.persistent || lastVal == null) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Persisting lastVal={} to store, collection: {}", 
lastVal, config.collection);
+            }
+
+            Bson updateObj = Updates.set(config.field, lastVal);
+            FindOneAndUpdateOptions options = new 
FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
+            trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj, 
options);
+        } finally {
+            lock.unlock();
         }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Persisting lastVal={} to store, collection: {}", 
lastVal, config.collection);
-        }
-
-        Bson updateObj = Updates.set(config.field, lastVal);
-        FindOneAndUpdateOptions options = new 
FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
-        trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj, options);
     }
 
-    public synchronized Object recoverFromStore() {
-        if (!config.persistent) {
-            return null;
-        }
+    public Object recoverFromStore() {
+        lock.lock();
+        try {
+            if (!config.persistent) {
+                return null;
+            }
 
-        lastVal = dbCol.find(trackingObj).first().get(config.field);
+            lastVal = dbCol.find(trackingObj).first().get(config.field);
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Recovered lastVal={} from store, collection: {}", 
lastVal, config.collection);
-        }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Recovered lastVal={} from store, collection: {}", 
lastVal, config.collection);
+            }
 
-        return lastVal;
+            return lastVal;
+        } finally {
+            lock.unlock();
+        }
     }
 
     public void setLastVal(Document dbObj) {
diff --git 
a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
 
b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
index e8112e436a4..e0c1e976c09 100644
--- 
a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
+++ 
b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
@@ -45,17 +45,22 @@ public class MyBatisComponent extends HealthCheckComponent {
         return answer;
     }
 
-    protected synchronized SqlSessionFactory createSqlSessionFactory() throws 
IOException {
-        if (sqlSessionFactory == null) {
-            ObjectHelper.notNull(configurationUri, "configurationUri", this);
-            InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), 
configurationUri);
-            try {
-                sqlSessionFactory = new SqlSessionFactoryBuilder().build(is);
-            } finally {
-                IOHelper.close(is);
+    protected SqlSessionFactory createSqlSessionFactory() throws IOException {
+        lock.lock();
+        try {
+            if (sqlSessionFactory == null) {
+                ObjectHelper.notNull(configurationUri, "configurationUri", 
this);
+                InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), 
configurationUri);
+                try {
+                    sqlSessionFactory = new 
SqlSessionFactoryBuilder().build(is);
+                } finally {
+                    IOHelper.close(is);
+                }
             }
+            return sqlSessionFactory;
+        } finally {
+            lock.unlock();
         }
-        return sqlSessionFactory;
     }
 
     public SqlSessionFactory getSqlSessionFactory() {

Reply via email to