This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-20199/remove-synchronized-blocks-from-j2m-components in repository https://gitbox.apache.org/repos/asf/camel.git
commit e954831d6f8e485890638b6803d67105701362ee Author: Nicolas Filotto <nicolas.filo...@qlik.com> AuthorDate: Thu Oct 31 15:23:55 2024 +0100 CAMEL-20199: Remove synchronized block from components J to M --- .../camel/component/milvus/MilvusEndpoint.java | 11 +- .../component/github/consumer/CommitConsumer.java | 16 +- .../jackson/converter/JacksonTypeConverters.java | 11 +- .../component/jasypt/JasyptPropertiesParser.java | 49 +-- .../converter/jaxb/FallbackTypeConverter.java | 28 +- .../camel/component/jcache/JCacheManager.java | 45 ++- .../apache/camel/component/jcr/JcrConsumer.java | 132 ++++--- .../camel/component/jetty/JettyHttpComponent.java | 259 +++++++------ .../apache/camel/component/jira/JiraEndpoint.java | 76 ++-- .../component/jms/EndpointMessageListener.java | 17 +- .../apache/camel/component/jms/JmsComponent.java | 19 +- .../apache/camel/component/jms/JmsProducer.java | 5 +- .../component/jms/JmsTemporaryQueueEndpoint.java | 13 +- .../component/jms/JmsTemporaryTopicEndpoint.java | 13 +- .../component/jms/StreamMessageInputStream.java | 8 +- .../jms/reply/MessageSelectorCreator.java | 14 +- .../component/jms/reply/QueueReplyManager.java | 5 +- .../jmx/JMXConsumerNotificationFilter.java | 43 ++- .../apache/camel/component/jolt/JoltEndpoint.java | 63 ++-- .../apache/camel/component/jpa/JpaComponent.java | 17 +- .../apache/camel/component/jslt/JsltEndpoint.java | 92 ++--- .../jsonvalidator/JsonValidatorEndpoint.java | 5 +- .../java/org/apache/camel/jsonpath/JsonStream.java | 2 +- .../camel/component/jt400/Jt400Component.java | 15 +- .../component/jt400/Jt400MsgQueueConsumer.java | 73 ++-- .../jta/JtaTransactionErrorHandlerReifier.java | 43 ++- .../camel/component/kafka/KafkaFetchRecords.java | 2 +- .../camel/component/kamelet/KameletComponent.java | 27 +- .../camel/component/knative/KnativeComponent.java | 26 +- .../camel/component/master/MasterConsumer.java | 110 +++--- .../component/mllp/MllpTcpClientProducer.java | 413 +++++++++++---------- .../mongodb/MongoDbTailTrackingManager.java | 54 ++- .../camel/component/mybatis/MyBatisComponent.java | 23 +- 33 files changed, 982 insertions(+), 747 deletions(-) diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java index 0957ab82adb..c1f8146f63a 100644 --- a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusEndpoint.java @@ -56,8 +56,6 @@ public class MilvusEndpoint extends DefaultEndpoint implements EndpointServiceLo @UriParam private MilvusConfiguration configuration; - private final Object lock; - private volatile boolean closeClient; private volatile MilvusClient client; @@ -71,8 +69,6 @@ public class MilvusEndpoint extends DefaultEndpoint implements EndpointServiceLo this.collection = collection; this.configuration = configuration; - - this.lock = new Object(); } @Override @@ -93,9 +89,10 @@ public class MilvusEndpoint extends DefaultEndpoint implements EndpointServiceLo return collection; } - public synchronized MilvusClient getClient() { + public MilvusClient getClient() { if (this.client == null) { - synchronized (this.lock) { + lock.lock(); + try { if (this.client == null) { this.client = this.configuration.getClient(); this.closeClient = false; @@ -105,6 +102,8 @@ public class MilvusEndpoint extends DefaultEndpoint implements EndpointServiceLo this.closeClient = true; } } + } finally { + lock.unlock(); } } diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java index 03bda8756e3..ec03b96ee36 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java @@ -75,7 +75,8 @@ public class CommitConsumer extends AbstractGitHubConsumer { @Override protected void doStart() throws Exception { - synchronized (this) { + lock.lock(); + try { super.doStart(); // ensure we start from clean @@ -97,24 +98,29 @@ public class CommitConsumer extends AbstractGitHubConsumer { LOG.info("Starting from beginning"); } started = true; + } finally { + lock.unlock(); } } @Override protected void doStop() throws Exception { - synchronized (this) { + lock.lock(); + try { super.doStop(); commitHashes.clear(); lastSha = null; started = false; + } finally { + lock.unlock(); } } @Override protected int poll() throws Exception { - synchronized (this) { - + lock.lock(); + try { if (!started) { return 0; } @@ -169,6 +175,8 @@ public class CommitConsumer extends AbstractGitHubConsumer { } LOG.debug("Last sha: {}", lastSha); return counter; + } finally { + lock.unlock(); } } diff --git a/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java b/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java index 4bd45abb1d1..3e419ecacc0 100644 --- a/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java +++ b/components/camel-jackson/src/main/java/org/apache/camel/component/jackson/converter/JacksonTypeConverters.java @@ -25,6 +25,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -57,7 +59,7 @@ import org.slf4j.LoggerFactory; public final class JacksonTypeConverters { private static final Logger LOG = LoggerFactory.getLogger(JacksonTypeConverters.class); - private final Object lock; + private final Lock lock; private volatile ObjectMapper defaultMapper; private boolean init; @@ -66,7 +68,7 @@ public final class JacksonTypeConverters { private String moduleClassNames; public JacksonTypeConverters() { - this.lock = new Object(); + this.lock = new ReentrantLock(); } @Converter @@ -306,7 +308,8 @@ public final class JacksonTypeConverters { } if (defaultMapper == null) { - synchronized (lock) { + lock.lock(); + try { if (defaultMapper == null) { ObjectMapper mapper = new ObjectMapper(); if (moduleClassNames != null) { @@ -322,6 +325,8 @@ public final class JacksonTypeConverters { defaultMapper = mapper; } + } finally { + lock.unlock(); } } diff --git a/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java b/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java index 0dc7ddbec66..b95b9300822 100644 --- a/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java +++ b/components/camel-jasypt/src/main/java/org/apache/camel/component/jasypt/JasyptPropertiesParser.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.jasypt; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -43,15 +45,13 @@ public class JasyptPropertiesParser extends DefaultPropertiesParser { = JASYPT_PREFIX_TOKEN.replace("(", "\\(") + "(.+?)" + JASYPT_SUFFIX_TOKEN.replace(")", "\\)"); private static final Pattern PATTERN = Pattern.compile(JASYPT_REGEX); + private final Lock lock = new ReentrantLock(); private StringEncryptor encryptor; private String password; private String algorithm; private String randomSaltGeneratorAlgorithm; private String randomIvGeneratorAlgorithm; - public JasyptPropertiesParser() { - } - @Override public String parseProperty(String key, String value, PropertiesLookup properties) { log.trace("Parsing property '{}={}'", key, value); @@ -69,27 +69,32 @@ public class JasyptPropertiesParser extends DefaultPropertiesParser { return value; } - private synchronized void initEncryptor() { - if (encryptor == null) { - StringHelper.notEmpty("password", password); - StandardPBEStringEncryptor pbeStringEncryptor = new StandardPBEStringEncryptor(); - - pbeStringEncryptor.setPassword(password); - if (algorithm != null) { - pbeStringEncryptor.setAlgorithm(algorithm); - log.debug("Initialized encryptor using {} algorithm and provided password", algorithm); - } else { - log.debug("Initialized encryptor using default algorithm and provided password"); - } + private void initEncryptor() { + lock.lock(); + try { + if (encryptor == null) { + StringHelper.notEmpty("password", password); + StandardPBEStringEncryptor pbeStringEncryptor = new StandardPBEStringEncryptor(); + + pbeStringEncryptor.setPassword(password); + if (algorithm != null) { + pbeStringEncryptor.setAlgorithm(algorithm); + log.debug("Initialized encryptor using {} algorithm and provided password", algorithm); + } else { + log.debug("Initialized encryptor using default algorithm and provided password"); + } - if (randomSaltGeneratorAlgorithm != null) { - pbeStringEncryptor.setSaltGenerator(new RandomSaltGenerator(randomSaltGeneratorAlgorithm)); - } - if (randomIvGeneratorAlgorithm != null) { - pbeStringEncryptor.setIvGenerator(new RandomIvGenerator(randomIvGeneratorAlgorithm)); - } + if (randomSaltGeneratorAlgorithm != null) { + pbeStringEncryptor.setSaltGenerator(new RandomSaltGenerator(randomSaltGeneratorAlgorithm)); + } + if (randomIvGeneratorAlgorithm != null) { + pbeStringEncryptor.setIvGenerator(new RandomIvGenerator(randomIvGeneratorAlgorithm)); + } - encryptor = pbeStringEncryptor; + encryptor = pbeStringEncryptor; + } + } finally { + lock.unlock(); } } diff --git a/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java b/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java index e6dba9c4e90..1b8622e701d 100644 --- a/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java +++ b/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/FallbackTypeConverter.java @@ -28,6 +28,8 @@ import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.xml.bind.JAXBContext; import jakarta.xml.bind.JAXBElement; @@ -63,6 +65,7 @@ public class FallbackTypeConverter { private static final Logger LOG = LoggerFactory.getLogger(FallbackTypeConverter.class); + private final Lock lock = new ReentrantLock(); private final Map<AnnotatedElement, JAXBContext> contexts = new HashMap<>(); private final StaxConverter staxConverter = new StaxConverter(); private boolean defaultPrettyPrint = true; @@ -318,19 +321,24 @@ public class FallbackTypeConverter { return exchange != null && exchange.getProperty(Exchange.FILTER_NON_XML_CHARS, Boolean.FALSE, Boolean.class); } - protected synchronized <T> JAXBContext createContext(Class<T> type) throws JAXBException { + protected <T> JAXBContext createContext(Class<T> type) throws JAXBException { AnnotatedElement ae = hasXmlRootElement(type) ? type : type.getPackage(); - JAXBContext context = contexts.get(ae); - if (context == null) { - if (hasXmlRootElement(type)) { - context = JAXBContext.newInstance(type); - contexts.put(type, context); - } else { - context = JAXBContext.newInstance(type.getPackage().getName()); - contexts.put(type.getPackage(), context); + lock.lock(); + try { + JAXBContext context = contexts.get(ae); + if (context == null) { + if (hasXmlRootElement(type)) { + context = JAXBContext.newInstance(type); + contexts.put(type, context); + } else { + context = JAXBContext.newInstance(type.getPackage().getName()); + contexts.put(type.getPackage(), context); + } } + return context; + } finally { + lock.unlock(); } - return context; } protected <T> Unmarshaller getUnmarshaller(Class<T> type) throws JAXBException { diff --git a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java index 0b490cdcbbc..ebde348720a 100644 --- a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java +++ b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheManager.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; import javax.cache.CacheManager; @@ -38,6 +40,7 @@ public class JCacheManager<K, V> implements Closeable { private final JCacheConfiguration configuration; private final String cacheName; private final CamelContext camelContext; + private final Lock lock = new ReentrantLock(); private CachingProvider provider; private CacheManager manager; private Cache<K, V> cache; @@ -68,29 +71,39 @@ public class JCacheManager<K, V> implements Closeable { return this.configuration; } - public synchronized Cache<K, V> getCache() throws Exception { - if (cache == null) { - JCacheProvider provider = JCacheProviders.lookup(configuration.getCachingProvider()); - cache = doGetCache(provider); - } + public Cache<K, V> getCache() throws Exception { + lock.lock(); + try { + if (cache == null) { + JCacheProvider provider = JCacheProviders.lookup(configuration.getCachingProvider()); + cache = doGetCache(provider); + } - return cache; + return cache; + } finally { + lock.unlock(); + } } @Override - public synchronized void close() throws IOException { - if (configuration != null) { - if (cache != null) { - cache.close(); - } + public void close() throws IOException { + lock.lock(); + try { + if (configuration != null) { + if (cache != null) { + cache.close(); + } - if (manager != null) { - manager.close(); - } + if (manager != null) { + manager.close(); + } - if (provider != null) { - provider.close(); + if (provider != null) { + provider.close(); + } } + } finally { + lock.unlock(); } } diff --git a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java index 2723004465e..63d628337de 100644 --- a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java +++ b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java @@ -63,86 +63,97 @@ public class JcrConsumer extends DefaultConsumer { return (JcrEndpoint) getEndpoint(); } - private synchronized void createSessionAndRegisterListener() throws RepositoryException { - LOG.trace("createSessionAndRegisterListener START"); - - if (ObjectHelper.isEmpty(getJcrEndpoint().getWorkspaceName())) { - session = getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials()); - } else { - session = getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials(), - getJcrEndpoint().getWorkspaceName()); - } + private void createSessionAndRegisterListener() throws RepositoryException { + lock.lock(); + try { + LOG.trace("createSessionAndRegisterListener START"); + + if (ObjectHelper.isEmpty(getJcrEndpoint().getWorkspaceName())) { + session = getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials()); + } else { + session = getJcrEndpoint().getRepository().login(getJcrEndpoint().getCredentials(), + getJcrEndpoint().getWorkspaceName()); + } - int eventTypes = getJcrEndpoint().getEventTypes(); - String absPath = getJcrEndpoint().getBase(); + int eventTypes = getJcrEndpoint().getEventTypes(); + String absPath = getJcrEndpoint().getBase(); - if (absPath == null) { - absPath = "/"; - } else if (!absPath.startsWith("/")) { - absPath = "/" + absPath; - } + if (absPath == null) { + absPath = "/"; + } else if (!absPath.startsWith("/")) { + absPath = "/" + absPath; + } - boolean isDeep = getJcrEndpoint().isDeep(); - String[] uuid = null; - String uuids = getJcrEndpoint().getUuids(); + boolean isDeep = getJcrEndpoint().isDeep(); + String[] uuid = null; + String uuids = getJcrEndpoint().getUuids(); - if (uuids != null) { - uuids = uuids.trim(); + if (uuids != null) { + uuids = uuids.trim(); - if (!uuids.isEmpty()) { - uuid = uuids.split(","); + if (!uuids.isEmpty()) { + uuid = uuids.split(","); + } } - } - String[] nodeTypeName = null; - String nodeTypeNames = getJcrEndpoint().getNodeTypeNames(); + String[] nodeTypeName = null; + String nodeTypeNames = getJcrEndpoint().getNodeTypeNames(); - if (nodeTypeNames != null) { - nodeTypeNames = nodeTypeNames.trim(); + if (nodeTypeNames != null) { + nodeTypeNames = nodeTypeNames.trim(); - if (!nodeTypeNames.isEmpty()) { - nodeTypeName = nodeTypeNames.split(","); + if (!nodeTypeNames.isEmpty()) { + nodeTypeName = nodeTypeNames.split(","); + } } - } - boolean noLocal = getJcrEndpoint().isNoLocal(); + boolean noLocal = getJcrEndpoint().isNoLocal(); - eventListener = new EndpointEventListener(this, getJcrEndpoint(), getProcessor()); + eventListener = new EndpointEventListener(this, getJcrEndpoint(), getProcessor()); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={}, isDeep={}, uuid={}, nodeTypeName={}, noLocal={}", - eventListener, absPath, eventTypes, isDeep, Arrays.toString(uuid), Arrays.toString(nodeTypeName), - noLocal); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Adding JCR Event Listener, {}, on {}. eventTypes={}, isDeep={}, uuid={}, nodeTypeName={}, noLocal={}", + eventListener, absPath, eventTypes, isDeep, Arrays.toString(uuid), Arrays.toString(nodeTypeName), + noLocal); + } - session.getWorkspace().getObservationManager() - .addEventListener(eventListener, eventTypes, absPath, isDeep, uuid, nodeTypeName, noLocal); + session.getWorkspace().getObservationManager() + .addEventListener(eventListener, eventTypes, absPath, isDeep, uuid, nodeTypeName, noLocal); - LOG.trace("createSessionAndRegisterListener END"); + LOG.trace("createSessionAndRegisterListener END"); + } finally { + lock.unlock(); + } } - private synchronized void unregisterListenerAndLogoutSession() throws RepositoryException { - LOG.trace("unregisterListenerAndLogoutSession START"); + private void unregisterListenerAndLogoutSession() throws RepositoryException { + lock.lock(); + try { + LOG.trace("unregisterListenerAndLogoutSession START"); - if (session != null) { - try { - if (!session.isLive()) { - LOG.info("Session was is no more live."); - } else { - if (eventListener != null) { - session.getWorkspace().getObservationManager().removeEventListener(eventListener); - eventListener = null; + if (session != null) { + try { + if (!session.isLive()) { + LOG.info("Session was is no more live."); + } else { + if (eventListener != null) { + session.getWorkspace().getObservationManager().removeEventListener(eventListener); + eventListener = null; + } + + session.logout(); } - - session.logout(); + } finally { + eventListener = null; + session = null; } - } finally { - eventListener = null; - session = null; } - } - LOG.trace("unregisterListenerAndLogoutSession END"); + LOG.trace("unregisterListenerAndLogoutSession END"); + } finally { + lock.unlock(); + } } private void cancelSessionListenerChecker() { @@ -170,7 +181,8 @@ public class JcrConsumer extends DefaultConsumer { boolean isSessionLive = false; - synchronized (this) { + lock.lock(); + try { if (JcrConsumer.this.session != null) { try { isSessionLive = JcrConsumer.this.session.isLive(); @@ -178,6 +190,8 @@ public class JcrConsumer extends DefaultConsumer { LOG.debug("Exception while checking jcr session", e); } } + } finally { + lock.unlock(); } if (!isSessionLive) { diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java index 4bf8293d7db..baa13924e82 100644 --- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java +++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java @@ -28,6 +28,7 @@ import java.util.EventListener; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import jakarta.servlet.Filter; import jakarta.servlet.MultipartConfigElement; @@ -90,7 +91,7 @@ public abstract class JettyHttpComponent extends HttpCommonComponent implements RestConsumerFactory, RestApiConsumerFactory, SSLContextParametersAware { public static final String TMP_DIR = "CamelJettyTempDir"; - protected static final HashMap<String, ConnectorRef> CONNECTORS = new HashMap<>(); + protected static final Map<String, ConnectorRef> CONNECTORS = new ConcurrentHashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(JettyHttpComponent.class); private static final String JETTY_SSL_KEYSTORE = "org.eclipse.jetty.ssl.keystore"; @@ -282,20 +283,18 @@ public abstract class JettyHttpComponent extends HttpCommonComponent JettyHttpEndpoint endpoint = (JettyHttpEndpoint) consumer.getEndpoint(); String connectorKey = getConnectorKey(endpoint); - synchronized (CONNECTORS) { - ConnectorRef connectorRef = CONNECTORS.get(connectorKey); - - // check if there are already another consumer on the same context-path and if so fail - if (connectorRef != null) { - for (Map.Entry<String, HttpConsumer> entry : connectorRef.servlet.getConsumers().entrySet()) { - String path = entry.getValue().getPath(); - CamelContext camelContext = entry.getValue().getEndpoint().getCamelContext(); - if (consumer.getPath().equals(path)) { - // its allowed if they are from the same camel context - boolean sameContext = consumer.getEndpoint().getCamelContext() == camelContext; - if (!sameContext) { - return false; - } + ConnectorRef connectorRef = CONNECTORS.get(connectorKey); + + // check if there are already another consumer on the same context-path and if so fail + if (connectorRef != null) { + for (Map.Entry<String, HttpConsumer> entry : connectorRef.servlet.getConsumers().entrySet()) { + String path = entry.getValue().getPath(); + CamelContext camelContext = entry.getValue().getEndpoint().getCamelContext(); + if (consumer.getPath().equals(path)) { + // its allowed if they are from the same camel context + boolean sameContext = consumer.getEndpoint().getCamelContext() == camelContext; + if (!sameContext) { + return false; } } } @@ -313,68 +312,77 @@ public abstract class JettyHttpComponent extends HttpCommonComponent JettyHttpEndpoint endpoint = (JettyHttpEndpoint) consumer.getEndpoint(); String connectorKey = getConnectorKey(endpoint); - synchronized (CONNECTORS) { - ConnectorRef connectorRef = CONNECTORS.get(connectorKey); - if (connectorRef == null) { - Server server = createServer(); - Connector connector = getConnector(server, endpoint); - if ("localhost".equalsIgnoreCase(endpoint.getHttpUri().getHost())) { - LOG.warn("You use localhost interface! It means that no external connections will be available. " - + "Don't you want to use 0.0.0.0 instead (all network interfaces)? {}", - endpoint); - } - if (endpoint.isEnableJmx()) { - enableJmx(server); - } - server.addConnector(connector); - - connectorRef = new ConnectorRef( - server, connector, - createServletForConnector(server, connector, endpoint.getHandlers(), endpoint)); - // must enable session before we start - if (endpoint.isSessionSupport()) { - enableSessionSupport(connectorRef.server, connectorKey); - } - connectorRef.server.start(); + CONNECTORS.compute(connectorKey, (cKey, connectorRef) -> { + try { + return connect(consumer, endpoint, cKey, connectorRef); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + }); + } - LOG.debug("Adding connector key: {} -> {}", connectorKey, connectorRef); - CONNECTORS.put(connectorKey, connectorRef); + private ConnectorRef connect( + HttpConsumer consumer, JettyHttpEndpoint endpoint, String connectorKey, ConnectorRef connectorRef) + throws Exception { + if (connectorRef == null) { + Server server = createServer(); + Connector connector = getConnector(server, endpoint); + if ("localhost".equalsIgnoreCase(endpoint.getHttpUri().getHost())) { + LOG.warn("You use localhost interface! It means that no external connections will be available. " + + "Don't you want to use 0.0.0.0 instead (all network interfaces)? {}", + endpoint); + } + if (endpoint.isEnableJmx()) { + enableJmx(server); + } + server.addConnector(connector); + + connectorRef = new ConnectorRef( + server, connector, + createServletForConnector(server, connector, endpoint.getHandlers(), endpoint)); + // must enable session before we start + if (endpoint.isSessionSupport()) { + enableSessionSupport(connectorRef.server, connectorKey); + } + connectorRef.server.start(); - } else { - LOG.debug("Using existing connector key: {} -> {}", connectorKey, connectorRef); + LOG.debug("Adding connector key: {} -> {}", connectorKey, connectorRef); + } else { + LOG.debug("Using existing connector key: {} -> {}", connectorKey, connectorRef); - // check if there are any new handlers, and if so then we need to re-start the server - if (endpoint.getHandlers() != null && !endpoint.getHandlers().isEmpty()) { - List<Handler> existingHandlers = new ArrayList<>(); - if (connectorRef.server.getHandlers() != null && !connectorRef.server.getHandlers().isEmpty()) { - existingHandlers = connectorRef.server.getHandlers(); - } - List<Handler> newHandlers = new ArrayList<>(endpoint.getHandlers()); - boolean changed = !existingHandlers.containsAll(newHandlers) && !newHandlers.containsAll(existingHandlers); - if (changed) { - LOG.debug("Restarting Jetty server due to adding new Jetty Handlers: {}", newHandlers); - connectorRef.server.stop(); - addJettyHandlers(connectorRef.server, endpoint.getHandlers()); - connectorRef.server.start(); - } + // check if there are any new handlers, and if so then we need to re-start the server + if (endpoint.getHandlers() != null && !endpoint.getHandlers().isEmpty()) { + List<Handler> existingHandlers = new ArrayList<>(); + if (connectorRef.server.getHandlers() != null && !connectorRef.server.getHandlers().isEmpty()) { + existingHandlers = connectorRef.server.getHandlers(); } - // check the session support - if (endpoint.isSessionSupport()) { - enableSessionSupport(connectorRef.server, connectorKey); + List<Handler> newHandlers = new ArrayList<>(endpoint.getHandlers()); + boolean changed = !existingHandlers.containsAll(newHandlers) && !newHandlers.containsAll(existingHandlers); + if (changed) { + LOG.debug("Restarting Jetty server due to adding new Jetty Handlers: {}", newHandlers); + connectorRef.server.stop(); + addJettyHandlers(connectorRef.server, endpoint.getHandlers()); + connectorRef.server.start(); } - // ref track the connector - connectorRef.increment(); } - - if (endpoint.isEnableMultipartFilter()) { - enableMultipartFilter(endpoint, connectorRef.server); + // check the session support + if (endpoint.isSessionSupport()) { + enableSessionSupport(connectorRef.server, connectorKey); } + // ref track the connector + connectorRef.increment(); + } - if (endpoint.getFilters() != null && !endpoint.getFilters().isEmpty()) { - setFilters(endpoint, connectorRef.server); - } - connectorRef.servlet.connect(consumer); + if (endpoint.isEnableMultipartFilter()) { + enableMultipartFilter(endpoint, connectorRef.server); } + + if (endpoint.getFilters() != null && !endpoint.getFilters().isEmpty()) { + setFilters(endpoint, connectorRef.server); + } + connectorRef.servlet.connect(consumer); + + return connectorRef; } private void enableJmx(Server server) { @@ -388,7 +396,7 @@ public abstract class JettyHttpComponent extends HttpCommonComponent } } - private void enableSessionSupport(Server server, String connectorKey) throws Exception { + private void enableSessionSupport(Server server, String connectorKey) { ServletContextHandler context = server.getDescendant(ServletContextHandler.class); if (context.getSessionHandler() == null) { SessionHandler sessionHandler = new SessionHandler(); @@ -469,33 +477,39 @@ public abstract class JettyHttpComponent extends HttpCommonComponent HttpCommonEndpoint endpoint = consumer.getEndpoint(); String connectorKey = getConnectorKey(endpoint); - synchronized (CONNECTORS) { - ConnectorRef connectorRef = CONNECTORS.get(connectorKey); - if (connectorRef != null) { - connectorRef.servlet.disconnect(consumer); - if (connectorRef.decrement() == 0) { - connectorRef.server.removeConnector(connectorRef.connector); - connectorRef.connector.stop(); - connectorRef.server.stop(); - CONNECTORS.remove(connectorKey); - // Camel controls the lifecycle of these entities so remove the - // registered MBeans when Camel is done with the managed objects. - if (mbContainer != null) { - this.removeServerMBean(connectorRef.server); - //mbContainer.removeBean(connectorRef.connector); - } - if (defaultQueuedThreadPool != null) { - try { - defaultQueuedThreadPool.stop(); - } catch (Exception t) { - defaultQueuedThreadPool.destroy(); - } finally { - defaultQueuedThreadPool = null; - } - } + CONNECTORS.computeIfPresent(connectorKey, (cKey, connectorRef) -> { + try { + return disconnect(consumer, connectorRef); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + }); + } + + private ConnectorRef disconnect(HttpConsumer consumer, ConnectorRef connectorRef) throws Exception { + connectorRef.servlet.disconnect(consumer); + if (connectorRef.decrement() == 0) { + connectorRef.server.removeConnector(connectorRef.connector); + connectorRef.connector.stop(); + connectorRef.server.stop(); + // Camel controls the lifecycle of these entities so remove the + // registered MBeans when Camel is done with the managed objects. + if (mbContainer != null) { + this.removeServerMBean(connectorRef.server); + //mbContainer.removeBean(connectorRef.connector); + } + if (defaultQueuedThreadPool != null) { + try { + defaultQueuedThreadPool.stop(); + } catch (Exception t) { + defaultQueuedThreadPool.destroy(); + } finally { + defaultQueuedThreadPool = null; } } + return null; } + return connectorRef; } private String getConnectorKey(HttpCommonEndpoint endpoint) { @@ -808,25 +822,30 @@ public abstract class JettyHttpComponent extends HttpCommonComponent throw new IllegalArgumentException("Jetty component does not use HttpConfiguration."); } - public synchronized MBeanContainer getMbContainer() { - // If null, provide the default implementation. - if (mbContainer == null) { - MBeanServer mbs = null; + public MBeanContainer getMbContainer() { + lock.lock(); + try { + // If null, provide the default implementation. + if (mbContainer == null) { + MBeanServer mbs = null; + + final ManagementStrategy mStrategy = this.getCamelContext().getManagementStrategy(); + final ManagementAgent mAgent = mStrategy.getManagementAgent(); + if (mAgent != null) { + mbs = mAgent.getMBeanServer(); + } - final ManagementStrategy mStrategy = this.getCamelContext().getManagementStrategy(); - final ManagementAgent mAgent = mStrategy.getManagementAgent(); - if (mAgent != null) { - mbs = mAgent.getMBeanServer(); + if (mbs != null) { + mbContainer = new MBeanContainer(mbs); + } else { + LOG.warn("JMX disabled in CamelContext. Jetty JMX extensions will remain disabled."); + } } - if (mbs != null) { - mbContainer = new MBeanContainer(mbs); - } else { - LOG.warn("JMX disabled in CamelContext. Jetty JMX extensions will remain disabled."); - } + return this.mbContainer; + } finally { + lock.unlock(); } - - return this.mbContainer; } /** @@ -1344,19 +1363,17 @@ public abstract class JettyHttpComponent extends HttpCommonComponent @Override protected void doStop() throws Exception { super.doStop(); - if (!CONNECTORS.isEmpty()) { - for (Map.Entry<String, ConnectorRef> connectorEntry : CONNECTORS.entrySet()) { - ConnectorRef connectorRef = connectorEntry.getValue(); - if (connectorRef != null && connectorRef.getRefCount() == 0) { - connectorRef.server.removeConnector(connectorRef.connector); - connectorRef.connector.stop(); - // Camel controls the lifecycle of these entities so remove the - // registered MBeans when Camel is done with the managed objects. - removeServerMBean(connectorRef.server); - connectorRef.server.stop(); - //removeServerMBean(connectorRef.connector); - CONNECTORS.remove(connectorEntry.getKey()); - } + for (Map.Entry<String, ConnectorRef> connectorEntry : CONNECTORS.entrySet()) { + ConnectorRef connectorRef = connectorEntry.getValue(); + if (connectorRef != null && connectorRef.getRefCount() == 0) { + connectorRef.server.removeConnector(connectorRef.connector); + connectorRef.connector.stop(); + // Camel controls the lifecycle of these entities so remove the + // registered MBeans when Camel is done with the managed objects. + removeServerMBean(connectorRef.server); + connectorRef.server.stop(); + //removeServerMBean(connectorRef.connector); + CONNECTORS.remove(connectorEntry.getKey()); } } if (mbContainer != null) { diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java index 9d2dab5496f..aeb9797330d 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java @@ -137,43 +137,53 @@ public class JiraEndpoint extends ScheduledPollEndpoint implements EndpointServi disconnect(); } - public synchronized void connect() { - if (client == null) { - Registry registry = getCamelContext().getRegistry(); - JiraRestClientFactory factory - = registry.lookupByNameAndType(JIRA_REST_CLIENT_FACTORY, JiraRestClientFactory.class); - if (factory == null) { - factory = new OAuthAsynchronousJiraRestClientFactory(); - } - final URI jiraServerUri = URI.create(configuration.getJiraUrl()); - if (configuration.getUsername() != null) { - LOG.debug("Connecting to JIRA with Basic authentication with username/password"); - client = factory.createWithBasicHttpAuthentication(jiraServerUri, configuration.getUsername(), - configuration.getPassword()); - } else if (configuration.getAccessToken() != null - && configuration.getVerificationCode() == null - && configuration.getPrivateKey() == null - && configuration.getConsumerKey() == null) { - client = factory.create(jiraServerUri, builder -> { - builder.setHeader("Authorization", "Bearer " + configuration.getAccessToken()); - }); - } else { - LOG.debug("Connecting to JIRA with OAuth authentication"); - JiraOAuthAuthenticationHandler oAuthHandler = new JiraOAuthAuthenticationHandler( - configuration.getConsumerKey(), - configuration.getVerificationCode(), configuration.getPrivateKey(), - configuration.getAccessToken(), - configuration.getJiraUrl()); - client = factory.create(jiraServerUri, oAuthHandler); + public void connect() { + lock.lock(); + try { + if (client == null) { + Registry registry = getCamelContext().getRegistry(); + JiraRestClientFactory factory + = registry.lookupByNameAndType(JIRA_REST_CLIENT_FACTORY, JiraRestClientFactory.class); + if (factory == null) { + factory = new OAuthAsynchronousJiraRestClientFactory(); + } + final URI jiraServerUri = URI.create(configuration.getJiraUrl()); + if (configuration.getUsername() != null) { + LOG.debug("Connecting to JIRA with Basic authentication with username/password"); + client = factory.createWithBasicHttpAuthentication(jiraServerUri, configuration.getUsername(), + configuration.getPassword()); + } else if (configuration.getAccessToken() != null + && configuration.getVerificationCode() == null + && configuration.getPrivateKey() == null + && configuration.getConsumerKey() == null) { + client = factory.create(jiraServerUri, builder -> { + builder.setHeader("Authorization", "Bearer " + configuration.getAccessToken()); + }); + } else { + LOG.debug("Connecting to JIRA with OAuth authentication"); + JiraOAuthAuthenticationHandler oAuthHandler = new JiraOAuthAuthenticationHandler( + configuration.getConsumerKey(), + configuration.getVerificationCode(), configuration.getPrivateKey(), + configuration.getAccessToken(), + configuration.getJiraUrl()); + client = factory.create(jiraServerUri, oAuthHandler); + } } + } finally { + lock.unlock(); } } - public synchronized void disconnect() throws Exception { - if (client != null) { - LOG.debug("Disconnecting from JIRA"); - client.close(); - client = null; + public void disconnect() throws Exception { + lock.lock(); + try { + if (client != null) { + LOG.debug("Disconnecting from JIRA"); + client.close(); + client = null; + } + } finally { + lock.unlock(); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java index a10d5c7814e..7fd77b11aea 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.jms; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.Message; @@ -45,6 +48,7 @@ import static org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException; */ public class EndpointMessageListener implements SessionAwareMessageListener { private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class); + private final Lock lock = new ReentrantLock(); private final JmsConsumer consumer; private final JmsEndpoint endpoint; private final AsyncProcessor processor; @@ -315,11 +319,16 @@ public class EndpointMessageListener implements SessionAwareMessageListener { this.eagerPoisonBody = eagerPoisonBody; } - public synchronized JmsOperations getTemplate() { - if (template == null) { - template = endpoint.createInOnlyTemplate(); + public JmsOperations getTemplate() { + lock.lock(); + try { + if (template == null) { + template = endpoint.createInOnlyTemplate(); + } + return template; + } finally { + lock.unlock(); } - return template; } public void setTemplate(JmsOperations template) { diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index e6ac7d7a008..31a718d09d3 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -1157,14 +1157,19 @@ public class JmsComponent extends HeaderFilterStrategyComponent { super.doShutdown(); } - protected synchronized ExecutorService getAsyncStartStopExecutorService() { - if (asyncStartStopExecutorService == null) { - // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread - // for each task, and the thread pool will shrink when no more tasks running - asyncStartStopExecutorService - = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); + protected ExecutorService getAsyncStartStopExecutorService() { + lock.lock(); + try { + if (asyncStartStopExecutorService == null) { + // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread + // for each task, and the thread pool will shrink when no more tasks running + asyncStartStopExecutorService + = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); + } + return asyncStartStopExecutorService; + } finally { + lock.unlock(); } - return asyncStartStopExecutorService; } @Override diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java index f0b38161cbd..b486bb7b341 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java @@ -78,7 +78,8 @@ public class JmsProducer extends DefaultAsyncProducer { protected void initReplyManager() { if (!started.get()) { - synchronized (this) { + lock.lock(); + try { if (started.get()) { return; } @@ -119,6 +120,8 @@ public class JmsProducer extends DefaultAsyncProducer { Thread.currentThread().setContextClassLoader(oldClassLoader); } started.set(true); + } finally { + lock.unlock(); } } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java index c8f80a0000d..d02c0b75f0a 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java @@ -57,11 +57,16 @@ public class JmsTemporaryQueueEndpoint extends JmsQueueEndpoint implements Desti } @Override - public synchronized Destination getJmsDestination(Session session) throws JMSException { - if (jmsDestination == null) { - jmsDestination = createJmsDestination(session); + public Destination getJmsDestination(Session session) throws JMSException { + lock.lock(); + try { + if (jmsDestination == null) { + jmsDestination = createJmsDestination(session); + } + return jmsDestination; + } finally { + lock.unlock(); } - return jmsDestination; } protected Destination createJmsDestination(Session session) throws JMSException { diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java index 2bd5cc30775..68a2b031c5d 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java @@ -50,11 +50,16 @@ public class JmsTemporaryTopicEndpoint extends JmsEndpoint implements Destinatio } @Override - public synchronized Destination getJmsDestination(Session session) throws JMSException { - if (jmsDestination == null) { - jmsDestination = createJmsDestination(session); + public Destination getJmsDestination(Session session) throws JMSException { + lock.lock(); + try { + if (jmsDestination == null) { + jmsDestination = createJmsDestination(session); + } + return jmsDestination; + } finally { + lock.unlock(); } - return jmsDestination; } protected Destination createJmsDestination(Session session) throws JMSException { diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java index 72295b24abc..400e96dd2db 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java @@ -18,6 +18,8 @@ package org.apache.camel.component.jms; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.jms.JMSException; import jakarta.jms.MessageEOFException; @@ -25,6 +27,7 @@ import jakarta.jms.StreamMessage; public class StreamMessageInputStream extends InputStream { + private final Lock lock = new ReentrantLock(); private final StreamMessage message; private volatile boolean eof; @@ -74,11 +77,14 @@ public class StreamMessageInputStream extends InputStream { } @Override - public synchronized void reset() throws IOException { + public void reset() throws IOException { + lock.lock(); try { message.reset(); } catch (JMSException e) { throw new IOException(e); + } finally { + lock.unlock(); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java index 9c7f93d2d1e..19c3c67341e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java @@ -17,6 +17,8 @@ package org.apache.camel.component.jms.reply; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.TimeoutMap; import org.slf4j.Logger; @@ -34,7 +36,7 @@ public class MessageSelectorCreator { protected final ConcurrentSkipListSet<String> correlationIds; protected volatile boolean dirty = true; protected StringBuilder expression; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); public MessageSelectorCreator(CorrelationTimeoutMap timeoutMap) { this.timeoutMap = timeoutMap; @@ -46,7 +48,8 @@ public class MessageSelectorCreator { } public String get() { - synchronized (lock) { + lock.lock(); + try { if (!dirty) { return expression.toString(); } @@ -74,18 +77,23 @@ public class MessageSelectorCreator { dirty = false; return answer; + } finally { + lock.unlock(); } } // Changes to live correlation-ids invalidate existing message selector private void timeoutEvent(TimeoutMap.Listener.Type type, String cid) { - synchronized (lock) { + lock.lock(); + try { if (type == Put) { correlationIds.add(cid); } else if (type == Remove || type == Evict) { correlationIds.remove(cid); } dirty = true; + } finally { + lock.unlock(); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index 9d95ff14643..8eb17d17506 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -109,12 +109,15 @@ public class QueueReplyManager extends ReplyManagerSupport { Session session, String destinationName, boolean pubSubDomain) throws JMSException { - synchronized (QueueReplyManager.this) { + QueueReplyManager.this.lock.lock(); + try { // resolve the reply to destination if (destination == null) { destination = delegate.resolveDestinationName(session, destinationName, pubSubDomain); setReplyTo(destination); } + } finally { + QueueReplyManager.this.lock.unlock(); } return destination; } diff --git a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java index 52e39bb957f..2b3ecb843e7 100644 --- a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java +++ b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumerNotificationFilter.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.jmx; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import javax.management.AttributeChangeNotification; import javax.management.AttributeChangeNotificationFilter; import javax.management.Notification; @@ -26,6 +29,7 @@ import javax.management.Notification; */ public class JMXConsumerNotificationFilter extends AttributeChangeNotificationFilter { + private final Lock lock = new ReentrantLock(); private final String stringToCompare; private final boolean notifyMatch; @@ -36,25 +40,30 @@ public class JMXConsumerNotificationFilter extends AttributeChangeNotificationFi } @Override - public synchronized boolean isNotificationEnabled(Notification notification) { - boolean enabled = super.isNotificationEnabled(notification); - if (!enabled) { - return false; - } + public boolean isNotificationEnabled(Notification notification) { + lock.lock(); + try { + boolean enabled = super.isNotificationEnabled(notification); + if (!enabled) { + return false; + } - boolean match = false; - if (stringToCompare != null) { - AttributeChangeNotification acn = (AttributeChangeNotification) notification; - Object newValue = acn.getNewValue(); - // special for null - if ("null".equals(stringToCompare) && newValue == null) { - match = true; - } else if (newValue != null) { - match = stringToCompare.equals(newValue.toString()); + boolean match = false; + if (stringToCompare != null) { + AttributeChangeNotification acn = (AttributeChangeNotification) notification; + Object newValue = acn.getNewValue(); + // special for null + if ("null".equals(stringToCompare) && newValue == null) { + match = true; + } else if (newValue != null) { + match = stringToCompare.equals(newValue.toString()); + } + return notifyMatch == match; + } else { + return true; } - return notifyMatch == match; - } else { - return true; + } finally { + lock.unlock(); } } diff --git a/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java b/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java index a9981fa385c..532b5959a7a 100644 --- a/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java +++ b/components/camel-jolt/src/main/java/org/apache/camel/component/jolt/JoltEndpoint.java @@ -82,39 +82,44 @@ public class JoltEndpoint extends ResourceEndpoint { return "jolt:" + getResourceUri(); } - private synchronized JoltTransform getTransform() throws Exception { - if (transform == null) { - if (log.isDebugEnabled()) { - String path = getResourceUri(); - log.debug("Jolt content read from resource {} with resourceUri: {} for endpoint {}", getResourceUri(), path, - getEndpointUri()); - } + private JoltTransform getTransform() throws Exception { + getInternalLock().lock(); + try { + if (transform == null) { + if (log.isDebugEnabled()) { + String path = getResourceUri(); + log.debug("Jolt content read from resource {} with resourceUri: {} for endpoint {}", getResourceUri(), path, + getEndpointUri()); + } - // Sortr does not require a spec - if (this.transformDsl == JoltTransformType.Sortr) { - this.transform = new Sortr(); - } else { - // getResourceAsInputStream also considers the content cache - Object spec = JsonUtils.jsonToObject(getResourceAsInputStream()); - switch (this.transformDsl) { - case Shiftr: - this.transform = new Shiftr(spec); - break; - case Defaultr: - this.transform = new Defaultr(spec); - break; - case Removr: - this.transform = new Removr(spec); - break; - case Chainr: - default: - this.transform = Chainr.fromSpec(spec); - break; + // Sortr does not require a spec + if (this.transformDsl == JoltTransformType.Sortr) { + this.transform = new Sortr(); + } else { + // getResourceAsInputStream also considers the content cache + Object spec = JsonUtils.jsonToObject(getResourceAsInputStream()); + switch (this.transformDsl) { + case Shiftr: + this.transform = new Shiftr(spec); + break; + case Defaultr: + this.transform = new Defaultr(spec); + break; + case Removr: + this.transform = new Removr(spec); + break; + case Chainr: + default: + this.transform = Chainr.fromSpec(spec); + break; + } } - } + } + return transform; + } finally { + getInternalLock().unlock(); } - return transform; } /** diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java index cc55c69c08d..284808c7129 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java @@ -121,13 +121,18 @@ public class JpaComponent extends HealthCheckComponent { return aliases; } - synchronized ExecutorService getOrCreatePollingConsumerExecutorService() { - if (pollingConsumerExecutorService == null) { - LOG.debug("Creating thread pool for JpaPollingConsumer to support polling using timeout"); - pollingConsumerExecutorService - = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "JpaPollingConsumer"); + ExecutorService getOrCreatePollingConsumerExecutorService() { + lock.lock(); + try { + if (pollingConsumerExecutorService == null) { + LOG.debug("Creating thread pool for JpaPollingConsumer to support polling using timeout"); + pollingConsumerExecutorService + = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "JpaPollingConsumer"); + } + return pollingConsumerExecutorService; + } finally { + lock.unlock(); } - return pollingConsumerExecutorService; } // Implementation methods diff --git a/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java b/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java index d77208f267c..4e8025bf5c2 100644 --- a/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java +++ b/components/camel-jslt/src/main/java/org/apache/camel/component/jslt/JsltEndpoint.java @@ -107,62 +107,68 @@ public class JsltEndpoint extends ResourceEndpoint { return "jslt:" + getResourceUri(); } - private synchronized Expression getTransform(Message msg) throws Exception { - final String jsltStringFromHeader - = allowTemplateFromHeader ? msg.getHeader(JsltConstants.HEADER_JSLT_STRING, String.class) : null; + private Expression getTransform(Message msg) throws Exception { + getInternalLock().lock(); + try { - final boolean useTemplateFromUri = jsltStringFromHeader == null; + final String jsltStringFromHeader + = allowTemplateFromHeader ? msg.getHeader(JsltConstants.HEADER_JSLT_STRING, String.class) : null; - if (useTemplateFromUri && transform != null) { - return transform; - } + final boolean useTemplateFromUri = jsltStringFromHeader == null; - final Collection<Function> functions = Objects.requireNonNullElse( - ((JsltComponent) getComponent()).getFunctions(), - Collections.emptyList()); + if (useTemplateFromUri && transform != null) { + return transform; + } - final JsonFilter objectFilter = Objects.requireNonNullElse( - ((JsltComponent) getComponent()).getObjectFilter(), - DEFAULT_JSON_FILTER); + final Collection<Function> functions = Objects.requireNonNullElse( + ((JsltComponent) getComponent()).getFunctions(), + Collections.emptyList()); - final String transformSource; - final InputStream stream; + final JsonFilter objectFilter = Objects.requireNonNullElse( + ((JsltComponent) getComponent()).getObjectFilter(), + DEFAULT_JSON_FILTER); - if (useTemplateFromUri) { - transformSource = getResourceUri(); + final String transformSource; + final InputStream stream; + + if (useTemplateFromUri) { + transformSource = getResourceUri(); + + if (log.isDebugEnabled()) { + log.debug("Jslt content read from resource {} with resourceUri: {} for endpoint {}", + transformSource, + transformSource, + getEndpointUri()); + } - if (log.isDebugEnabled()) { - log.debug("Jslt content read from resource {} with resourceUri: {} for endpoint {}", - transformSource, - transformSource, - getEndpointUri()); + stream = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), transformSource); + if (stream == null) { + throw new JsltException("Cannot load resource '" + transformSource + "': not found"); + } + } else { // use template from header + stream = new ByteArrayInputStream(jsltStringFromHeader.getBytes(StandardCharsets.UTF_8)); + transformSource = "<inline>"; } - stream = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), transformSource); - if (stream == null) { - throw new JsltException("Cannot load resource '" + transformSource + "': not found"); + final Expression transform; + try { + transform = new Parser(new InputStreamReader(stream)) + .withFunctions(functions) + .withObjectFilter(objectFilter) + .withSource(transformSource) + .compile(); + } finally { + // the stream is consumed only on .compile(), cannot be closed before + IOHelper.close(stream); } - } else { // use template from header - stream = new ByteArrayInputStream(jsltStringFromHeader.getBytes(StandardCharsets.UTF_8)); - transformSource = "<inline>"; - } - final Expression transform; - try { - transform = new Parser(new InputStreamReader(stream)) - .withFunctions(functions) - .withObjectFilter(objectFilter) - .withSource(transformSource) - .compile(); + if (useTemplateFromUri) { + this.transform = transform; + } + return transform; } finally { - // the stream is consumed only on .compile(), cannot be closed before - IOHelper.close(stream); - } - - if (useTemplateFromUri) { - this.transform = transform; + getInternalLock().unlock(); } - return transform; } public JsltEndpoint findOrCreateEndpoint(String uri, String newResourceUri) { diff --git a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java index 893d118a465..9c921cedefb 100644 --- a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java +++ b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java @@ -188,10 +188,13 @@ public class JsonValidatorEndpoint extends ResourceEndpoint { * @return The currently loaded schema */ private JsonSchema getOrCreateSchema() throws Exception { - synchronized (this) { + getInternalLock().lock(); + try { if (this.schema == null) { this.schema = this.uriSchemaLoader.createSchema(getCamelContext(), getResourceUri()); } + } finally { + getInternalLock().unlock(); } return this.schema; } diff --git a/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java b/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java index 24999b525a9..7b05810a99f 100644 --- a/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java +++ b/components/camel-jsonpath/src/main/java/org/apache/camel/jsonpath/JsonStream.java @@ -275,7 +275,7 @@ public class JsonStream extends FilterInputStream { } @Override - public synchronized void reset() throws IOException { + public void reset() throws IOException { throw new IOException("reset not supported"); } diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java index 10e448b6a54..13ca2d83cbf 100644 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java @@ -86,12 +86,17 @@ public class Jt400Component extends HealthCheckComponent { * * @return the default connection pool used by this component */ - public synchronized AS400ConnectionPool getConnectionPool() { - if (connectionPool == null) { - LOG.info("Instantiating the default connection pool ..."); - connectionPool = new AS400ConnectionPool(); + public AS400ConnectionPool getConnectionPool() { + lock.lock(); + try { + if (connectionPool == null) { + LOG.info("Instantiating the default connection pool ..."); + connectionPool = new AS400ConnectionPool(); + } + return connectionPool; + } finally { + lock.unlock(); } - return connectionPool; } public void setConnectionPool(AS400ConnectionPool connectionPool) { diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java index d6c2114a6df..2f3b2d4db47 100755 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java @@ -101,43 +101,48 @@ public class Jt400MsgQueueConsumer extends ScheduledPollConsumer { } } - private synchronized Exchange receive(MessageQueue queue, long timeout) throws Exception { - QueuedMessage entry; - int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1; - LOG.trace("Reading from message queue: {} with {} seconds timeout", queue.getPath(), - -1 == seconds ? "infinite" : seconds); - - Jt400Configuration.MessageAction messageAction = getEndpoint().getMessageAction(); - entry = queue.receive(messageKey, //message key - seconds, //timeout - messageAction.getJt400Value(), // message action - null == messageKey ? MessageQueue.ANY : MessageQueue.NEXT); // types of messages - - if (null == entry) { - return null; - } - // Need to tuck away the message key if the message action is SAME, otherwise - // we'll just keep retrieving the same message over and over - if (Jt400Configuration.MessageAction.SAME == messageAction) { - this.messageKey = entry.getKey(); - } + private Exchange receive(MessageQueue queue, long timeout) throws Exception { + lock.lock(); + try { + QueuedMessage entry; + int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1; + LOG.trace("Reading from message queue: {} with {} seconds timeout", queue.getPath(), + -1 == seconds ? "infinite" : seconds); + + Jt400Configuration.MessageAction messageAction = getEndpoint().getMessageAction(); + entry = queue.receive(messageKey, //message key + seconds, //timeout + messageAction.getJt400Value(), // message action + null == messageKey ? MessageQueue.ANY : MessageQueue.NEXT); // types of messages + + if (null == entry) { + return null; + } + // Need to tuck away the message key if the message action is SAME, otherwise + // we'll just keep retrieving the same message over and over + if (Jt400Configuration.MessageAction.SAME == messageAction) { + this.messageKey = entry.getKey(); + } - Exchange exchange = createExchange(true); - exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION, - entry.getFromJobNumber() + "/" + entry.getUser() + "/" + entry.getFromJobName()); - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, entry.getID()); - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_FILE, entry.getFileName()); - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_TYPE, entry.getType()); - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_SEVERITY, entry.getSeverity()); - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE, entry); - if (AS400Message.INQUIRY == entry.getType()) { - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply()); - if (getEndpoint().isSendingReply()) { - setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey()); + Exchange exchange = createExchange(true); + exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION, + entry.getFromJobNumber() + "/" + entry.getUser() + "/" + entry.getFromJobName()); + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, entry.getID()); + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_FILE, entry.getFileName()); + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_TYPE, entry.getType()); + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_SEVERITY, entry.getSeverity()); + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE, entry); + if (AS400Message.INQUIRY == entry.getType()) { + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply()); + if (getEndpoint().isSendingReply()) { + setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey()); + } } + exchange.getIn().setBody(entry.getText()); + return exchange; + } finally { + lock.unlock(); } - exchange.getIn().setBody(entry.getText()); - return exchange; } private static void setHeaderIfValueNotNull(final Message message, final String header, final Object value) { diff --git a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java index db1b75288e1..321d4fbf873 100644 --- a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java +++ b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandlerReifier.java @@ -161,29 +161,34 @@ public class JtaTransactionErrorHandlerReifier extends ErrorHandlerReifier<JtaTr return answer; } - protected synchronized ScheduledExecutorService getExecutorService( + protected ScheduledExecutorService getExecutorService( ScheduledExecutorService executorService, String executorServiceRef) { - if (executorService == null || executorService.isShutdown()) { - // camel context will shutdown the executor when it shutdown so no - // need to shut it down when stopping - if (executorServiceRef != null) { - executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); - if (executorService == null) { - ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); - ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); - executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); + lock.lock(); + try { + if (executorService == null || executorService.isShutdown()) { + // camel context will shutdown the executor when it shutdown so no + // need to shut it down when stopping + if (executorServiceRef != null) { + executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); + if (executorService == null) { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); + executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); + } + if (executorService == null) { + throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry."); + } + } else { + // no explicit configured thread pool, so leave it up to the + // error handler to decide if it need a default thread pool from + // CamelContext#getErrorHandlerExecutorService + executorService = null; } - if (executorService == null) { - throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry."); - } - } else { - // no explicit configured thread pool, so leave it up to the - // error handler to decide if it need a default thread pool from - // CamelContext#getErrorHandlerExecutorService - executorService = null; } + return executorService; + } finally { + lock.unlock(); } - return executorService; } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 3534fccf3f4..f30af79ef82 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -651,7 +651,7 @@ public class KafkaFetchRecords implements Runnable { state = State.RESUME_REQUESTED; } - private synchronized void setLastError(Exception lastError) { + private void setLastError(Exception lastError) { this.lastError = lastError; } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 5e3eb6cc9da..e7d493e0d1a 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -23,7 +23,11 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -63,6 +67,8 @@ public class KameletComponent extends DefaultComponent { // active consumers private final Map<String, KameletConsumer> consumers = new HashMap<>(); + private final Lock consumersLock = new ReentrantLock(); + private final Condition consumersCondition = consumersLock.newCondition(); // active kamelet EIPs private final Map<String, Processor> kameletEips = new ConcurrentHashMap<>(); @@ -344,7 +350,8 @@ public class KameletComponent extends DefaultComponent { } public void addConsumer(String key, KameletConsumer consumer) { - synchronized (consumers) { + consumersLock.lock(); + try { if (consumers.putIfAbsent(key, consumer) != null) { throw new IllegalArgumentException( "Cannot add a 2nd consumer to the same endpoint: " + key @@ -352,21 +359,27 @@ public class KameletComponent extends DefaultComponent { } // state changed so inc counter stateCounter++; - consumers.notifyAll(); + consumersCondition.signalAll(); + } finally { + consumersLock.unlock(); } } public void removeConsumer(String key, KameletConsumer consumer) { - synchronized (consumers) { + consumersLock.lock(); + try { consumers.remove(key, consumer); // state changed so inc counter stateCounter++; - consumers.notifyAll(); + consumersCondition.signalAll(); + } finally { + consumersLock.unlock(); } } protected KameletConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException { - synchronized (consumers) { + consumersLock.lock(); + try { KameletConsumer answer = consumers.get(key); if (answer == null && block) { StopWatch watch = new StopWatch(); @@ -379,10 +392,12 @@ public class KameletComponent extends DefaultComponent { if (rem <= 0) { break; } - consumers.wait(rem); + consumersCondition.await(rem, TimeUnit.MILLISECONDS); } } return answer; + } finally { + consumersLock.unlock(); } } diff --git a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index 2b68cd98efb..7a96ef3e3de 100644 --- a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -132,11 +132,16 @@ public class KnativeComponent extends HealthCheckComponent { return producerFactory; } - public synchronized KnativeProducerFactory getOrCreateProducerFactory() throws Exception { - if (producerFactory == null) { - producerFactory = setUpProducerFactory(); + public KnativeProducerFactory getOrCreateProducerFactory() throws Exception { + lock.lock(); + try { + if (producerFactory == null) { + producerFactory = setUpProducerFactory(); + } + return producerFactory; + } finally { + lock.unlock(); } - return producerFactory; } /** @@ -150,11 +155,16 @@ public class KnativeComponent extends HealthCheckComponent { return consumerFactory; } - public synchronized KnativeConsumerFactory getOrCreateConsumerFactory() throws Exception { - if (consumerFactory == null) { - consumerFactory = setUpConsumerFactory(); + public KnativeConsumerFactory getOrCreateConsumerFactory() throws Exception { + lock.lock(); + try { + if (consumerFactory == null) { + consumerFactory = setUpConsumerFactory(); + } + return consumerFactory; + } finally { + lock.unlock(); } - return consumerFactory; } /** diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java index 61b2a763001..ded0576fe2b 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java @@ -126,69 +126,79 @@ public class MasterConsumer extends DefaultConsumer implements ResumeAware<Resum // Helpers // ************************************** - private synchronized void onLeadershipTaken() throws Exception { - if (!isRunAllowed()) { - return; - } + private void onLeadershipTaken() throws Exception { + lock.lock(); + try { + if (!isRunAllowed()) { + return; + } - if (delegatedConsumer != null) { - return; - } + if (delegatedConsumer != null) { + return; + } - // start consumer using background task up till X attempts - long delay = masterEndpoint.getComponent().getBackOffDelay(); - long max = masterEndpoint.getComponent().getBackOffMaxAttempts(); + // start consumer using background task up till X attempts + long delay = masterEndpoint.getComponent().getBackOffDelay(); + long max = masterEndpoint.getComponent().getBackOffMaxAttempts(); - BackOffTimer timer = new BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool()); - timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task -> { - LOG.info("Leadership taken. Attempt #{} to start consumer: {}", task.getCurrentAttempts(), - delegatedEndpoint); + BackOffTimer timer = new BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool()); + timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task -> { + LOG.info("Leadership taken. Attempt #{} to start consumer: {}", task.getCurrentAttempts(), + delegatedEndpoint); - Exception cause = null; - try { - if (delegatedConsumer == null) { - delegatedConsumer = delegatedEndpoint.createConsumer(processor); - if (delegatedConsumer instanceof StartupListener) { - getEndpoint().getCamelContext().addStartupListener((StartupListener) delegatedConsumer); - } - if (delegatedConsumer instanceof ResumeAware resumeAwareConsumer && resumeStrategy != null) { - LOG.debug("Setting up the resume adapter for the resume strategy in consumer"); - ResumeAdapter resumeAdapter - = AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer, - resumeStrategy); - resumeStrategy.setAdapter(resumeAdapter); - - LOG.debug("Setting up the resume strategy for consumer"); - resumeAwareConsumer.setResumeStrategy(resumeStrategy); + Exception cause = null; + try { + if (delegatedConsumer == null) { + delegatedConsumer = delegatedEndpoint.createConsumer(processor); + if (delegatedConsumer instanceof StartupListener) { + getEndpoint().getCamelContext().addStartupListener((StartupListener) delegatedConsumer); + } + if (delegatedConsumer instanceof ResumeAware resumeAwareConsumer && resumeStrategy != null) { + LOG.debug("Setting up the resume adapter for the resume strategy in consumer"); + ResumeAdapter resumeAdapter + = AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer, + resumeStrategy); + resumeStrategy.setAdapter(resumeAdapter); + + LOG.debug("Setting up the resume strategy for consumer"); + resumeAwareConsumer.setResumeStrategy(resumeStrategy); + } } - } - ServiceHelper.startService(delegatedEndpoint, delegatedConsumer); + ServiceHelper.startService(delegatedEndpoint, delegatedConsumer); - } catch (Exception e) { - cause = e; - } + } catch (Exception e) { + cause = e; + } - if (cause != null) { - String message = "Leadership taken. Attempt #" + task.getCurrentAttempts() - + " failed to start consumer due to: " + cause.getMessage(); - getExceptionHandler().handleException(message, cause); - return true; // retry - } + if (cause != null) { + String message = "Leadership taken. Attempt #" + task.getCurrentAttempts() + + " failed to start consumer due to: " + cause.getMessage(); + getExceptionHandler().handleException(message, cause); + return true; // retry + } - LOG.info("Leadership taken. Attempt #" + task.getCurrentAttempts() + " success. Consumer started: {}", - delegatedEndpoint); - return false; // no more attempts - }); + LOG.info("Leadership taken. Attempt #" + task.getCurrentAttempts() + " success. Consumer started: {}", + delegatedEndpoint); + return false; // no more attempts + }); + } finally { + lock.unlock(); + } } - private synchronized void onLeadershipLost() { - LOG.debug("Leadership lost. Stopping consumer: {}", delegatedEndpoint); + private void onLeadershipLost() { + lock.lock(); try { - ServiceHelper.stopAndShutdownServices(delegatedConsumer, delegatedEndpoint); + LOG.debug("Leadership lost. Stopping consumer: {}", delegatedEndpoint); + try { + ServiceHelper.stopAndShutdownServices(delegatedConsumer, delegatedEndpoint); + } finally { + delegatedConsumer = null; + } + LOG.info("Leadership lost. Consumer stopped: {}", delegatedEndpoint); } finally { - delegatedConsumer = null; + lock.unlock(); } - LOG.info("Leadership lost. Consumer stopped: {}", delegatedEndpoint); } // ************************************** diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index 802d7b8914b..cf377835847 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -131,216 +131,226 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } @Override - public synchronized void process(Exchange exchange) throws MllpException { - log.trace("process({}) [{}] - entering", exchange.getExchangeId(), socket); - getEndpoint().updateLastConnectionActivityTicks(); - - Message message = exchange.getMessage(); - - getEndpoint().checkBeforeSendProperties(exchange, socket, log); - - // Establish a connection if needed + public void process(Exchange exchange) throws MllpException { + lock.lock(); try { - checkConnection(); + log.trace("process({}) [{}] - entering", exchange.getExchangeId(), socket); + getEndpoint().updateLastConnectionActivityTicks(); - if (cachedLocalAddress != null) { - message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, cachedLocalAddress); - } + Message message = exchange.getMessage(); - if (cachedRemoteAddress != null) { - message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, cachedRemoteAddress); - } + getEndpoint().checkBeforeSendProperties(exchange, socket, log); - // Send the message to the external system - byte[] hl7MessageBytes = null; - Object messageBody = message.getBody(); - if (messageBody == null) { - String exceptionMessage - = String.format("process(%s) [%s] - message body is null", exchange.getExchangeId(), socket); - exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi)); - return; - } else if (messageBody instanceof byte[]) { - hl7MessageBytes = (byte[]) messageBody; - } else if (messageBody instanceof String) { - String stringBody = (String) messageBody; - hl7MessageBytes = stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset)); - if (getConfiguration().hasCharsetName()) { - exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, getConfiguration().getCharsetName()); + // Establish a connection if needed + try { + checkConnection(); + + if (cachedLocalAddress != null) { + message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, cachedLocalAddress); } - } - log.debug("process({}) [{}] - sending message to external system", exchange.getExchangeId(), socket); + if (cachedRemoteAddress != null) { + message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, cachedRemoteAddress); + } - try { - mllpBuffer.setEnvelopedMessage(hl7MessageBytes); - mllpBuffer.writeTo(socket); - } catch (MllpSocketException writeEx) { - // Connection may have been reset - try one more time - log.debug("process({}) [{}] - exception encountered writing payload - attempting reconnect", - exchange.getExchangeId(), socket, writeEx); - try { - checkConnection(); - log.trace("process({}) [{}] - reconnected succeeded - resending payload", exchange.getExchangeId(), socket); - try { - mllpBuffer.writeTo(socket); - } catch (MllpSocketException retryWriteEx) { - String exceptionMessage = String.format( - "process(%s) [%s] - exception encountered attempting to write payload after reconnect", - exchange.getExchangeId(), socket); - log.warn(exceptionMessage, retryWriteEx); - exchange.setException( - new MllpWriteException( - exceptionMessage, mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi)); + // Send the message to the external system + byte[] hl7MessageBytes = null; + Object messageBody = message.getBody(); + if (messageBody == null) { + String exceptionMessage + = String.format("process(%s) [%s] - message body is null", exchange.getExchangeId(), socket); + exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi)); + return; + } else if (messageBody instanceof byte[]) { + hl7MessageBytes = (byte[]) messageBody; + } else if (messageBody instanceof String) { + String stringBody = (String) messageBody; + hl7MessageBytes = stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset)); + if (getConfiguration().hasCharsetName()) { + exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, getConfiguration().getCharsetName()); } - } catch (IOException reconnectEx) { - String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to reconnect", - exchange.getExchangeId(), socket); - log.warn(exceptionMessage, reconnectEx); - exchange.setException( - new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), writeEx, logPhi)); - mllpBuffer.resetSocket(socket); } - } - if (getConfiguration().getExchangePattern() == ExchangePattern.InOnly) { - log.debug("process({}) [{}] - not checking acknowledgement from external system", - exchange.getExchangeId(), socket); - return; - } - if (exchange.getException() == null) { - log.debug("process({}) [{}] - reading acknowledgement from external system", exchange.getExchangeId(), socket); + + log.debug("process({}) [{}] - sending message to external system", exchange.getExchangeId(), socket); + try { - mllpBuffer.reset(); - mllpBuffer.readFrom(socket); - } catch (MllpSocketException receiveAckEx) { + mllpBuffer.setEnvelopedMessage(hl7MessageBytes); + mllpBuffer.writeTo(socket); + } catch (MllpSocketException writeEx) { // Connection may have been reset - try one more time - log.debug("process({}) [{}] - exception encountered reading acknowledgement - attempting reconnect", - exchange.getExchangeId(), socket, receiveAckEx); + log.debug("process({}) [{}] - exception encountered writing payload - attempting reconnect", + exchange.getExchangeId(), socket, writeEx); try { checkConnection(); + log.trace("process({}) [{}] - reconnected succeeded - resending payload", exchange.getExchangeId(), + socket); + try { + mllpBuffer.writeTo(socket); + } catch (MllpSocketException retryWriteEx) { + String exceptionMessage = String.format( + "process(%s) [%s] - exception encountered attempting to write payload after reconnect", + exchange.getExchangeId(), socket); + log.warn(exceptionMessage, retryWriteEx); + exchange.setException( + new MllpWriteException( + exceptionMessage, mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi)); + } } catch (IOException reconnectEx) { - String exceptionMessage = String.format( - "process(%s) [%s] - exception encountered attempting to reconnect after acknowledgement read failure", - exchange.getExchangeId(), socket); + String exceptionMessage + = String.format("process(%s) [%s] - exception encountered attempting to reconnect", + exchange.getExchangeId(), socket); log.warn(exceptionMessage, reconnectEx); exchange.setException( - new MllpAcknowledgementReceiveException( - exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi)); + new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), writeEx, logPhi)); mllpBuffer.resetSocket(socket); } - - if (exchange.getException() == null) { - log.trace("process({}) [{}] - resending payload after successful reconnect", exchange.getExchangeId(), - socket); + } + if (getConfiguration().getExchangePattern() == ExchangePattern.InOnly) { + log.debug("process({}) [{}] - not checking acknowledgement from external system", + exchange.getExchangeId(), socket); + return; + } + if (exchange.getException() == null) { + log.debug("process({}) [{}] - reading acknowledgement from external system", exchange.getExchangeId(), + socket); + try { + mllpBuffer.reset(); + mllpBuffer.readFrom(socket); + } catch (MllpSocketException receiveAckEx) { + // Connection may have been reset - try one more time + log.debug("process({}) [{}] - exception encountered reading acknowledgement - attempting reconnect", + exchange.getExchangeId(), socket, receiveAckEx); try { - mllpBuffer.setEnvelopedMessage(hl7MessageBytes); - mllpBuffer.writeTo(socket); - } catch (MllpSocketException writeRetryEx) { + checkConnection(); + } catch (IOException reconnectEx) { String exceptionMessage = String.format( - "process(%s) [%s] - exception encountered attempting to write payload after read failure and successful reconnect", + "process(%s) [%s] - exception encountered attempting to reconnect after acknowledgement read failure", exchange.getExchangeId(), socket); - log.warn(exceptionMessage, writeRetryEx); + log.warn(exceptionMessage, reconnectEx); exchange.setException( - new MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi)); + new MllpAcknowledgementReceiveException( + exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi)); + mllpBuffer.resetSocket(socket); } if (exchange.getException() == null) { - log.trace("process({}) [{}] - resend succeeded - reading acknowledgement from external system", - exchange.getExchangeId(), socket); + log.trace("process({}) [{}] - resending payload after successful reconnect", + exchange.getExchangeId(), + socket); try { - mllpBuffer.reset(); - mllpBuffer.readFrom(socket); - } catch (MllpSocketException secondReceiveEx) { - String exceptionMessageFormat = mllpBuffer.isEmpty() - ? "process(%s) [%s] - exception encountered reading MLLP Acknowledgement after successful reconnect and resend" - : "process(%s) [%s] - exception encountered reading complete MLLP Acknowledgement after successful reconnect and resend"; - String exceptionMessage - = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); - log.warn(exceptionMessage, secondReceiveEx); - // Send the original exception to the exchange - exchange.setException(new MllpAcknowledgementReceiveException( - exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx, - logPhi)); - } catch (SocketTimeoutException secondReadTimeoutEx) { - String exceptionMessageFormat = mllpBuffer.isEmpty() - ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment after successful reconnect and resend" - : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment after successful reconnect and resend"; - String exceptionMessage - = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); - log.warn(exceptionMessage, secondReadTimeoutEx); - // Send the original exception to the exchange - exchange.setException(new MllpAcknowledgementTimeoutException( - exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx, - logPhi)); - mllpBuffer.resetSocket(socket); + mllpBuffer.setEnvelopedMessage(hl7MessageBytes); + mllpBuffer.writeTo(socket); + } catch (MllpSocketException writeRetryEx) { + String exceptionMessage = String.format( + "process(%s) [%s] - exception encountered attempting to write payload after read failure and successful reconnect", + exchange.getExchangeId(), socket); + log.warn(exceptionMessage, writeRetryEx); + exchange.setException( + new MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi)); + } + + if (exchange.getException() == null) { + log.trace("process({}) [{}] - resend succeeded - reading acknowledgement from external system", + exchange.getExchangeId(), socket); + try { + mllpBuffer.reset(); + mllpBuffer.readFrom(socket); + } catch (MllpSocketException secondReceiveEx) { + String exceptionMessageFormat = mllpBuffer.isEmpty() + ? "process(%s) [%s] - exception encountered reading MLLP Acknowledgement after successful reconnect and resend" + : "process(%s) [%s] - exception encountered reading complete MLLP Acknowledgement after successful reconnect and resend"; + String exceptionMessage + = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); + log.warn(exceptionMessage, secondReceiveEx); + // Send the original exception to the exchange + exchange.setException(new MllpAcknowledgementReceiveException( + exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx, + logPhi)); + } catch (SocketTimeoutException secondReadTimeoutEx) { + String exceptionMessageFormat = mllpBuffer.isEmpty() + ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment after successful reconnect and resend" + : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment after successful reconnect and resend"; + String exceptionMessage + = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); + log.warn(exceptionMessage, secondReadTimeoutEx); + // Send the original exception to the exchange + exchange.setException(new MllpAcknowledgementTimeoutException( + exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx, + logPhi)); + mllpBuffer.resetSocket(socket); + } } } + } catch (SocketTimeoutException timeoutEx) { + String exceptionMessageFormat = mllpBuffer.isEmpty() + ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment" + : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment"; + String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); + log.warn(exceptionMessage, timeoutEx); + exchange.setException(new MllpAcknowledgementTimeoutException( + exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi)); + mllpBuffer.resetSocket(socket); } - } catch (SocketTimeoutException timeoutEx) { - String exceptionMessageFormat = mllpBuffer.isEmpty() - ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment" - : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment"; - String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); - log.warn(exceptionMessage, timeoutEx); - exchange.setException(new MllpAcknowledgementTimeoutException( - exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi)); - mllpBuffer.resetSocket(socket); - } - if (exchange.getException() == null) { - if (mllpBuffer.hasCompleteEnvelope()) { - byte[] acknowledgementBytes = mllpBuffer.toMllpPayload(); - - log.debug( - "process({}) [{}] - populating message headers with the acknowledgement from the external system", - exchange.getExchangeId(), socket); - message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); - if (acknowledgementBytes != null && acknowledgementBytes.length > 0) { - message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String( - acknowledgementBytes, - MllpCharsetHelper.getCharset(exchange, acknowledgementBytes, hl7Util, charset))); - } else { - message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, ""); - } + if (exchange.getException() == null) { + if (mllpBuffer.hasCompleteEnvelope()) { + byte[] acknowledgementBytes = mllpBuffer.toMllpPayload(); - if (getConfiguration().isValidatePayload()) { - String exceptionMessage = hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes); - if (exceptionMessage != null) { - exchange.setException(new MllpInvalidAcknowledgementException( - exceptionMessage, hl7MessageBytes, acknowledgementBytes, logPhi)); + log.debug( + "process({}) [{}] - populating message headers with the acknowledgement from the external system", + exchange.getExchangeId(), socket); + message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); + if (acknowledgementBytes != null && acknowledgementBytes.length > 0) { + message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String( + acknowledgementBytes, + MllpCharsetHelper.getCharset(exchange, acknowledgementBytes, hl7Util, charset))); + } else { + message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, ""); } - } - if (exchange.getException() == null) { - log.debug("process({}) [{}] - processing the acknowledgement from the external system", - exchange.getExchangeId(), socket); - try { - message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, - processAcknowledgment(hl7MessageBytes, acknowledgementBytes)); - } catch (MllpNegativeAcknowledgementException nackEx) { - message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, nackEx.getAcknowledgmentType()); - exchange.setException(nackEx); + if (getConfiguration().isValidatePayload()) { + String exceptionMessage = hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes); + if (exceptionMessage != null) { + exchange.setException(new MllpInvalidAcknowledgementException( + exceptionMessage, hl7MessageBytes, acknowledgementBytes, logPhi)); + } } - getEndpoint().checkAfterSendProperties(exchange, socket, log); + if (exchange.getException() == null) { + log.debug("process({}) [{}] - processing the acknowledgement from the external system", + exchange.getExchangeId(), socket); + try { + message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, + processAcknowledgment(hl7MessageBytes, acknowledgementBytes)); + } catch (MllpNegativeAcknowledgementException nackEx) { + message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, nackEx.getAcknowledgmentType()); + exchange.setException(nackEx); + } + + getEndpoint().checkAfterSendProperties(exchange, socket, log); + } + } else { + String exceptionMessage = String.format("process(%s) [%s] - invalid acknowledgement received", + exchange.getExchangeId(), socket); + exchange.setException(new MllpInvalidAcknowledgementException( + exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), logPhi)); } - } else { - String exceptionMessage = String.format("process(%s) [%s] - invalid acknowledgement received", - exchange.getExchangeId(), socket); - exchange.setException(new MllpInvalidAcknowledgementException( - exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), logPhi)); } } + + } catch (IOException ioEx) { + log.debug("process({}) [{}] - IOException encountered checking connection", exchange.getExchangeId(), socket, + ioEx); + exchange.setException(ioEx); + mllpBuffer.resetSocket(socket); + } finally { + mllpBuffer.reset(); } - } catch (IOException ioEx) { - log.debug("process({}) [{}] - IOException encountered checking connection", exchange.getExchangeId(), socket, ioEx); - exchange.setException(ioEx); - mllpBuffer.resetSocket(socket); + log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), socket); } finally { - mllpBuffer.reset(); + lock.unlock(); } - - log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), socket); } private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException { @@ -519,44 +529,49 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { * Check for idle connection */ @Override - public synchronized void run() { - if (getConfiguration().hasIdleTimeout()) { - if (null != socket && !socket.isClosed() && socket.isConnected()) { - if (getEndpoint().hasLastConnectionActivityTicks()) { - long idleTime = System.currentTimeMillis() - getEndpoint().getLastConnectionActivityTicks(); - if (log.isDebugEnabled()) { - log.debug("Checking {} for idle connection: {} - {}", getConnectionAddress(), idleTime, - getConfiguration().getIdleTimeout()); - } - if (idleTime >= getConfiguration().getIdleTimeout()) { - if (MllpIdleTimeoutStrategy.CLOSE == getConfiguration().getIdleTimeoutStrategy()) { - log.info( - "MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - closing connection", - idleTime, getConfiguration().getIdleTimeout()); - mllpBuffer.closeSocket(socket); + public void run() { + lock.lock(); + try { + if (getConfiguration().hasIdleTimeout()) { + if (null != socket && !socket.isClosed() && socket.isConnected()) { + if (getEndpoint().hasLastConnectionActivityTicks()) { + long idleTime = System.currentTimeMillis() - getEndpoint().getLastConnectionActivityTicks(); + if (log.isDebugEnabled()) { + log.debug("Checking {} for idle connection: {} - {}", getConnectionAddress(), idleTime, + getConfiguration().getIdleTimeout()); + } + if (idleTime >= getConfiguration().getIdleTimeout()) { + if (MllpIdleTimeoutStrategy.CLOSE == getConfiguration().getIdleTimeoutStrategy()) { + log.info( + "MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - closing connection", + idleTime, getConfiguration().getIdleTimeout()); + mllpBuffer.closeSocket(socket); + } else { + log.info( + "MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - resetting connection", + idleTime, getConfiguration().getIdleTimeout()); + mllpBuffer.resetSocket(socket); + } } else { - log.info( - "MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - resetting connection", - idleTime, getConfiguration().getIdleTimeout()); - mllpBuffer.resetSocket(socket); + long minDelay = 100; + long delay = Long.min(Long.max(minDelay, getConfiguration().getIdleTimeout() - idleTime), + getConfiguration().getIdleTimeout()); + if (log.isDebugEnabled()) { + log.debug("Scheduling idle producer connection check of {} in {} milliseconds", + getConnectionAddress(), delay); + } + idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS); } } else { - long minDelay = 100; - long delay = Long.min(Long.max(minDelay, getConfiguration().getIdleTimeout() - idleTime), + log.debug( + "No activity detected since initial connection - scheduling idle producer connection check in {} milliseconds", getConfiguration().getIdleTimeout()); - if (log.isDebugEnabled()) { - log.debug("Scheduling idle producer connection check of {} in {} milliseconds", - getConnectionAddress(), delay); - } - idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS); + idleTimeoutExecutor.schedule(this, getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS); } - } else { - log.debug( - "No activity detected since initial connection - scheduling idle producer connection check in {} milliseconds", - getConfiguration().getIdleTimeout()); - idleTimeoutExecutor.schedule(this, getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS); } } + } finally { + lock.unlock(); } } diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java index d4675fc586b..653d32b03ea 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.mongodb; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.FindOneAndUpdateOptions; @@ -36,6 +39,7 @@ public class MongoDbTailTrackingManager { private final MongoClient connection; private final MongoDbTailTrackingConfig config; + private final Lock lock = new ReentrantLock(); private MongoCollection<Document> dbCol; private Document trackingObj; @@ -60,32 +64,42 @@ public class MongoDbTailTrackingManager { trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID)); } - public synchronized void persistToStore() { - if (!config.persistent || lastVal == null) { - return; + public void persistToStore() { + lock.lock(); + try { + if (!config.persistent || lastVal == null) { + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection); + } + + Bson updateObj = Updates.set(config.field, lastVal); + FindOneAndUpdateOptions options = new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER); + trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj, options); + } finally { + lock.unlock(); } - - if (LOG.isDebugEnabled()) { - LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection); - } - - Bson updateObj = Updates.set(config.field, lastVal); - FindOneAndUpdateOptions options = new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER); - trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj, options); } - public synchronized Object recoverFromStore() { - if (!config.persistent) { - return null; - } + public Object recoverFromStore() { + lock.lock(); + try { + if (!config.persistent) { + return null; + } - lastVal = dbCol.find(trackingObj).first().get(config.field); + lastVal = dbCol.find(trackingObj).first().get(config.field); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection); + } - return lastVal; + return lastVal; + } finally { + lock.unlock(); + } } public void setLastVal(Document dbObj) { diff --git a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java index e8112e436a4..e0c1e976c09 100644 --- a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java +++ b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java @@ -45,17 +45,22 @@ public class MyBatisComponent extends HealthCheckComponent { return answer; } - protected synchronized SqlSessionFactory createSqlSessionFactory() throws IOException { - if (sqlSessionFactory == null) { - ObjectHelper.notNull(configurationUri, "configurationUri", this); - InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), configurationUri); - try { - sqlSessionFactory = new SqlSessionFactoryBuilder().build(is); - } finally { - IOHelper.close(is); + protected SqlSessionFactory createSqlSessionFactory() throws IOException { + lock.lock(); + try { + if (sqlSessionFactory == null) { + ObjectHelper.notNull(configurationUri, "configurationUri", this); + InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), configurationUri); + try { + sqlSessionFactory = new SqlSessionFactoryBuilder().build(is); + } finally { + IOHelper.close(is); + } } + return sqlSessionFactory; + } finally { + lock.unlock(); } - return sqlSessionFactory; } public SqlSessionFactory getSqlSessionFactory() {