This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2ffdf879159 CAMEL-20199: Remove synchronized block from components J
to M (#16134)
2ffdf879159 is described below
commit 2ffdf8791591032c9cf4ca93706970f753fa930c
Author: Nicolas Filotto <[email protected]>
AuthorDate: Thu Oct 31 17:23:12 2024 +0100
CAMEL-20199: Remove synchronized block from components J to M (#16134)
## Motivation
For better support of virtual threads, we need to avoid lengthy and
frequent pinning by replacing synchronized blocks with ReentrantLocks
## Modifications:
* Replace mutex with locks
* Use locks instead of synchronized blocks
* Use ConcurrentHashMap instead of HashMap when possible
---
.../camel/component/milvus/MilvusEndpoint.java | 11 +-
.../component/github/consumer/CommitConsumer.java | 16 +-
.../jackson/converter/JacksonTypeConverters.java | 11 +-
.../component/jasypt/JasyptPropertiesParser.java | 49 +--
.../converter/jaxb/FallbackTypeConverter.java | 28 +-
.../camel/component/jcache/JCacheManager.java | 45 ++-
.../apache/camel/component/jcr/JcrConsumer.java | 132 ++++---
.../camel/component/jetty/JettyHttpComponent.java | 259 +++++++------
.../apache/camel/component/jira/JiraEndpoint.java | 76 ++--
.../component/jms/EndpointMessageListener.java | 17 +-
.../apache/camel/component/jms/JmsComponent.java | 19 +-
.../apache/camel/component/jms/JmsProducer.java | 5 +-
.../component/jms/JmsTemporaryQueueEndpoint.java | 13 +-
.../component/jms/JmsTemporaryTopicEndpoint.java | 13 +-
.../component/jms/StreamMessageInputStream.java | 8 +-
.../jms/reply/MessageSelectorCreator.java | 14 +-
.../component/jms/reply/QueueReplyManager.java | 5 +-
.../jmx/JMXConsumerNotificationFilter.java | 43 ++-
.../apache/camel/component/jolt/JoltEndpoint.java | 63 ++--
.../apache/camel/component/jpa/JpaComponent.java | 17 +-
.../apache/camel/component/jslt/JsltEndpoint.java | 92 ++---
.../jsonvalidator/JsonValidatorEndpoint.java | 5 +-
.../java/org/apache/camel/jsonpath/JsonStream.java | 2 +-
.../camel/component/jt400/Jt400Component.java | 15 +-
.../component/jt400/Jt400MsgQueueConsumer.java | 73 ++--
.../jta/JtaTransactionErrorHandlerReifier.java | 43 ++-
.../camel/component/kafka/KafkaFetchRecords.java | 2 +-
.../camel/component/kamelet/KameletComponent.java | 27 +-
.../camel/component/knative/KnativeComponent.java | 26 +-
.../camel/component/master/MasterConsumer.java | 110 +++---
.../component/mllp/MllpTcpClientProducer.java | 413 +++++++++++----------
.../mongodb/MongoDbTailTrackingManager.java | 54 ++-
.../camel/component/mybatis/MyBatisComponent.java | 23 +-
33 files changed, 982 insertions(+), 747 deletions(-)
diff --git
a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
index 0957ab82adb..c1f8146f63a 100644
---
a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
+++
b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java
@@ -56,8 +56,6 @@ public class MilvusEndpoint extends DefaultEndpoint
implements EndpointServiceLo
@UriParam
private MilvusConfiguration configuration;
- private final Object lock;
-
private volatile boolean closeClient;
private volatile MilvusClient client;
@@ -71,8 +69,6 @@ public class MilvusEndpoint extends DefaultEndpoint
implements EndpointServiceLo
this.collection = collection;
this.configuration = configuration;
-
- this.lock = new Object();
}
@Override
@@ -93,9 +89,10 @@ public class MilvusEndpoint extends DefaultEndpoint
implements EndpointServiceLo
return collection;
}
- public synchronized MilvusClient getClient() {
+ public MilvusClient getClient() {
if (this.client == null) {
- synchronized (this.lock) {
+ lock.lock();
+ try {
if (this.client == null) {
this.client = this.configuration.getClient();
this.closeClient = false;
@@ -105,6 +102,8 @@ public class MilvusEndpoint extends DefaultEndpoint
implements EndpointServiceLo
this.closeClient = true;
}
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index 03bda8756e3..ec03b96ee36 100644
---
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -75,7 +75,8 @@ public class CommitConsumer extends AbstractGitHubConsumer {
@Override
protected void doStart() throws Exception {
- synchronized (this) {
+ lock.lock();
+ try {
super.doStart();
// ensure we start from clean
@@ -97,24 +98,29 @@ public class CommitConsumer extends AbstractGitHubConsumer {
LOG.info("Starting from beginning");
}
started = true;
+ } finally {
+ lock.unlock();
}
}
@Override
protected void doStop() throws Exception {
- synchronized (this) {
+ lock.lock();
+ try {
super.doStop();
commitHashes.clear();
lastSha = null;
started = false;
+ } finally {
+ lock.unlock();
}
}
@Override
protected int poll() throws Exception {
- synchronized (this) {
-
+ lock.lock();
+ try {
if (!started) {
return 0;
}
@@ -169,6 +175,8 @@ public class CommitConsumer extends AbstractGitHubConsumer {
}
LOG.debug("Last sha: {}", lastSha);
return counter;
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
b/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
index 4bd45abb1d1..3e419ecacc0 100644
---
a/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
+++
b/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@@ -57,7 +59,7 @@ import org.slf4j.LoggerFactory;
public final class JacksonTypeConverters {
private static final Logger LOG =
LoggerFactory.getLogger(JacksonTypeConverters.class);
- private final Object lock;
+ private final Lock lock;
private volatile ObjectMapper defaultMapper;
private boolean init;
@@ -66,7 +68,7 @@ public final class JacksonTypeConverters {
private String moduleClassNames;
public JacksonTypeConverters() {
- this.lock = new Object();
+ this.lock = new ReentrantLock();
}
@Converter
@@ -306,7 +308,8 @@ public final class JacksonTypeConverters {
}
if (defaultMapper == null) {
- synchronized (lock) {
+ lock.lock();
+ try {
if (defaultMapper == null) {
ObjectMapper mapper = new ObjectMapper();
if (moduleClassNames != null) {
@@ -322,6 +325,8 @@ public final class JacksonTypeConverters {
defaultMapper = mapper;
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
b/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
index 0dc7ddbec66..b95b9300822 100644
---
a/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
+++
b/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.jasypt;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -43,15 +45,13 @@ public class JasyptPropertiesParser extends
DefaultPropertiesParser {
= JASYPT_PREFIX_TOKEN.replace("(", "\\(") + "(.+?)" +
JASYPT_SUFFIX_TOKEN.replace(")", "\\)");
private static final Pattern PATTERN = Pattern.compile(JASYPT_REGEX);
+ private final Lock lock = new ReentrantLock();
private StringEncryptor encryptor;
private String password;
private String algorithm;
private String randomSaltGeneratorAlgorithm;
private String randomIvGeneratorAlgorithm;
- public JasyptPropertiesParser() {
- }
-
@Override
public String parseProperty(String key, String value, PropertiesLookup
properties) {
log.trace("Parsing property '{}={}'", key, value);
@@ -69,27 +69,32 @@ public class JasyptPropertiesParser extends
DefaultPropertiesParser {
return value;
}
- private synchronized void initEncryptor() {
- if (encryptor == null) {
- StringHelper.notEmpty("password", password);
- StandardPBEStringEncryptor pbeStringEncryptor = new
StandardPBEStringEncryptor();
-
- pbeStringEncryptor.setPassword(password);
- if (algorithm != null) {
- pbeStringEncryptor.setAlgorithm(algorithm);
- log.debug("Initialized encryptor using {} algorithm and
provided password", algorithm);
- } else {
- log.debug("Initialized encryptor using default algorithm and
provided password");
- }
+ private void initEncryptor() {
+ lock.lock();
+ try {
+ if (encryptor == null) {
+ StringHelper.notEmpty("password", password);
+ StandardPBEStringEncryptor pbeStringEncryptor = new
StandardPBEStringEncryptor();
+
+ pbeStringEncryptor.setPassword(password);
+ if (algorithm != null) {
+ pbeStringEncryptor.setAlgorithm(algorithm);
+ log.debug("Initialized encryptor using {} algorithm and
provided password", algorithm);
+ } else {
+ log.debug("Initialized encryptor using default algorithm
and provided password");
+ }
- if (randomSaltGeneratorAlgorithm != null) {
- pbeStringEncryptor.setSaltGenerator(new
RandomSaltGenerator(randomSaltGeneratorAlgorithm));
- }
- if (randomIvGeneratorAlgorithm != null) {
- pbeStringEncryptor.setIvGenerator(new
RandomIvGenerator(randomIvGeneratorAlgorithm));
- }
+ if (randomSaltGeneratorAlgorithm != null) {
+ pbeStringEncryptor.setSaltGenerator(new
RandomSaltGenerator(randomSaltGeneratorAlgorithm));
+ }
+ if (randomIvGeneratorAlgorithm != null) {
+ pbeStringEncryptor.setIvGenerator(new
RandomIvGenerator(randomIvGeneratorAlgorithm));
+ }
- encryptor = pbeStringEncryptor;
+ encryptor = pbeStringEncryptor;
+ }
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
b/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
index e6dba9c4e90..1b8622e701d 100644
---
a/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
+++
b/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java
@@ -28,6 +28,8 @@ import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import jakarta.xml.bind.JAXBContext;
import jakarta.xml.bind.JAXBElement;
@@ -63,6 +65,7 @@ public class FallbackTypeConverter {
private static final Logger LOG =
LoggerFactory.getLogger(FallbackTypeConverter.class);
+ private final Lock lock = new ReentrantLock();
private final Map<AnnotatedElement, JAXBContext> contexts = new
HashMap<>();
private final StaxConverter staxConverter = new StaxConverter();
private boolean defaultPrettyPrint = true;
@@ -318,19 +321,24 @@ public class FallbackTypeConverter {
return exchange != null &&
exchange.getProperty(Exchange.FILTER_NON_XML_CHARS, Boolean.FALSE,
Boolean.class);
}
- protected synchronized <T> JAXBContext createContext(Class<T> type) throws
JAXBException {
+ protected <T> JAXBContext createContext(Class<T> type) throws
JAXBException {
AnnotatedElement ae = hasXmlRootElement(type) ? type :
type.getPackage();
- JAXBContext context = contexts.get(ae);
- if (context == null) {
- if (hasXmlRootElement(type)) {
- context = JAXBContext.newInstance(type);
- contexts.put(type, context);
- } else {
- context = JAXBContext.newInstance(type.getPackage().getName());
- contexts.put(type.getPackage(), context);
+ lock.lock();
+ try {
+ JAXBContext context = contexts.get(ae);
+ if (context == null) {
+ if (hasXmlRootElement(type)) {
+ context = JAXBContext.newInstance(type);
+ contexts.put(type, context);
+ } else {
+ context =
JAXBContext.newInstance(type.getPackage().getName());
+ contexts.put(type.getPackage(), context);
+ }
}
+ return context;
+ } finally {
+ lock.unlock();
}
- return context;
}
protected <T> Unmarshaller getUnmarshaller(Class<T> type) throws
JAXBException {
diff --git
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
index 0b490cdcbbc..ebde348720a 100644
---
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
+++
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.cache.Cache;
import javax.cache.CacheManager;
@@ -38,6 +40,7 @@ public class JCacheManager<K, V> implements Closeable {
private final JCacheConfiguration configuration;
private final String cacheName;
private final CamelContext camelContext;
+ private final Lock lock = new ReentrantLock();
private CachingProvider provider;
private CacheManager manager;
private Cache<K, V> cache;
@@ -68,29 +71,39 @@ public class JCacheManager<K, V> implements Closeable {
return this.configuration;
}
- public synchronized Cache<K, V> getCache() throws Exception {
- if (cache == null) {
- JCacheProvider provider =
JCacheProviders.lookup(configuration.getCachingProvider());
- cache = doGetCache(provider);
- }
+ public Cache<K, V> getCache() throws Exception {
+ lock.lock();
+ try {
+ if (cache == null) {
+ JCacheProvider provider =
JCacheProviders.lookup(configuration.getCachingProvider());
+ cache = doGetCache(provider);
+ }
- return cache;
+ return cache;
+ } finally {
+ lock.unlock();
+ }
}
@Override
- public synchronized void close() throws IOException {
- if (configuration != null) {
- if (cache != null) {
- cache.close();
- }
+ public void close() throws IOException {
+ lock.lock();
+ try {
+ if (configuration != null) {
+ if (cache != null) {
+ cache.close();
+ }
- if (manager != null) {
- manager.close();
- }
+ if (manager != null) {
+ manager.close();
+ }
- if (provider != null) {
- provider.close();
+ if (provider != null) {
+ provider.close();
+ }
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
index 2723004465e..63d628337de 100644
---
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
+++
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
@@ -63,86 +63,97 @@ public class JcrConsumer extends DefaultConsumer {
return (JcrEndpoint) getEndpoint();
}
- private synchronized void createSessionAndRegisterListener() throws
RepositoryException {
- LOG.trace("createSessionAndRegisterListener START");
-
- if (ObjectHelper.isEmpty(getJcrEndpoint().getWorkspaceName())) {
- session =
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials());
- } else {
- session =
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials(),
- getJcrEndpoint().getWorkspaceName());
- }
+ private void createSessionAndRegisterListener() throws RepositoryException
{
+ lock.lock();
+ try {
+ LOG.trace("createSessionAndRegisterListener START");
+
+ if (ObjectHelper.isEmpty(getJcrEndpoint().getWorkspaceName())) {
+ session =
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials());
+ } else {
+ session =
getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials(),
+ getJcrEndpoint().getWorkspaceName());
+ }
- int eventTypes = getJcrEndpoint().getEventTypes();
- String absPath = getJcrEndpoint().getBase();
+ int eventTypes = getJcrEndpoint().getEventTypes();
+ String absPath = getJcrEndpoint().getBase();
- if (absPath == null) {
- absPath = "/";
- } else if (!absPath.startsWith("/")) {
- absPath = "/" + absPath;
- }
+ if (absPath == null) {
+ absPath = "/";
+ } else if (!absPath.startsWith("/")) {
+ absPath = "/" + absPath;
+ }
- boolean isDeep = getJcrEndpoint().isDeep();
- String[] uuid = null;
- String uuids = getJcrEndpoint().getUuids();
+ boolean isDeep = getJcrEndpoint().isDeep();
+ String[] uuid = null;
+ String uuids = getJcrEndpoint().getUuids();
- if (uuids != null) {
- uuids = uuids.trim();
+ if (uuids != null) {
+ uuids = uuids.trim();
- if (!uuids.isEmpty()) {
- uuid = uuids.split(",");
+ if (!uuids.isEmpty()) {
+ uuid = uuids.split(",");
+ }
}
- }
- String[] nodeTypeName = null;
- String nodeTypeNames = getJcrEndpoint().getNodeTypeNames();
+ String[] nodeTypeName = null;
+ String nodeTypeNames = getJcrEndpoint().getNodeTypeNames();
- if (nodeTypeNames != null) {
- nodeTypeNames = nodeTypeNames.trim();
+ if (nodeTypeNames != null) {
+ nodeTypeNames = nodeTypeNames.trim();
- if (!nodeTypeNames.isEmpty()) {
- nodeTypeName = nodeTypeNames.split(",");
+ if (!nodeTypeNames.isEmpty()) {
+ nodeTypeName = nodeTypeNames.split(",");
+ }
}
- }
- boolean noLocal = getJcrEndpoint().isNoLocal();
+ boolean noLocal = getJcrEndpoint().isNoLocal();
- eventListener = new EndpointEventListener(this, getJcrEndpoint(),
getProcessor());
+ eventListener = new EndpointEventListener(this, getJcrEndpoint(),
getProcessor());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={},
isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
- eventListener, absPath, eventTypes, isDeep,
Arrays.toString(uuid), Arrays.toString(nodeTypeName),
- noLocal);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Adding JCR Event Listener, {}, on {}. eventTypes={},
isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
+ eventListener, absPath, eventTypes, isDeep,
Arrays.toString(uuid), Arrays.toString(nodeTypeName),
+ noLocal);
+ }
- session.getWorkspace().getObservationManager()
- .addEventListener(eventListener, eventTypes, absPath, isDeep,
uuid, nodeTypeName, noLocal);
+ session.getWorkspace().getObservationManager()
+ .addEventListener(eventListener, eventTypes, absPath,
isDeep, uuid, nodeTypeName, noLocal);
- LOG.trace("createSessionAndRegisterListener END");
+ LOG.trace("createSessionAndRegisterListener END");
+ } finally {
+ lock.unlock();
+ }
}
- private synchronized void unregisterListenerAndLogoutSession() throws
RepositoryException {
- LOG.trace("unregisterListenerAndLogoutSession START");
+ private void unregisterListenerAndLogoutSession() throws
RepositoryException {
+ lock.lock();
+ try {
+ LOG.trace("unregisterListenerAndLogoutSession START");
- if (session != null) {
- try {
- if (!session.isLive()) {
- LOG.info("Session was is no more live.");
- } else {
- if (eventListener != null) {
-
session.getWorkspace().getObservationManager().removeEventListener(eventListener);
- eventListener = null;
+ if (session != null) {
+ try {
+ if (!session.isLive()) {
+ LOG.info("Session was is no more live.");
+ } else {
+ if (eventListener != null) {
+
session.getWorkspace().getObservationManager().removeEventListener(eventListener);
+ eventListener = null;
+ }
+
+ session.logout();
}
-
- session.logout();
+ } finally {
+ eventListener = null;
+ session = null;
}
- } finally {
- eventListener = null;
- session = null;
}
- }
- LOG.trace("unregisterListenerAndLogoutSession END");
+ LOG.trace("unregisterListenerAndLogoutSession END");
+ } finally {
+ lock.unlock();
+ }
}
private void cancelSessionListenerChecker() {
@@ -170,7 +181,8 @@ public class JcrConsumer extends DefaultConsumer {
boolean isSessionLive = false;
- synchronized (this) {
+ lock.lock();
+ try {
if (JcrConsumer.this.session != null) {
try {
isSessionLive = JcrConsumer.this.session.isLive();
@@ -178,6 +190,8 @@ public class JcrConsumer extends DefaultConsumer {
LOG.debug("Exception while checking jcr session", e);
}
}
+ } finally {
+ lock.unlock();
}
if (!isSessionLive) {
diff --git
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
index 4bf8293d7db..baa13924e82 100644
---
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
+++
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
@@ -28,6 +28,7 @@ import java.util.EventListener;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import jakarta.servlet.Filter;
import jakarta.servlet.MultipartConfigElement;
@@ -90,7 +91,7 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
implements RestConsumerFactory, RestApiConsumerFactory,
SSLContextParametersAware {
public static final String TMP_DIR = "CamelJettyTempDir";
- protected static final HashMap<String, ConnectorRef> CONNECTORS = new
HashMap<>();
+ protected static final Map<String, ConnectorRef> CONNECTORS = new
ConcurrentHashMap<>();
private static final Logger LOG =
LoggerFactory.getLogger(JettyHttpComponent.class);
private static final String JETTY_SSL_KEYSTORE =
"org.eclipse.jetty.ssl.keystore";
@@ -282,20 +283,18 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
JettyHttpEndpoint endpoint = (JettyHttpEndpoint)
consumer.getEndpoint();
String connectorKey = getConnectorKey(endpoint);
- synchronized (CONNECTORS) {
- ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
-
- // check if there are already another consumer on the same
context-path and if so fail
- if (connectorRef != null) {
- for (Map.Entry<String, HttpConsumer> entry :
connectorRef.servlet.getConsumers().entrySet()) {
- String path = entry.getValue().getPath();
- CamelContext camelContext =
entry.getValue().getEndpoint().getCamelContext();
- if (consumer.getPath().equals(path)) {
- // its allowed if they are from the same camel context
- boolean sameContext =
consumer.getEndpoint().getCamelContext() == camelContext;
- if (!sameContext) {
- return false;
- }
+ ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
+
+ // check if there are already another consumer on the same
context-path and if so fail
+ if (connectorRef != null) {
+ for (Map.Entry<String, HttpConsumer> entry :
connectorRef.servlet.getConsumers().entrySet()) {
+ String path = entry.getValue().getPath();
+ CamelContext camelContext =
entry.getValue().getEndpoint().getCamelContext();
+ if (consumer.getPath().equals(path)) {
+ // its allowed if they are from the same camel context
+ boolean sameContext =
consumer.getEndpoint().getCamelContext() == camelContext;
+ if (!sameContext) {
+ return false;
}
}
}
@@ -313,68 +312,77 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
JettyHttpEndpoint endpoint = (JettyHttpEndpoint)
consumer.getEndpoint();
String connectorKey = getConnectorKey(endpoint);
- synchronized (CONNECTORS) {
- ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
- if (connectorRef == null) {
- Server server = createServer();
- Connector connector = getConnector(server, endpoint);
- if
("localhost".equalsIgnoreCase(endpoint.getHttpUri().getHost())) {
- LOG.warn("You use localhost interface! It means that no
external connections will be available. "
- + "Don't you want to use 0.0.0.0 instead (all
network interfaces)? {}",
- endpoint);
- }
- if (endpoint.isEnableJmx()) {
- enableJmx(server);
- }
- server.addConnector(connector);
-
- connectorRef = new ConnectorRef(
- server, connector,
- createServletForConnector(server, connector,
endpoint.getHandlers(), endpoint));
- // must enable session before we start
- if (endpoint.isSessionSupport()) {
- enableSessionSupport(connectorRef.server, connectorKey);
- }
- connectorRef.server.start();
+ CONNECTORS.compute(connectorKey, (cKey, connectorRef) -> {
+ try {
+ return connect(consumer, endpoint, cKey, connectorRef);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ });
+ }
- LOG.debug("Adding connector key: {} -> {}", connectorKey,
connectorRef);
- CONNECTORS.put(connectorKey, connectorRef);
+ private ConnectorRef connect(
+ HttpConsumer consumer, JettyHttpEndpoint endpoint, String
connectorKey, ConnectorRef connectorRef)
+ throws Exception {
+ if (connectorRef == null) {
+ Server server = createServer();
+ Connector connector = getConnector(server, endpoint);
+ if ("localhost".equalsIgnoreCase(endpoint.getHttpUri().getHost()))
{
+ LOG.warn("You use localhost interface! It means that no
external connections will be available. "
+ + "Don't you want to use 0.0.0.0 instead (all network
interfaces)? {}",
+ endpoint);
+ }
+ if (endpoint.isEnableJmx()) {
+ enableJmx(server);
+ }
+ server.addConnector(connector);
+
+ connectorRef = new ConnectorRef(
+ server, connector,
+ createServletForConnector(server, connector,
endpoint.getHandlers(), endpoint));
+ // must enable session before we start
+ if (endpoint.isSessionSupport()) {
+ enableSessionSupport(connectorRef.server, connectorKey);
+ }
+ connectorRef.server.start();
- } else {
- LOG.debug("Using existing connector key: {} -> {}",
connectorKey, connectorRef);
+ LOG.debug("Adding connector key: {} -> {}", connectorKey,
connectorRef);
+ } else {
+ LOG.debug("Using existing connector key: {} -> {}", connectorKey,
connectorRef);
- // check if there are any new handlers, and if so then we need
to re-start the server
- if (endpoint.getHandlers() != null &&
!endpoint.getHandlers().isEmpty()) {
- List<Handler> existingHandlers = new ArrayList<>();
- if (connectorRef.server.getHandlers() != null &&
!connectorRef.server.getHandlers().isEmpty()) {
- existingHandlers = connectorRef.server.getHandlers();
- }
- List<Handler> newHandlers = new
ArrayList<>(endpoint.getHandlers());
- boolean changed =
!existingHandlers.containsAll(newHandlers) &&
!newHandlers.containsAll(existingHandlers);
- if (changed) {
- LOG.debug("Restarting Jetty server due to adding new
Jetty Handlers: {}", newHandlers);
- connectorRef.server.stop();
- addJettyHandlers(connectorRef.server,
endpoint.getHandlers());
- connectorRef.server.start();
- }
+ // check if there are any new handlers, and if so then we need to
re-start the server
+ if (endpoint.getHandlers() != null &&
!endpoint.getHandlers().isEmpty()) {
+ List<Handler> existingHandlers = new ArrayList<>();
+ if (connectorRef.server.getHandlers() != null &&
!connectorRef.server.getHandlers().isEmpty()) {
+ existingHandlers = connectorRef.server.getHandlers();
}
- // check the session support
- if (endpoint.isSessionSupport()) {
- enableSessionSupport(connectorRef.server, connectorKey);
+ List<Handler> newHandlers = new
ArrayList<>(endpoint.getHandlers());
+ boolean changed = !existingHandlers.containsAll(newHandlers)
&& !newHandlers.containsAll(existingHandlers);
+ if (changed) {
+ LOG.debug("Restarting Jetty server due to adding new Jetty
Handlers: {}", newHandlers);
+ connectorRef.server.stop();
+ addJettyHandlers(connectorRef.server,
endpoint.getHandlers());
+ connectorRef.server.start();
}
- // ref track the connector
- connectorRef.increment();
}
-
- if (endpoint.isEnableMultipartFilter()) {
- enableMultipartFilter(endpoint, connectorRef.server);
+ // check the session support
+ if (endpoint.isSessionSupport()) {
+ enableSessionSupport(connectorRef.server, connectorKey);
}
+ // ref track the connector
+ connectorRef.increment();
+ }
- if (endpoint.getFilters() != null &&
!endpoint.getFilters().isEmpty()) {
- setFilters(endpoint, connectorRef.server);
- }
- connectorRef.servlet.connect(consumer);
+ if (endpoint.isEnableMultipartFilter()) {
+ enableMultipartFilter(endpoint, connectorRef.server);
}
+
+ if (endpoint.getFilters() != null && !endpoint.getFilters().isEmpty())
{
+ setFilters(endpoint, connectorRef.server);
+ }
+ connectorRef.servlet.connect(consumer);
+
+ return connectorRef;
}
private void enableJmx(Server server) {
@@ -388,7 +396,7 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
}
}
- private void enableSessionSupport(Server server, String connectorKey)
throws Exception {
+ private void enableSessionSupport(Server server, String connectorKey) {
ServletContextHandler context =
server.getDescendant(ServletContextHandler.class);
if (context.getSessionHandler() == null) {
SessionHandler sessionHandler = new SessionHandler();
@@ -469,33 +477,39 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
HttpCommonEndpoint endpoint = consumer.getEndpoint();
String connectorKey = getConnectorKey(endpoint);
- synchronized (CONNECTORS) {
- ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
- if (connectorRef != null) {
- connectorRef.servlet.disconnect(consumer);
- if (connectorRef.decrement() == 0) {
-
connectorRef.server.removeConnector(connectorRef.connector);
- connectorRef.connector.stop();
- connectorRef.server.stop();
- CONNECTORS.remove(connectorKey);
- // Camel controls the lifecycle of these entities so
remove the
- // registered MBeans when Camel is done with the managed
objects.
- if (mbContainer != null) {
- this.removeServerMBean(connectorRef.server);
- //mbContainer.removeBean(connectorRef.connector);
- }
- if (defaultQueuedThreadPool != null) {
- try {
- defaultQueuedThreadPool.stop();
- } catch (Exception t) {
- defaultQueuedThreadPool.destroy();
- } finally {
- defaultQueuedThreadPool = null;
- }
- }
+ CONNECTORS.computeIfPresent(connectorKey, (cKey, connectorRef) -> {
+ try {
+ return disconnect(consumer, connectorRef);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ });
+ }
+
+ private ConnectorRef disconnect(HttpConsumer consumer, ConnectorRef
connectorRef) throws Exception {
+ connectorRef.servlet.disconnect(consumer);
+ if (connectorRef.decrement() == 0) {
+ connectorRef.server.removeConnector(connectorRef.connector);
+ connectorRef.connector.stop();
+ connectorRef.server.stop();
+ // Camel controls the lifecycle of these entities so remove the
+ // registered MBeans when Camel is done with the managed objects.
+ if (mbContainer != null) {
+ this.removeServerMBean(connectorRef.server);
+ //mbContainer.removeBean(connectorRef.connector);
+ }
+ if (defaultQueuedThreadPool != null) {
+ try {
+ defaultQueuedThreadPool.stop();
+ } catch (Exception t) {
+ defaultQueuedThreadPool.destroy();
+ } finally {
+ defaultQueuedThreadPool = null;
}
}
+ return null;
}
+ return connectorRef;
}
private String getConnectorKey(HttpCommonEndpoint endpoint) {
@@ -808,25 +822,30 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
throw new IllegalArgumentException("Jetty component does not use
HttpConfiguration.");
}
- public synchronized MBeanContainer getMbContainer() {
- // If null, provide the default implementation.
- if (mbContainer == null) {
- MBeanServer mbs = null;
+ public MBeanContainer getMbContainer() {
+ lock.lock();
+ try {
+ // If null, provide the default implementation.
+ if (mbContainer == null) {
+ MBeanServer mbs = null;
+
+ final ManagementStrategy mStrategy =
this.getCamelContext().getManagementStrategy();
+ final ManagementAgent mAgent = mStrategy.getManagementAgent();
+ if (mAgent != null) {
+ mbs = mAgent.getMBeanServer();
+ }
- final ManagementStrategy mStrategy =
this.getCamelContext().getManagementStrategy();
- final ManagementAgent mAgent = mStrategy.getManagementAgent();
- if (mAgent != null) {
- mbs = mAgent.getMBeanServer();
+ if (mbs != null) {
+ mbContainer = new MBeanContainer(mbs);
+ } else {
+ LOG.warn("JMX disabled in CamelContext. Jetty JMX
extensions will remain disabled.");
+ }
}
- if (mbs != null) {
- mbContainer = new MBeanContainer(mbs);
- } else {
- LOG.warn("JMX disabled in CamelContext. Jetty JMX extensions
will remain disabled.");
- }
+ return this.mbContainer;
+ } finally {
+ lock.unlock();
}
-
- return this.mbContainer;
}
/**
@@ -1344,19 +1363,17 @@ public abstract class JettyHttpComponent extends
HttpCommonComponent
@Override
protected void doStop() throws Exception {
super.doStop();
- if (!CONNECTORS.isEmpty()) {
- for (Map.Entry<String, ConnectorRef> connectorEntry :
CONNECTORS.entrySet()) {
- ConnectorRef connectorRef = connectorEntry.getValue();
- if (connectorRef != null && connectorRef.getRefCount() == 0) {
-
connectorRef.server.removeConnector(connectorRef.connector);
- connectorRef.connector.stop();
- // Camel controls the lifecycle of these entities so
remove the
- // registered MBeans when Camel is done with the managed
objects.
- removeServerMBean(connectorRef.server);
- connectorRef.server.stop();
- //removeServerMBean(connectorRef.connector);
- CONNECTORS.remove(connectorEntry.getKey());
- }
+ for (Map.Entry<String, ConnectorRef> connectorEntry :
CONNECTORS.entrySet()) {
+ ConnectorRef connectorRef = connectorEntry.getValue();
+ if (connectorRef != null && connectorRef.getRefCount() == 0) {
+ connectorRef.server.removeConnector(connectorRef.connector);
+ connectorRef.connector.stop();
+ // Camel controls the lifecycle of these entities so remove the
+ // registered MBeans when Camel is done with the managed
objects.
+ removeServerMBean(connectorRef.server);
+ connectorRef.server.stop();
+ //removeServerMBean(connectorRef.connector);
+ CONNECTORS.remove(connectorEntry.getKey());
}
}
if (mbContainer != null) {
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
index 9d2dab5496f..aeb9797330d 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
@@ -137,43 +137,53 @@ public class JiraEndpoint extends ScheduledPollEndpoint
implements EndpointServi
disconnect();
}
- public synchronized void connect() {
- if (client == null) {
- Registry registry = getCamelContext().getRegistry();
- JiraRestClientFactory factory
- = registry.lookupByNameAndType(JIRA_REST_CLIENT_FACTORY,
JiraRestClientFactory.class);
- if (factory == null) {
- factory = new OAuthAsynchronousJiraRestClientFactory();
- }
- final URI jiraServerUri = URI.create(configuration.getJiraUrl());
- if (configuration.getUsername() != null) {
- LOG.debug("Connecting to JIRA with Basic authentication with
username/password");
- client =
factory.createWithBasicHttpAuthentication(jiraServerUri,
configuration.getUsername(),
- configuration.getPassword());
- } else if (configuration.getAccessToken() != null
- && configuration.getVerificationCode() == null
- && configuration.getPrivateKey() == null
- && configuration.getConsumerKey() == null) {
- client = factory.create(jiraServerUri, builder -> {
- builder.setHeader("Authorization", "Bearer " +
configuration.getAccessToken());
- });
- } else {
- LOG.debug("Connecting to JIRA with OAuth authentication");
- JiraOAuthAuthenticationHandler oAuthHandler = new
JiraOAuthAuthenticationHandler(
- configuration.getConsumerKey(),
- configuration.getVerificationCode(),
configuration.getPrivateKey(),
- configuration.getAccessToken(),
- configuration.getJiraUrl());
- client = factory.create(jiraServerUri, oAuthHandler);
+ public void connect() {
+ lock.lock();
+ try {
+ if (client == null) {
+ Registry registry = getCamelContext().getRegistry();
+ JiraRestClientFactory factory
+ =
registry.lookupByNameAndType(JIRA_REST_CLIENT_FACTORY,
JiraRestClientFactory.class);
+ if (factory == null) {
+ factory = new OAuthAsynchronousJiraRestClientFactory();
+ }
+ final URI jiraServerUri =
URI.create(configuration.getJiraUrl());
+ if (configuration.getUsername() != null) {
+ LOG.debug("Connecting to JIRA with Basic authentication
with username/password");
+ client =
factory.createWithBasicHttpAuthentication(jiraServerUri,
configuration.getUsername(),
+ configuration.getPassword());
+ } else if (configuration.getAccessToken() != null
+ && configuration.getVerificationCode() == null
+ && configuration.getPrivateKey() == null
+ && configuration.getConsumerKey() == null) {
+ client = factory.create(jiraServerUri, builder -> {
+ builder.setHeader("Authorization", "Bearer " +
configuration.getAccessToken());
+ });
+ } else {
+ LOG.debug("Connecting to JIRA with OAuth authentication");
+ JiraOAuthAuthenticationHandler oAuthHandler = new
JiraOAuthAuthenticationHandler(
+ configuration.getConsumerKey(),
+ configuration.getVerificationCode(),
configuration.getPrivateKey(),
+ configuration.getAccessToken(),
+ configuration.getJiraUrl());
+ client = factory.create(jiraServerUri, oAuthHandler);
+ }
}
+ } finally {
+ lock.unlock();
}
}
- public synchronized void disconnect() throws Exception {
- if (client != null) {
- LOG.debug("Disconnecting from JIRA");
- client.close();
- client = null;
+ public void disconnect() throws Exception {
+ lock.lock();
+ try {
+ if (client != null) {
+ LOG.debug("Disconnecting from JIRA");
+ client.close();
+ client = null;
+ }
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index a10d5c7814e..7fd77b11aea 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.jms;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
@@ -45,6 +48,7 @@ import static
org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException;
*/
public class EndpointMessageListener implements SessionAwareMessageListener {
private static final Logger LOG =
LoggerFactory.getLogger(EndpointMessageListener.class);
+ private final Lock lock = new ReentrantLock();
private final JmsConsumer consumer;
private final JmsEndpoint endpoint;
private final AsyncProcessor processor;
@@ -315,11 +319,16 @@ public class EndpointMessageListener implements
SessionAwareMessageListener {
this.eagerPoisonBody = eagerPoisonBody;
}
- public synchronized JmsOperations getTemplate() {
- if (template == null) {
- template = endpoint.createInOnlyTemplate();
+ public JmsOperations getTemplate() {
+ lock.lock();
+ try {
+ if (template == null) {
+ template = endpoint.createInOnlyTemplate();
+ }
+ return template;
+ } finally {
+ lock.unlock();
}
- return template;
}
public void setTemplate(JmsOperations template) {
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index e6ac7d7a008..31a718d09d3 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -1157,14 +1157,19 @@ public class JmsComponent extends
HeaderFilterStrategyComponent {
super.doShutdown();
}
- protected synchronized ExecutorService getAsyncStartStopExecutorService() {
- if (asyncStartStopExecutorService == null) {
- // use a cached thread pool for async start tasks as they can run
for a while, and we need a dedicated thread
- // for each task, and the thread pool will shrink when no more
tasks running
- asyncStartStopExecutorService
- =
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this,
"AsyncStartStopListener");
+ protected ExecutorService getAsyncStartStopExecutorService() {
+ lock.lock();
+ try {
+ if (asyncStartStopExecutorService == null) {
+ // use a cached thread pool for async start tasks as they can
run for a while, and we need a dedicated thread
+ // for each task, and the thread pool will shrink when no more
tasks running
+ asyncStartStopExecutorService
+ =
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this,
"AsyncStartStopListener");
+ }
+ return asyncStartStopExecutorService;
+ } finally {
+ lock.unlock();
}
- return asyncStartStopExecutorService;
}
@Override
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index f0b38161cbd..b486bb7b341 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -78,7 +78,8 @@ public class JmsProducer extends DefaultAsyncProducer {
protected void initReplyManager() {
if (!started.get()) {
- synchronized (this) {
+ lock.lock();
+ try {
if (started.get()) {
return;
}
@@ -119,6 +120,8 @@ public class JmsProducer extends DefaultAsyncProducer {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
started.set(true);
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
index c8f80a0000d..d02c0b75f0a 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
@@ -57,11 +57,16 @@ public class JmsTemporaryQueueEndpoint extends
JmsQueueEndpoint implements Desti
}
@Override
- public synchronized Destination getJmsDestination(Session session) throws
JMSException {
- if (jmsDestination == null) {
- jmsDestination = createJmsDestination(session);
+ public Destination getJmsDestination(Session session) throws JMSException {
+ lock.lock();
+ try {
+ if (jmsDestination == null) {
+ jmsDestination = createJmsDestination(session);
+ }
+ return jmsDestination;
+ } finally {
+ lock.unlock();
}
- return jmsDestination;
}
protected Destination createJmsDestination(Session session) throws
JMSException {
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
index 2bd5cc30775..68a2b031c5d 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
@@ -50,11 +50,16 @@ public class JmsTemporaryTopicEndpoint extends JmsEndpoint
implements Destinatio
}
@Override
- public synchronized Destination getJmsDestination(Session session) throws
JMSException {
- if (jmsDestination == null) {
- jmsDestination = createJmsDestination(session);
+ public Destination getJmsDestination(Session session) throws JMSException {
+ lock.lock();
+ try {
+ if (jmsDestination == null) {
+ jmsDestination = createJmsDestination(session);
+ }
+ return jmsDestination;
+ } finally {
+ lock.unlock();
}
- return jmsDestination;
}
protected Destination createJmsDestination(Session session) throws
JMSException {
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
index 72295b24abc..400e96dd2db 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.jms;
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import jakarta.jms.JMSException;
import jakarta.jms.MessageEOFException;
@@ -25,6 +27,7 @@ import jakarta.jms.StreamMessage;
public class StreamMessageInputStream extends InputStream {
+ private final Lock lock = new ReentrantLock();
private final StreamMessage message;
private volatile boolean eof;
@@ -74,11 +77,14 @@ public class StreamMessageInputStream extends InputStream {
}
@Override
- public synchronized void reset() throws IOException {
+ public void reset() throws IOException {
+ lock.lock();
try {
message.reset();
} catch (JMSException e) {
throw new IOException(e);
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
index 9c7f93d2d1e..19c3c67341e 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.jms.reply;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.TimeoutMap;
import org.slf4j.Logger;
@@ -34,7 +36,7 @@ public class MessageSelectorCreator {
protected final ConcurrentSkipListSet<String> correlationIds;
protected volatile boolean dirty = true;
protected StringBuilder expression;
- private final Object lock = new Object();
+ private final Lock lock = new ReentrantLock();
public MessageSelectorCreator(CorrelationTimeoutMap timeoutMap) {
this.timeoutMap = timeoutMap;
@@ -46,7 +48,8 @@ public class MessageSelectorCreator {
}
public String get() {
- synchronized (lock) {
+ lock.lock();
+ try {
if (!dirty) {
return expression.toString();
}
@@ -74,18 +77,23 @@ public class MessageSelectorCreator {
dirty = false;
return answer;
+ } finally {
+ lock.unlock();
}
}
// Changes to live correlation-ids invalidate existing message selector
private void timeoutEvent(TimeoutMap.Listener.Type type, String cid) {
- synchronized (lock) {
+ lock.lock();
+ try {
if (type == Put) {
correlationIds.add(cid);
} else if (type == Remove || type == Evict) {
correlationIds.remove(cid);
}
dirty = true;
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index 9d95ff14643..8eb17d17506 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -109,12 +109,15 @@ public class QueueReplyManager extends
ReplyManagerSupport {
Session session, String destinationName,
boolean pubSubDomain)
throws JMSException {
- synchronized (QueueReplyManager.this) {
+ QueueReplyManager.this.lock.lock();
+ try {
// resolve the reply to destination
if (destination == null) {
destination = delegate.resolveDestinationName(session,
destinationName, pubSubDomain);
setReplyTo(destination);
}
+ } finally {
+ QueueReplyManager.this.lock.unlock();
}
return destination;
}
diff --git
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
index 52e39bb957f..2b3ecb843e7 100644
---
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
+++
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.jmx;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.management.AttributeChangeNotification;
import javax.management.AttributeChangeNotificationFilter;
import javax.management.Notification;
@@ -26,6 +29,7 @@ import javax.management.Notification;
*/
public class JMXConsumerNotificationFilter extends
AttributeChangeNotificationFilter {
+ private final Lock lock = new ReentrantLock();
private final String stringToCompare;
private final boolean notifyMatch;
@@ -36,25 +40,30 @@ public class JMXConsumerNotificationFilter extends
AttributeChangeNotificationFi
}
@Override
- public synchronized boolean isNotificationEnabled(Notification
notification) {
- boolean enabled = super.isNotificationEnabled(notification);
- if (!enabled) {
- return false;
- }
+ public boolean isNotificationEnabled(Notification notification) {
+ lock.lock();
+ try {
+ boolean enabled = super.isNotificationEnabled(notification);
+ if (!enabled) {
+ return false;
+ }
- boolean match = false;
- if (stringToCompare != null) {
- AttributeChangeNotification acn = (AttributeChangeNotification)
notification;
- Object newValue = acn.getNewValue();
- // special for null
- if ("null".equals(stringToCompare) && newValue == null) {
- match = true;
- } else if (newValue != null) {
- match = stringToCompare.equals(newValue.toString());
+ boolean match = false;
+ if (stringToCompare != null) {
+ AttributeChangeNotification acn =
(AttributeChangeNotification) notification;
+ Object newValue = acn.getNewValue();
+ // special for null
+ if ("null".equals(stringToCompare) && newValue == null) {
+ match = true;
+ } else if (newValue != null) {
+ match = stringToCompare.equals(newValue.toString());
+ }
+ return notifyMatch == match;
+ } else {
+ return true;
}
- return notifyMatch == match;
- } else {
- return true;
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
b/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
index a9981fa385c..532b5959a7a 100644
---
a/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
+++
b/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java
@@ -82,39 +82,44 @@ public class JoltEndpoint extends ResourceEndpoint {
return "jolt:" + getResourceUri();
}
- private synchronized JoltTransform getTransform() throws Exception {
- if (transform == null) {
- if (log.isDebugEnabled()) {
- String path = getResourceUri();
- log.debug("Jolt content read from resource {} with
resourceUri: {} for endpoint {}", getResourceUri(), path,
- getEndpointUri());
- }
+ private JoltTransform getTransform() throws Exception {
+ getInternalLock().lock();
+ try {
+ if (transform == null) {
+ if (log.isDebugEnabled()) {
+ String path = getResourceUri();
+ log.debug("Jolt content read from resource {} with
resourceUri: {} for endpoint {}", getResourceUri(), path,
+ getEndpointUri());
+ }
- // Sortr does not require a spec
- if (this.transformDsl == JoltTransformType.Sortr) {
- this.transform = new Sortr();
- } else {
- // getResourceAsInputStream also considers the content cache
- Object spec =
JsonUtils.jsonToObject(getResourceAsInputStream());
- switch (this.transformDsl) {
- case Shiftr:
- this.transform = new Shiftr(spec);
- break;
- case Defaultr:
- this.transform = new Defaultr(spec);
- break;
- case Removr:
- this.transform = new Removr(spec);
- break;
- case Chainr:
- default:
- this.transform = Chainr.fromSpec(spec);
- break;
+ // Sortr does not require a spec
+ if (this.transformDsl == JoltTransformType.Sortr) {
+ this.transform = new Sortr();
+ } else {
+ // getResourceAsInputStream also considers the content
cache
+ Object spec =
JsonUtils.jsonToObject(getResourceAsInputStream());
+ switch (this.transformDsl) {
+ case Shiftr:
+ this.transform = new Shiftr(spec);
+ break;
+ case Defaultr:
+ this.transform = new Defaultr(spec);
+ break;
+ case Removr:
+ this.transform = new Removr(spec);
+ break;
+ case Chainr:
+ default:
+ this.transform = Chainr.fromSpec(spec);
+ break;
+ }
}
- }
+ }
+ return transform;
+ } finally {
+ getInternalLock().unlock();
}
- return transform;
}
/**
diff --git
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
index cc55c69c08d..284808c7129 100644
---
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
+++
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
@@ -121,13 +121,18 @@ public class JpaComponent extends HealthCheckComponent {
return aliases;
}
- synchronized ExecutorService getOrCreatePollingConsumerExecutorService() {
- if (pollingConsumerExecutorService == null) {
- LOG.debug("Creating thread pool for JpaPollingConsumer to support
polling using timeout");
- pollingConsumerExecutorService
- =
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
"JpaPollingConsumer");
+ ExecutorService getOrCreatePollingConsumerExecutorService() {
+ lock.lock();
+ try {
+ if (pollingConsumerExecutorService == null) {
+ LOG.debug("Creating thread pool for JpaPollingConsumer to
support polling using timeout");
+ pollingConsumerExecutorService
+ =
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
"JpaPollingConsumer");
+ }
+ return pollingConsumerExecutorService;
+ } finally {
+ lock.unlock();
}
- return pollingConsumerExecutorService;
}
// Implementation methods
diff --git
a/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
b/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
index d77208f267c..4e8025bf5c2 100644
---
a/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
+++
b/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java
@@ -107,62 +107,68 @@ public class JsltEndpoint extends ResourceEndpoint {
return "jslt:" + getResourceUri();
}
- private synchronized Expression getTransform(Message msg) throws Exception
{
- final String jsltStringFromHeader
- = allowTemplateFromHeader ?
msg.getHeader(JsltConstants.HEADER_JSLT_STRING, String.class) : null;
+ private Expression getTransform(Message msg) throws Exception {
+ getInternalLock().lock();
+ try {
- final boolean useTemplateFromUri = jsltStringFromHeader == null;
+ final String jsltStringFromHeader
+ = allowTemplateFromHeader ?
msg.getHeader(JsltConstants.HEADER_JSLT_STRING, String.class) : null;
- if (useTemplateFromUri && transform != null) {
- return transform;
- }
+ final boolean useTemplateFromUri = jsltStringFromHeader == null;
- final Collection<Function> functions = Objects.requireNonNullElse(
- ((JsltComponent) getComponent()).getFunctions(),
- Collections.emptyList());
+ if (useTemplateFromUri && transform != null) {
+ return transform;
+ }
- final JsonFilter objectFilter = Objects.requireNonNullElse(
- ((JsltComponent) getComponent()).getObjectFilter(),
- DEFAULT_JSON_FILTER);
+ final Collection<Function> functions = Objects.requireNonNullElse(
+ ((JsltComponent) getComponent()).getFunctions(),
+ Collections.emptyList());
- final String transformSource;
- final InputStream stream;
+ final JsonFilter objectFilter = Objects.requireNonNullElse(
+ ((JsltComponent) getComponent()).getObjectFilter(),
+ DEFAULT_JSON_FILTER);
- if (useTemplateFromUri) {
- transformSource = getResourceUri();
+ final String transformSource;
+ final InputStream stream;
+
+ if (useTemplateFromUri) {
+ transformSource = getResourceUri();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Jslt content read from resource {} with
resourceUri: {} for endpoint {}",
+ transformSource,
+ transformSource,
+ getEndpointUri());
+ }
- if (log.isDebugEnabled()) {
- log.debug("Jslt content read from resource {} with
resourceUri: {} for endpoint {}",
- transformSource,
- transformSource,
- getEndpointUri());
+ stream =
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(),
transformSource);
+ if (stream == null) {
+ throw new JsltException("Cannot load resource '" +
transformSource + "': not found");
+ }
+ } else { // use template from header
+ stream = new
ByteArrayInputStream(jsltStringFromHeader.getBytes(StandardCharsets.UTF_8));
+ transformSource = "<inline>";
}
- stream =
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(),
transformSource);
- if (stream == null) {
- throw new JsltException("Cannot load resource '" +
transformSource + "': not found");
+ final Expression transform;
+ try {
+ transform = new Parser(new InputStreamReader(stream))
+ .withFunctions(functions)
+ .withObjectFilter(objectFilter)
+ .withSource(transformSource)
+ .compile();
+ } finally {
+ // the stream is consumed only on .compile(), cannot be closed
before
+ IOHelper.close(stream);
}
- } else { // use template from header
- stream = new
ByteArrayInputStream(jsltStringFromHeader.getBytes(StandardCharsets.UTF_8));
- transformSource = "<inline>";
- }
- final Expression transform;
- try {
- transform = new Parser(new InputStreamReader(stream))
- .withFunctions(functions)
- .withObjectFilter(objectFilter)
- .withSource(transformSource)
- .compile();
+ if (useTemplateFromUri) {
+ this.transform = transform;
+ }
+ return transform;
} finally {
- // the stream is consumed only on .compile(), cannot be closed
before
- IOHelper.close(stream);
- }
-
- if (useTemplateFromUri) {
- this.transform = transform;
+ getInternalLock().unlock();
}
- return transform;
}
public JsltEndpoint findOrCreateEndpoint(String uri, String
newResourceUri) {
diff --git
a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
index 893d118a465..9c921cedefb 100644
---
a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
+++
b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
@@ -188,10 +188,13 @@ public class JsonValidatorEndpoint extends
ResourceEndpoint {
* @return The currently loaded schema
*/
private JsonSchema getOrCreateSchema() throws Exception {
- synchronized (this) {
+ getInternalLock().lock();
+ try {
if (this.schema == null) {
this.schema =
this.uriSchemaLoader.createSchema(getCamelContext(), getResourceUri());
}
+ } finally {
+ getInternalLock().unlock();
}
return this.schema;
}
diff --git
a/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
b/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
index 24999b525a9..7b05810a99f 100644
---
a/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
+++
b/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java
@@ -275,7 +275,7 @@ public class JsonStream extends FilterInputStream {
}
@Override
- public synchronized void reset() throws IOException {
+ public void reset() throws IOException {
throw new IOException("reset not supported");
}
diff --git
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
index 10e448b6a54..13ca2d83cbf 100644
---
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
+++
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
@@ -86,12 +86,17 @@ public class Jt400Component extends HealthCheckComponent {
*
* @return the default connection pool used by this component
*/
- public synchronized AS400ConnectionPool getConnectionPool() {
- if (connectionPool == null) {
- LOG.info("Instantiating the default connection pool ...");
- connectionPool = new AS400ConnectionPool();
+ public AS400ConnectionPool getConnectionPool() {
+ lock.lock();
+ try {
+ if (connectionPool == null) {
+ LOG.info("Instantiating the default connection pool ...");
+ connectionPool = new AS400ConnectionPool();
+ }
+ return connectionPool;
+ } finally {
+ lock.unlock();
}
- return connectionPool;
}
public void setConnectionPool(AS400ConnectionPool connectionPool) {
diff --git
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
index d6c2114a6df..2f3b2d4db47 100755
---
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
+++
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
@@ -101,43 +101,48 @@ public class Jt400MsgQueueConsumer extends
ScheduledPollConsumer {
}
}
- private synchronized Exchange receive(MessageQueue queue, long timeout)
throws Exception {
- QueuedMessage entry;
- int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1;
- LOG.trace("Reading from message queue: {} with {} seconds timeout",
queue.getPath(),
- -1 == seconds ? "infinite" : seconds);
-
- Jt400Configuration.MessageAction messageAction =
getEndpoint().getMessageAction();
- entry = queue.receive(messageKey, //message key
- seconds, //timeout
- messageAction.getJt400Value(), // message action
- null == messageKey ? MessageQueue.ANY : MessageQueue.NEXT); //
types of messages
-
- if (null == entry) {
- return null;
- }
- // Need to tuck away the message key if the message action is SAME,
otherwise
- // we'll just keep retrieving the same message over and over
- if (Jt400Configuration.MessageAction.SAME == messageAction) {
- this.messageKey = entry.getKey();
- }
+ private Exchange receive(MessageQueue queue, long timeout) throws
Exception {
+ lock.lock();
+ try {
+ QueuedMessage entry;
+ int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1;
+ LOG.trace("Reading from message queue: {} with {} seconds
timeout", queue.getPath(),
+ -1 == seconds ? "infinite" : seconds);
+
+ Jt400Configuration.MessageAction messageAction =
getEndpoint().getMessageAction();
+ entry = queue.receive(messageKey, //message key
+ seconds, //timeout
+ messageAction.getJt400Value(), // message action
+ null == messageKey ? MessageQueue.ANY :
MessageQueue.NEXT); // types of messages
+
+ if (null == entry) {
+ return null;
+ }
+ // Need to tuck away the message key if the message action is
SAME, otherwise
+ // we'll just keep retrieving the same message over and over
+ if (Jt400Configuration.MessageAction.SAME == messageAction) {
+ this.messageKey = entry.getKey();
+ }
- Exchange exchange = createExchange(true);
- exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION,
- entry.getFromJobNumber() + "/" + entry.getUser() + "/" +
entry.getFromJobName());
- setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID,
entry.getID());
- setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_FILE,
entry.getFileName());
- setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_TYPE,
entry.getType());
- setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_SEVERITY, entry.getSeverity());
- setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE,
entry);
- if (AS400Message.INQUIRY == entry.getType()) {
- setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply());
- if (getEndpoint().isSendingReply()) {
- setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey());
+ Exchange exchange = createExchange(true);
+ exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION,
+ entry.getFromJobNumber() + "/" + entry.getUser() + "/" +
entry.getFromJobName());
+ setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_ID, entry.getID());
+ setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_FILE, entry.getFileName());
+ setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_TYPE, entry.getType());
+ setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_SEVERITY, entry.getSeverity());
+ setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE,
entry);
+ if (AS400Message.INQUIRY == entry.getType()) {
+ setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply());
+ if (getEndpoint().isSendingReply()) {
+ setHeaderIfValueNotNull(exchange.getIn(),
Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey());
+ }
}
+ exchange.getIn().setBody(entry.getText());
+ return exchange;
+ } finally {
+ lock.unlock();
}
- exchange.getIn().setBody(entry.getText());
- return exchange;
}
private static void setHeaderIfValueNotNull(final Message message, final
String header, final Object value) {
diff --git
a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
index db1b75288e1..321d4fbf873 100644
---
a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
+++
b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java
@@ -161,29 +161,34 @@ public class JtaTransactionErrorHandlerReifier extends
ErrorHandlerReifier<JtaTr
return answer;
}
- protected synchronized ScheduledExecutorService getExecutorService(
+ protected ScheduledExecutorService getExecutorService(
ScheduledExecutorService executorService, String
executorServiceRef) {
- if (executorService == null || executorService.isShutdown()) {
- // camel context will shutdown the executor when it shutdown so no
- // need to shut it down when stopping
- if (executorServiceRef != null) {
- executorService = lookupByNameAndType(executorServiceRef,
ScheduledExecutorService.class);
- if (executorService == null) {
- ExecutorServiceManager manager =
camelContext.getExecutorServiceManager();
- ThreadPoolProfile profile =
manager.getThreadPoolProfile(executorServiceRef);
- executorService = manager.newScheduledThreadPool(this,
executorServiceRef, profile);
+ lock.lock();
+ try {
+ if (executorService == null || executorService.isShutdown()) {
+ // camel context will shutdown the executor when it shutdown
so no
+ // need to shut it down when stopping
+ if (executorServiceRef != null) {
+ executorService = lookupByNameAndType(executorServiceRef,
ScheduledExecutorService.class);
+ if (executorService == null) {
+ ExecutorServiceManager manager =
camelContext.getExecutorServiceManager();
+ ThreadPoolProfile profile =
manager.getThreadPoolProfile(executorServiceRef);
+ executorService = manager.newScheduledThreadPool(this,
executorServiceRef, profile);
+ }
+ if (executorService == null) {
+ throw new IllegalArgumentException("ExecutorService "
+ executorServiceRef + " not found in registry.");
+ }
+ } else {
+ // no explicit configured thread pool, so leave it up to
the
+ // error handler to decide if it need a default thread
pool from
+ // CamelContext#getErrorHandlerExecutorService
+ executorService = null;
}
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorService " +
executorServiceRef + " not found in registry.");
- }
- } else {
- // no explicit configured thread pool, so leave it up to the
- // error handler to decide if it need a default thread pool
from
- // CamelContext#getErrorHandlerExecutorService
- executorService = null;
}
+ return executorService;
+ } finally {
+ lock.unlock();
}
- return executorService;
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 3534fccf3f4..f30af79ef82 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -651,7 +651,7 @@ public class KafkaFetchRecords implements Runnable {
state = State.RESUME_REQUESTED;
}
- private synchronized void setLastError(Exception lastError) {
+ private void setLastError(Exception lastError) {
this.lastError = lastError;
}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 5e3eb6cc9da..e7d493e0d1a 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -23,7 +23,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -63,6 +67,8 @@ public class KameletComponent extends DefaultComponent {
// active consumers
private final Map<String, KameletConsumer> consumers = new HashMap<>();
+ private final Lock consumersLock = new ReentrantLock();
+ private final Condition consumersCondition = consumersLock.newCondition();
// active kamelet EIPs
private final Map<String, Processor> kameletEips = new
ConcurrentHashMap<>();
@@ -344,7 +350,8 @@ public class KameletComponent extends DefaultComponent {
}
public void addConsumer(String key, KameletConsumer consumer) {
- synchronized (consumers) {
+ consumersLock.lock();
+ try {
if (consumers.putIfAbsent(key, consumer) != null) {
throw new IllegalArgumentException(
"Cannot add a 2nd consumer to the same endpoint: " +
key
@@ -352,21 +359,27 @@ public class KameletComponent extends DefaultComponent {
}
// state changed so inc counter
stateCounter++;
- consumers.notifyAll();
+ consumersCondition.signalAll();
+ } finally {
+ consumersLock.unlock();
}
}
public void removeConsumer(String key, KameletConsumer consumer) {
- synchronized (consumers) {
+ consumersLock.lock();
+ try {
consumers.remove(key, consumer);
// state changed so inc counter
stateCounter++;
- consumers.notifyAll();
+ consumersCondition.signalAll();
+ } finally {
+ consumersLock.unlock();
}
}
protected KameletConsumer getConsumer(String key, boolean block, long
timeout) throws InterruptedException {
- synchronized (consumers) {
+ consumersLock.lock();
+ try {
KameletConsumer answer = consumers.get(key);
if (answer == null && block) {
StopWatch watch = new StopWatch();
@@ -379,10 +392,12 @@ public class KameletComponent extends DefaultComponent {
if (rem <= 0) {
break;
}
- consumers.wait(rem);
+ consumersCondition.await(rem, TimeUnit.MILLISECONDS);
}
}
return answer;
+ } finally {
+ consumersLock.unlock();
}
}
diff --git
a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 2b68cd98efb..7a96ef3e3de 100644
---
a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -132,11 +132,16 @@ public class KnativeComponent extends
HealthCheckComponent {
return producerFactory;
}
- public synchronized KnativeProducerFactory getOrCreateProducerFactory()
throws Exception {
- if (producerFactory == null) {
- producerFactory = setUpProducerFactory();
+ public KnativeProducerFactory getOrCreateProducerFactory() throws
Exception {
+ lock.lock();
+ try {
+ if (producerFactory == null) {
+ producerFactory = setUpProducerFactory();
+ }
+ return producerFactory;
+ } finally {
+ lock.unlock();
}
- return producerFactory;
}
/**
@@ -150,11 +155,16 @@ public class KnativeComponent extends
HealthCheckComponent {
return consumerFactory;
}
- public synchronized KnativeConsumerFactory getOrCreateConsumerFactory()
throws Exception {
- if (consumerFactory == null) {
- consumerFactory = setUpConsumerFactory();
+ public KnativeConsumerFactory getOrCreateConsumerFactory() throws
Exception {
+ lock.lock();
+ try {
+ if (consumerFactory == null) {
+ consumerFactory = setUpConsumerFactory();
+ }
+ return consumerFactory;
+ } finally {
+ lock.unlock();
}
- return consumerFactory;
}
/**
diff --git
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 61b2a763001..ded0576fe2b 100644
---
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -126,69 +126,79 @@ public class MasterConsumer extends DefaultConsumer
implements ResumeAware<Resum
// Helpers
// **************************************
- private synchronized void onLeadershipTaken() throws Exception {
- if (!isRunAllowed()) {
- return;
- }
+ private void onLeadershipTaken() throws Exception {
+ lock.lock();
+ try {
+ if (!isRunAllowed()) {
+ return;
+ }
- if (delegatedConsumer != null) {
- return;
- }
+ if (delegatedConsumer != null) {
+ return;
+ }
- // start consumer using background task up till X attempts
- long delay = masterEndpoint.getComponent().getBackOffDelay();
- long max = masterEndpoint.getComponent().getBackOffMaxAttempts();
+ // start consumer using background task up till X attempts
+ long delay = masterEndpoint.getComponent().getBackOffDelay();
+ long max = masterEndpoint.getComponent().getBackOffMaxAttempts();
- BackOffTimer timer = new
BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool());
-
timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task ->
{
- LOG.info("Leadership taken. Attempt #{} to start consumer: {}",
task.getCurrentAttempts(),
- delegatedEndpoint);
+ BackOffTimer timer = new
BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool());
+
timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task ->
{
+ LOG.info("Leadership taken. Attempt #{} to start consumer:
{}", task.getCurrentAttempts(),
+ delegatedEndpoint);
- Exception cause = null;
- try {
- if (delegatedConsumer == null) {
- delegatedConsumer =
delegatedEndpoint.createConsumer(processor);
- if (delegatedConsumer instanceof StartupListener) {
-
getEndpoint().getCamelContext().addStartupListener((StartupListener)
delegatedConsumer);
- }
- if (delegatedConsumer instanceof ResumeAware
resumeAwareConsumer && resumeStrategy != null) {
- LOG.debug("Setting up the resume adapter for the
resume strategy in consumer");
- ResumeAdapter resumeAdapter
- =
AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer,
- resumeStrategy);
- resumeStrategy.setAdapter(resumeAdapter);
-
- LOG.debug("Setting up the resume strategy for
consumer");
- resumeAwareConsumer.setResumeStrategy(resumeStrategy);
+ Exception cause = null;
+ try {
+ if (delegatedConsumer == null) {
+ delegatedConsumer =
delegatedEndpoint.createConsumer(processor);
+ if (delegatedConsumer instanceof StartupListener) {
+
getEndpoint().getCamelContext().addStartupListener((StartupListener)
delegatedConsumer);
+ }
+ if (delegatedConsumer instanceof ResumeAware
resumeAwareConsumer && resumeStrategy != null) {
+ LOG.debug("Setting up the resume adapter for the
resume strategy in consumer");
+ ResumeAdapter resumeAdapter
+ =
AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer,
+ resumeStrategy);
+ resumeStrategy.setAdapter(resumeAdapter);
+
+ LOG.debug("Setting up the resume strategy for
consumer");
+
resumeAwareConsumer.setResumeStrategy(resumeStrategy);
+ }
}
- }
- ServiceHelper.startService(delegatedEndpoint,
delegatedConsumer);
+ ServiceHelper.startService(delegatedEndpoint,
delegatedConsumer);
- } catch (Exception e) {
- cause = e;
- }
+ } catch (Exception e) {
+ cause = e;
+ }
- if (cause != null) {
- String message = "Leadership taken. Attempt #" +
task.getCurrentAttempts()
- + " failed to start consumer due to: " +
cause.getMessage();
- getExceptionHandler().handleException(message, cause);
- return true; // retry
- }
+ if (cause != null) {
+ String message = "Leadership taken. Attempt #" +
task.getCurrentAttempts()
+ + " failed to start consumer due to: " +
cause.getMessage();
+ getExceptionHandler().handleException(message, cause);
+ return true; // retry
+ }
- LOG.info("Leadership taken. Attempt #" + task.getCurrentAttempts()
+ " success. Consumer started: {}",
- delegatedEndpoint);
- return false; // no more attempts
- });
+ LOG.info("Leadership taken. Attempt #" +
task.getCurrentAttempts() + " success. Consumer started: {}",
+ delegatedEndpoint);
+ return false; // no more attempts
+ });
+ } finally {
+ lock.unlock();
+ }
}
- private synchronized void onLeadershipLost() {
- LOG.debug("Leadership lost. Stopping consumer: {}", delegatedEndpoint);
+ private void onLeadershipLost() {
+ lock.lock();
try {
- ServiceHelper.stopAndShutdownServices(delegatedConsumer,
delegatedEndpoint);
+ LOG.debug("Leadership lost. Stopping consumer: {}",
delegatedEndpoint);
+ try {
+ ServiceHelper.stopAndShutdownServices(delegatedConsumer,
delegatedEndpoint);
+ } finally {
+ delegatedConsumer = null;
+ }
+ LOG.info("Leadership lost. Consumer stopped: {}",
delegatedEndpoint);
} finally {
- delegatedConsumer = null;
+ lock.unlock();
}
- LOG.info("Leadership lost. Consumer stopped: {}", delegatedEndpoint);
}
// **************************************
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 802d7b8914b..cf377835847 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -131,216 +131,226 @@ public class MllpTcpClientProducer extends
DefaultProducer implements Runnable {
}
@Override
- public synchronized void process(Exchange exchange) throws MllpException {
- log.trace("process({}) [{}] - entering", exchange.getExchangeId(),
socket);
- getEndpoint().updateLastConnectionActivityTicks();
-
- Message message = exchange.getMessage();
-
- getEndpoint().checkBeforeSendProperties(exchange, socket, log);
-
- // Establish a connection if needed
+ public void process(Exchange exchange) throws MllpException {
+ lock.lock();
try {
- checkConnection();
+ log.trace("process({}) [{}] - entering", exchange.getExchangeId(),
socket);
+ getEndpoint().updateLastConnectionActivityTicks();
- if (cachedLocalAddress != null) {
- message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS,
cachedLocalAddress);
- }
+ Message message = exchange.getMessage();
- if (cachedRemoteAddress != null) {
- message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS,
cachedRemoteAddress);
- }
+ getEndpoint().checkBeforeSendProperties(exchange, socket, log);
- // Send the message to the external system
- byte[] hl7MessageBytes = null;
- Object messageBody = message.getBody();
- if (messageBody == null) {
- String exceptionMessage
- = String.format("process(%s) [%s] - message body is
null", exchange.getExchangeId(), socket);
- exchange.setException(new
MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi));
- return;
- } else if (messageBody instanceof byte[]) {
- hl7MessageBytes = (byte[]) messageBody;
- } else if (messageBody instanceof String) {
- String stringBody = (String) messageBody;
- hl7MessageBytes =
stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset));
- if (getConfiguration().hasCharsetName()) {
- exchange.setProperty(ExchangePropertyKey.CHARSET_NAME,
getConfiguration().getCharsetName());
+ // Establish a connection if needed
+ try {
+ checkConnection();
+
+ if (cachedLocalAddress != null) {
+ message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS,
cachedLocalAddress);
}
- }
- log.debug("process({}) [{}] - sending message to external system",
exchange.getExchangeId(), socket);
+ if (cachedRemoteAddress != null) {
+ message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS,
cachedRemoteAddress);
+ }
- try {
- mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
- mllpBuffer.writeTo(socket);
- } catch (MllpSocketException writeEx) {
- // Connection may have been reset - try one more time
- log.debug("process({}) [{}] - exception encountered writing
payload - attempting reconnect",
- exchange.getExchangeId(), socket, writeEx);
- try {
- checkConnection();
- log.trace("process({}) [{}] - reconnected succeeded -
resending payload", exchange.getExchangeId(), socket);
- try {
- mllpBuffer.writeTo(socket);
- } catch (MllpSocketException retryWriteEx) {
- String exceptionMessage = String.format(
- "process(%s) [%s] - exception encountered
attempting to write payload after reconnect",
- exchange.getExchangeId(), socket);
- log.warn(exceptionMessage, retryWriteEx);
- exchange.setException(
- new MllpWriteException(
- exceptionMessage,
mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi));
+ // Send the message to the external system
+ byte[] hl7MessageBytes = null;
+ Object messageBody = message.getBody();
+ if (messageBody == null) {
+ String exceptionMessage
+ = String.format("process(%s) [%s] - message body
is null", exchange.getExchangeId(), socket);
+ exchange.setException(new
MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi));
+ return;
+ } else if (messageBody instanceof byte[]) {
+ hl7MessageBytes = (byte[]) messageBody;
+ } else if (messageBody instanceof String) {
+ String stringBody = (String) messageBody;
+ hl7MessageBytes =
stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset));
+ if (getConfiguration().hasCharsetName()) {
+ exchange.setProperty(ExchangePropertyKey.CHARSET_NAME,
getConfiguration().getCharsetName());
}
- } catch (IOException reconnectEx) {
- String exceptionMessage = String.format("process(%s) [%s]
- exception encountered attempting to reconnect",
- exchange.getExchangeId(), socket);
- log.warn(exceptionMessage, reconnectEx);
- exchange.setException(
- new MllpWriteException(exceptionMessage,
mllpBuffer.toByteArrayAndReset(), writeEx, logPhi));
- mllpBuffer.resetSocket(socket);
}
- }
- if (getConfiguration().getExchangePattern() ==
ExchangePattern.InOnly) {
- log.debug("process({}) [{}] - not checking acknowledgement
from external system",
- exchange.getExchangeId(), socket);
- return;
- }
- if (exchange.getException() == null) {
- log.debug("process({}) [{}] - reading acknowledgement from
external system", exchange.getExchangeId(), socket);
+
+ log.debug("process({}) [{}] - sending message to external
system", exchange.getExchangeId(), socket);
+
try {
- mllpBuffer.reset();
- mllpBuffer.readFrom(socket);
- } catch (MllpSocketException receiveAckEx) {
+ mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
+ mllpBuffer.writeTo(socket);
+ } catch (MllpSocketException writeEx) {
// Connection may have been reset - try one more time
- log.debug("process({}) [{}] - exception encountered
reading acknowledgement - attempting reconnect",
- exchange.getExchangeId(), socket, receiveAckEx);
+ log.debug("process({}) [{}] - exception encountered
writing payload - attempting reconnect",
+ exchange.getExchangeId(), socket, writeEx);
try {
checkConnection();
+ log.trace("process({}) [{}] - reconnected succeeded -
resending payload", exchange.getExchangeId(),
+ socket);
+ try {
+ mllpBuffer.writeTo(socket);
+ } catch (MllpSocketException retryWriteEx) {
+ String exceptionMessage = String.format(
+ "process(%s) [%s] - exception encountered
attempting to write payload after reconnect",
+ exchange.getExchangeId(), socket);
+ log.warn(exceptionMessage, retryWriteEx);
+ exchange.setException(
+ new MllpWriteException(
+ exceptionMessage,
mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi));
+ }
} catch (IOException reconnectEx) {
- String exceptionMessage = String.format(
- "process(%s) [%s] - exception encountered
attempting to reconnect after acknowledgement read failure",
- exchange.getExchangeId(), socket);
+ String exceptionMessage
+ = String.format("process(%s) [%s] - exception
encountered attempting to reconnect",
+ exchange.getExchangeId(), socket);
log.warn(exceptionMessage, reconnectEx);
exchange.setException(
- new MllpAcknowledgementReceiveException(
- exceptionMessage, hl7MessageBytes,
receiveAckEx, logPhi));
+ new MllpWriteException(exceptionMessage,
mllpBuffer.toByteArrayAndReset(), writeEx, logPhi));
mllpBuffer.resetSocket(socket);
}
-
- if (exchange.getException() == null) {
- log.trace("process({}) [{}] - resending payload after
successful reconnect", exchange.getExchangeId(),
- socket);
+ }
+ if (getConfiguration().getExchangePattern() ==
ExchangePattern.InOnly) {
+ log.debug("process({}) [{}] - not checking acknowledgement
from external system",
+ exchange.getExchangeId(), socket);
+ return;
+ }
+ if (exchange.getException() == null) {
+ log.debug("process({}) [{}] - reading acknowledgement from
external system", exchange.getExchangeId(),
+ socket);
+ try {
+ mllpBuffer.reset();
+ mllpBuffer.readFrom(socket);
+ } catch (MllpSocketException receiveAckEx) {
+ // Connection may have been reset - try one more time
+ log.debug("process({}) [{}] - exception encountered
reading acknowledgement - attempting reconnect",
+ exchange.getExchangeId(), socket,
receiveAckEx);
try {
- mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
- mllpBuffer.writeTo(socket);
- } catch (MllpSocketException writeRetryEx) {
+ checkConnection();
+ } catch (IOException reconnectEx) {
String exceptionMessage = String.format(
- "process(%s) [%s] - exception encountered
attempting to write payload after read failure and successful reconnect",
+ "process(%s) [%s] - exception encountered
attempting to reconnect after acknowledgement read failure",
exchange.getExchangeId(), socket);
- log.warn(exceptionMessage, writeRetryEx);
+ log.warn(exceptionMessage, reconnectEx);
exchange.setException(
- new MllpWriteException(exceptionMessage,
hl7MessageBytes, receiveAckEx, logPhi));
+ new MllpAcknowledgementReceiveException(
+ exceptionMessage, hl7MessageBytes,
receiveAckEx, logPhi));
+ mllpBuffer.resetSocket(socket);
}
if (exchange.getException() == null) {
- log.trace("process({}) [{}] - resend succeeded -
reading acknowledgement from external system",
- exchange.getExchangeId(), socket);
+ log.trace("process({}) [{}] - resending payload
after successful reconnect",
+ exchange.getExchangeId(),
+ socket);
try {
- mllpBuffer.reset();
- mllpBuffer.readFrom(socket);
- } catch (MllpSocketException secondReceiveEx) {
- String exceptionMessageFormat =
mllpBuffer.isEmpty()
- ? "process(%s) [%s] - exception
encountered reading MLLP Acknowledgement after successful reconnect and resend"
- : "process(%s) [%s] - exception
encountered reading complete MLLP Acknowledgement after successful reconnect
and resend";
- String exceptionMessage
- =
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
- log.warn(exceptionMessage, secondReceiveEx);
- // Send the original exception to the exchange
- exchange.setException(new
MllpAcknowledgementReceiveException(
- exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
- logPhi));
- } catch (SocketTimeoutException
secondReadTimeoutEx) {
- String exceptionMessageFormat =
mllpBuffer.isEmpty()
- ? "process(%s) [%s] - timeout
receiving MLLP Acknowledgment after successful reconnect and resend"
- : "process(%s) [%s] - timeout
receiving complete MLLP Acknowledgment after successful reconnect and resend";
- String exceptionMessage
- =
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
- log.warn(exceptionMessage,
secondReadTimeoutEx);
- // Send the original exception to the exchange
- exchange.setException(new
MllpAcknowledgementTimeoutException(
- exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
- logPhi));
- mllpBuffer.resetSocket(socket);
+
mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
+ mllpBuffer.writeTo(socket);
+ } catch (MllpSocketException writeRetryEx) {
+ String exceptionMessage = String.format(
+ "process(%s) [%s] - exception
encountered attempting to write payload after read failure and successful
reconnect",
+ exchange.getExchangeId(), socket);
+ log.warn(exceptionMessage, writeRetryEx);
+ exchange.setException(
+ new
MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi));
+ }
+
+ if (exchange.getException() == null) {
+ log.trace("process({}) [{}] - resend succeeded
- reading acknowledgement from external system",
+ exchange.getExchangeId(), socket);
+ try {
+ mllpBuffer.reset();
+ mllpBuffer.readFrom(socket);
+ } catch (MllpSocketException secondReceiveEx) {
+ String exceptionMessageFormat =
mllpBuffer.isEmpty()
+ ? "process(%s) [%s] - exception
encountered reading MLLP Acknowledgement after successful reconnect and resend"
+ : "process(%s) [%s] - exception
encountered reading complete MLLP Acknowledgement after successful reconnect
and resend";
+ String exceptionMessage
+ =
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+ log.warn(exceptionMessage,
secondReceiveEx);
+ // Send the original exception to the
exchange
+ exchange.setException(new
MllpAcknowledgementReceiveException(
+ exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
+ logPhi));
+ } catch (SocketTimeoutException
secondReadTimeoutEx) {
+ String exceptionMessageFormat =
mllpBuffer.isEmpty()
+ ? "process(%s) [%s] - timeout
receiving MLLP Acknowledgment after successful reconnect and resend"
+ : "process(%s) [%s] - timeout
receiving complete MLLP Acknowledgment after successful reconnect and resend";
+ String exceptionMessage
+ =
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+ log.warn(exceptionMessage,
secondReadTimeoutEx);
+ // Send the original exception to the
exchange
+ exchange.setException(new
MllpAcknowledgementTimeoutException(
+ exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), receiveAckEx,
+ logPhi));
+ mllpBuffer.resetSocket(socket);
+ }
}
}
+ } catch (SocketTimeoutException timeoutEx) {
+ String exceptionMessageFormat = mllpBuffer.isEmpty()
+ ? "process(%s) [%s] - timeout receiving MLLP
Acknowledgment"
+ : "process(%s) [%s] - timeout receiving
complete MLLP Acknowledgment";
+ String exceptionMessage =
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+ log.warn(exceptionMessage, timeoutEx);
+ exchange.setException(new
MllpAcknowledgementTimeoutException(
+ exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi));
+ mllpBuffer.resetSocket(socket);
}
- } catch (SocketTimeoutException timeoutEx) {
- String exceptionMessageFormat = mllpBuffer.isEmpty()
- ? "process(%s) [%s] - timeout receiving MLLP
Acknowledgment"
- : "process(%s) [%s] - timeout receiving complete
MLLP Acknowledgment";
- String exceptionMessage =
String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
- log.warn(exceptionMessage, timeoutEx);
- exchange.setException(new
MllpAcknowledgementTimeoutException(
- exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi));
- mllpBuffer.resetSocket(socket);
- }
- if (exchange.getException() == null) {
- if (mllpBuffer.hasCompleteEnvelope()) {
- byte[] acknowledgementBytes =
mllpBuffer.toMllpPayload();
-
- log.debug(
- "process({}) [{}] - populating message headers
with the acknowledgement from the external system",
- exchange.getExchangeId(), socket);
- message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT,
acknowledgementBytes);
- if (acknowledgementBytes != null &&
acknowledgementBytes.length > 0) {
-
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(
- acknowledgementBytes,
- MllpCharsetHelper.getCharset(exchange,
acknowledgementBytes, hl7Util, charset)));
- } else {
-
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
- }
+ if (exchange.getException() == null) {
+ if (mllpBuffer.hasCompleteEnvelope()) {
+ byte[] acknowledgementBytes =
mllpBuffer.toMllpPayload();
- if (getConfiguration().isValidatePayload()) {
- String exceptionMessage =
hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
- if (exceptionMessage != null) {
- exchange.setException(new
MllpInvalidAcknowledgementException(
- exceptionMessage, hl7MessageBytes,
acknowledgementBytes, logPhi));
+ log.debug(
+ "process({}) [{}] - populating message
headers with the acknowledgement from the external system",
+ exchange.getExchangeId(), socket);
+
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+ if (acknowledgementBytes != null &&
acknowledgementBytes.length > 0) {
+
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(
+ acknowledgementBytes,
+ MllpCharsetHelper.getCharset(exchange,
acknowledgementBytes, hl7Util, charset)));
+ } else {
+
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
}
- }
- if (exchange.getException() == null) {
- log.debug("process({}) [{}] - processing the
acknowledgement from the external system",
- exchange.getExchangeId(), socket);
- try {
-
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
- processAcknowledgment(hl7MessageBytes,
acknowledgementBytes));
- } catch (MllpNegativeAcknowledgementException
nackEx) {
-
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
nackEx.getAcknowledgmentType());
- exchange.setException(nackEx);
+ if (getConfiguration().isValidatePayload()) {
+ String exceptionMessage =
hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
+ if (exceptionMessage != null) {
+ exchange.setException(new
MllpInvalidAcknowledgementException(
+ exceptionMessage, hl7MessageBytes,
acknowledgementBytes, logPhi));
+ }
}
- getEndpoint().checkAfterSendProperties(exchange,
socket, log);
+ if (exchange.getException() == null) {
+ log.debug("process({}) [{}] - processing the
acknowledgement from the external system",
+ exchange.getExchangeId(), socket);
+ try {
+
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
+
processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
+ } catch (MllpNegativeAcknowledgementException
nackEx) {
+
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
nackEx.getAcknowledgmentType());
+ exchange.setException(nackEx);
+ }
+
+
getEndpoint().checkAfterSendProperties(exchange, socket, log);
+ }
+ } else {
+ String exceptionMessage =
String.format("process(%s) [%s] - invalid acknowledgement received",
+ exchange.getExchangeId(), socket);
+ exchange.setException(new
MllpInvalidAcknowledgementException(
+ exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), logPhi));
}
- } else {
- String exceptionMessage = String.format("process(%s)
[%s] - invalid acknowledgement received",
- exchange.getExchangeId(), socket);
- exchange.setException(new
MllpInvalidAcknowledgementException(
- exceptionMessage, hl7MessageBytes,
mllpBuffer.toByteArrayAndReset(), logPhi));
}
}
+
+ } catch (IOException ioEx) {
+ log.debug("process({}) [{}] - IOException encountered checking
connection", exchange.getExchangeId(), socket,
+ ioEx);
+ exchange.setException(ioEx);
+ mllpBuffer.resetSocket(socket);
+ } finally {
+ mllpBuffer.reset();
}
- } catch (IOException ioEx) {
- log.debug("process({}) [{}] - IOException encountered checking
connection", exchange.getExchangeId(), socket, ioEx);
- exchange.setException(ioEx);
- mllpBuffer.resetSocket(socket);
+ log.trace("process({}) [{}] - exiting", exchange.getExchangeId(),
socket);
} finally {
- mllpBuffer.reset();
+ lock.unlock();
}
-
- log.trace("process({}) [{}] - exiting", exchange.getExchangeId(),
socket);
}
private String processAcknowledgment(byte[] hl7MessageBytes, byte[]
hl7AcknowledgementBytes) throws MllpException {
@@ -519,44 +529,49 @@ public class MllpTcpClientProducer extends
DefaultProducer implements Runnable {
* Check for idle connection
*/
@Override
- public synchronized void run() {
- if (getConfiguration().hasIdleTimeout()) {
- if (null != socket && !socket.isClosed() && socket.isConnected()) {
- if (getEndpoint().hasLastConnectionActivityTicks()) {
- long idleTime = System.currentTimeMillis() -
getEndpoint().getLastConnectionActivityTicks();
- if (log.isDebugEnabled()) {
- log.debug("Checking {} for idle connection: {} - {}",
getConnectionAddress(), idleTime,
- getConfiguration().getIdleTimeout());
- }
- if (idleTime >= getConfiguration().getIdleTimeout()) {
- if (MllpIdleTimeoutStrategy.CLOSE ==
getConfiguration().getIdleTimeoutStrategy()) {
- log.info(
- "MLLP Connection idle time of '{}'
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds -
closing connection",
- idleTime,
getConfiguration().getIdleTimeout());
- mllpBuffer.closeSocket(socket);
+ public void run() {
+ lock.lock();
+ try {
+ if (getConfiguration().hasIdleTimeout()) {
+ if (null != socket && !socket.isClosed() &&
socket.isConnected()) {
+ if (getEndpoint().hasLastConnectionActivityTicks()) {
+ long idleTime = System.currentTimeMillis() -
getEndpoint().getLastConnectionActivityTicks();
+ if (log.isDebugEnabled()) {
+ log.debug("Checking {} for idle connection: {} -
{}", getConnectionAddress(), idleTime,
+ getConfiguration().getIdleTimeout());
+ }
+ if (idleTime >= getConfiguration().getIdleTimeout()) {
+ if (MllpIdleTimeoutStrategy.CLOSE ==
getConfiguration().getIdleTimeoutStrategy()) {
+ log.info(
+ "MLLP Connection idle time of '{}'
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds -
closing connection",
+ idleTime,
getConfiguration().getIdleTimeout());
+ mllpBuffer.closeSocket(socket);
+ } else {
+ log.info(
+ "MLLP Connection idle time of '{}'
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds -
resetting connection",
+ idleTime,
getConfiguration().getIdleTimeout());
+ mllpBuffer.resetSocket(socket);
+ }
} else {
- log.info(
- "MLLP Connection idle time of '{}'
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds -
resetting connection",
- idleTime,
getConfiguration().getIdleTimeout());
- mllpBuffer.resetSocket(socket);
+ long minDelay = 100;
+ long delay = Long.min(Long.max(minDelay,
getConfiguration().getIdleTimeout() - idleTime),
+ getConfiguration().getIdleTimeout());
+ if (log.isDebugEnabled()) {
+ log.debug("Scheduling idle producer connection
check of {} in {} milliseconds",
+ getConnectionAddress(), delay);
+ }
+ idleTimeoutExecutor.schedule(this, delay,
TimeUnit.MILLISECONDS);
}
} else {
- long minDelay = 100;
- long delay = Long.min(Long.max(minDelay,
getConfiguration().getIdleTimeout() - idleTime),
+ log.debug(
+ "No activity detected since initial connection
- scheduling idle producer connection check in {} milliseconds",
getConfiguration().getIdleTimeout());
- if (log.isDebugEnabled()) {
- log.debug("Scheduling idle producer connection
check of {} in {} milliseconds",
- getConnectionAddress(), delay);
- }
- idleTimeoutExecutor.schedule(this, delay,
TimeUnit.MILLISECONDS);
+ idleTimeoutExecutor.schedule(this,
getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
}
- } else {
- log.debug(
- "No activity detected since initial connection -
scheduling idle producer connection check in {} milliseconds",
- getConfiguration().getIdleTimeout());
- idleTimeoutExecutor.schedule(this,
getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
}
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
index d4675fc586b..653d32b03ea 100644
---
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
+++
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.mongodb;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndUpdateOptions;
@@ -36,6 +39,7 @@ public class MongoDbTailTrackingManager {
private final MongoClient connection;
private final MongoDbTailTrackingConfig config;
+ private final Lock lock = new ReentrantLock();
private MongoCollection<Document> dbCol;
private Document trackingObj;
@@ -60,32 +64,42 @@ public class MongoDbTailTrackingManager {
trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID));
}
- public synchronized void persistToStore() {
- if (!config.persistent || lastVal == null) {
- return;
+ public void persistToStore() {
+ lock.lock();
+ try {
+ if (!config.persistent || lastVal == null) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Persisting lastVal={} to store, collection: {}",
lastVal, config.collection);
+ }
+
+ Bson updateObj = Updates.set(config.field, lastVal);
+ FindOneAndUpdateOptions options = new
FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
+ trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj,
options);
+ } finally {
+ lock.unlock();
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Persisting lastVal={} to store, collection: {}",
lastVal, config.collection);
- }
-
- Bson updateObj = Updates.set(config.field, lastVal);
- FindOneAndUpdateOptions options = new
FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
- trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj, options);
}
- public synchronized Object recoverFromStore() {
- if (!config.persistent) {
- return null;
- }
+ public Object recoverFromStore() {
+ lock.lock();
+ try {
+ if (!config.persistent) {
+ return null;
+ }
- lastVal = dbCol.find(trackingObj).first().get(config.field);
+ lastVal = dbCol.find(trackingObj).first().get(config.field);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recovered lastVal={} from store, collection: {}",
lastVal, config.collection);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovered lastVal={} from store, collection: {}",
lastVal, config.collection);
+ }
- return lastVal;
+ return lastVal;
+ } finally {
+ lock.unlock();
+ }
}
public void setLastVal(Document dbObj) {
diff --git
a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
index e8112e436a4..e0c1e976c09 100644
---
a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
+++
b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
@@ -45,17 +45,22 @@ public class MyBatisComponent extends HealthCheckComponent {
return answer;
}
- protected synchronized SqlSessionFactory createSqlSessionFactory() throws
IOException {
- if (sqlSessionFactory == null) {
- ObjectHelper.notNull(configurationUri, "configurationUri", this);
- InputStream is =
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(),
configurationUri);
- try {
- sqlSessionFactory = new SqlSessionFactoryBuilder().build(is);
- } finally {
- IOHelper.close(is);
+ protected SqlSessionFactory createSqlSessionFactory() throws IOException {
+ lock.lock();
+ try {
+ if (sqlSessionFactory == null) {
+ ObjectHelper.notNull(configurationUri, "configurationUri",
this);
+ InputStream is =
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(),
configurationUri);
+ try {
+ sqlSessionFactory = new
SqlSessionFactoryBuilder().build(is);
+ } finally {
+ IOHelper.close(is);
+ }
}
+ return sqlSessionFactory;
+ } finally {
+ lock.unlock();
}
- return sqlSessionFactory;
}
public SqlSessionFactory getSqlSessionFactory() {