This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-20199/remove-synchronized-blocks-from-t2z-components in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4f5be3d402fa6e4b08483c781c32b2c5e45627f3 Author: Nicolas Filotto <nicolas.filo...@qlik.com> AuthorDate: Thu Jan 16 16:05:55 2025 +0100 CAMEL-20199: Remove synchronized blocks from components T to Z --- .../client/GrpcStreamingExchangeForwarder.java | 14 +- .../component/mllp/internal/MllpSocketBuffer.java | 681 +++++++++++++-------- .../platform/http/vertx/AsyncInputStream.java | 134 ++-- .../sjms/reply/TemporaryQueueReplyManager.java | 31 +- .../apache/camel/component/smpp/SmppSplitter.java | 43 +- .../component/thymeleaf/ThymeleafEndpoint.java | 63 +- .../camel/component/timer/TimerComponent.java | 62 +- .../twilio/internal/TwilioPropertiesHelper.java | 17 +- .../component/undertow/DefaultUndertowHost.java | 161 ++--- .../undertow/handlers/CamelMethodHandler.java | 13 +- .../undertow/handlers/CamelPathHandler.java | 63 +- .../handlers/CamelPathTemplateHandler.java | 53 +- .../undertow/handlers/CamelRootHandler.java | 145 +++-- .../undertow/handlers/CamelWebSocketHandler.java | 57 +- .../camel/dataformat/univocity/Marshaller.java | 8 +- .../camel/component/velocity/VelocityEndpoint.java | 75 +-- .../websocket/VertxWebsocketResultHandler.java | 14 +- .../org/apache/camel/component/wal/LogWriter.java | 8 +- .../java/org/apache/camel/wasm/WasmFunction.java | 11 +- .../camel/component/xchange/XChangeComponent.java | 6 +- .../apache/camel/component/xmpp/XmppEndpoint.java | 87 +-- .../component/xmpp/XmppGroupChatProducer.java | 44 +- .../component/xmpp/XmppPrivateChatProducer.java | 15 +- .../apache/camel/language/xpath/XPathBuilder.java | 134 ++-- .../apache/camel/component/xslt/XsltBuilder.java | 9 +- .../zendesk/internal/ZendeskPropertiesHelper.java | 17 +- 26 files changed, 1195 insertions(+), 770 deletions(-) diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java index c67805417bc..127acc092dc 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.grpc.client; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import io.grpc.stub.StreamObserver; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -33,6 +36,7 @@ class GrpcStreamingExchangeForwarder implements GrpcExchangeForwarder { private final Object grpcStub; + private final Lock lock = new ReentrantLock(); private volatile StreamObserver<Object> currentStream; private volatile StreamObserver<Object> currentResponseObserver; @@ -75,13 +79,16 @@ class GrpcStreamingExchangeForwarder implements GrpcExchangeForwarder { private StreamObserver<Object> checkAndRecreateStreamObserver(StreamObserver<Object> responseObserver) { StreamObserver<Object> curStream = this.currentStream; if (curStream == null) { - synchronized (this) { + lock.lock(); + try { if (this.currentStream == null) { this.currentResponseObserver = responseObserver; this.currentStream = doCreateStream(responseObserver); } curStream = this.currentStream; + } finally { + lock.unlock(); } } @@ -93,9 +100,12 @@ class GrpcStreamingExchangeForwarder implements GrpcExchangeForwarder { } private void doCloseStream() { - synchronized (this) { + lock.lock(); + try { this.currentStream = null; this.currentResponseObserver = null; + } finally { + lock.unlock(); } } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java index 73ed4ea0b62..ffed98cabe1 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java @@ -24,6 +24,8 @@ import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.component.mllp.MllpComponent; import org.apache.camel.component.mllp.MllpEndpoint; @@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory; public class MllpSocketBuffer { private static final Logger LOG = LoggerFactory.getLogger(MllpSocketBuffer.class); + private final Lock lock = new ReentrantLock(); final MllpEndpoint endpoint; byte[] buffer; @@ -45,10 +48,10 @@ public class MllpSocketBuffer { int startOfBlockIndex = -1; int endOfBlockIndex = -1; - String charset; - Hl7Util hl7Util; - int minBufferSize; - int maxBufferSize; + final String charset; + final Hl7Util hl7Util; + final int minBufferSize; + final int maxBufferSize; public MllpSocketBuffer(MllpEndpoint endpoint) { if (endpoint == null) { @@ -72,13 +75,18 @@ public class MllpSocketBuffer { return size() <= 0; } - public synchronized void write(int b) { - ensureCapacity(1); - buffer[availableByteCount] = (byte) b; + public void write(int b) { + lock.lock(); + try { + ensureCapacity(1); + buffer[availableByteCount] = (byte) b; - updateIndexes(b, 0); + updateIndexes(b, 0); - availableByteCount += 1; + availableByteCount += 1; + } finally { + lock.unlock(); + } } public void write(byte[] b) { @@ -87,222 +95,277 @@ public class MllpSocketBuffer { } } - public synchronized void write(byte[] sourceBytes, int offset, int writeCount) { - if (sourceBytes != null && sourceBytes.length > 0) { - if (offset < 0) { - throw new IndexOutOfBoundsException( - String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is less than zero", - sourceBytes.length, offset, writeCount)); - } - if (offset > sourceBytes.length) { - throw new IndexOutOfBoundsException( - String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is greater than write count", - sourceBytes.length, offset, writeCount)); - } + public void write(byte[] sourceBytes, int offset, int writeCount) { + lock.lock(); + try { + if (sourceBytes != null && sourceBytes.length > 0) { + if (offset < 0) { + throw new IndexOutOfBoundsException( + String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is less than zero", + sourceBytes.length, offset, writeCount)); + } + if (offset > sourceBytes.length) { + throw new IndexOutOfBoundsException( + String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is greater than write count", + sourceBytes.length, offset, writeCount)); + } - if (writeCount < 0) { - throw new IndexOutOfBoundsException( - String.format("write(byte[%d], offset[%d], writeCount[%d]) - write count is less than zero", - sourceBytes.length, offset, writeCount)); - } - if (writeCount > sourceBytes.length) { - throw new IndexOutOfBoundsException( - String.format( - "write(byte[%d], offset[%d], writeCount[%d]) - write count is greater than length of the source byte[]", - sourceBytes.length, offset, writeCount)); - } - if ((offset + writeCount) - sourceBytes.length > 0) { - throw new IndexOutOfBoundsException( - String.format( - "write(byte[%d], offset[%d], writeCount[%d]) - offset plus write count <%d> is greater than length of the source byte[]", - sourceBytes.length, offset, writeCount, offset + writeCount)); - } + if (writeCount < 0) { + throw new IndexOutOfBoundsException( + String.format("write(byte[%d], offset[%d], writeCount[%d]) - write count is less than zero", + sourceBytes.length, offset, writeCount)); + } + if (writeCount > sourceBytes.length) { + throw new IndexOutOfBoundsException( + String.format( + "write(byte[%d], offset[%d], writeCount[%d]) - write count is greater than length of the source byte[]", + sourceBytes.length, offset, writeCount)); + } + if ((offset + writeCount) - sourceBytes.length > 0) { + throw new IndexOutOfBoundsException( + String.format( + "write(byte[%d], offset[%d], writeCount[%d]) - offset plus write count <%d> is greater than length of the source byte[]", + sourceBytes.length, offset, writeCount, offset + writeCount)); + } - ensureCapacity(writeCount); - System.arraycopy(sourceBytes, offset, buffer, availableByteCount, writeCount); + ensureCapacity(writeCount); + System.arraycopy(sourceBytes, offset, buffer, availableByteCount, writeCount); - for (int i = offset; i < writeCount && (startOfBlockIndex < 0 || endOfBlockIndex < 0); ++i) { - updateIndexes(sourceBytes[i], i); - } + for (int i = offset; i < writeCount && (startOfBlockIndex < 0 || endOfBlockIndex < 0); ++i) { + updateIndexes(sourceBytes[i], i); + } - availableByteCount += writeCount; + availableByteCount += writeCount; + } + } finally { + lock.unlock(); } } - public synchronized void openMllpEnvelope() { - reset(); - write(MllpProtocolConstants.START_OF_BLOCK); + public void openMllpEnvelope() { + lock.lock(); + try { + reset(); + write(MllpProtocolConstants.START_OF_BLOCK); + } finally { + lock.unlock(); + } } - public synchronized void closeMllpEnvelope() { + public void closeMllpEnvelope() { write(MllpProtocolConstants.PAYLOAD_TERMINATOR); } - public synchronized void setEnvelopedMessage(byte[] hl7Payload) { + public void setEnvelopedMessage(byte[] hl7Payload) { setEnvelopedMessage(hl7Payload, 0, hl7Payload != null ? hl7Payload.length : 0); } - public synchronized void setEnvelopedMessage(byte[] hl7Payload, int offset, int length) { - reset(); + public void setEnvelopedMessage(byte[] hl7Payload, int offset, int length) { + lock.lock(); + try { + reset(); - if (hl7Payload != null && hl7Payload.length > 0) { - if (hl7Payload[0] != MllpProtocolConstants.START_OF_BLOCK) { - openMllpEnvelope(); - } + if (hl7Payload != null && hl7Payload.length > 0) { + if (hl7Payload[0] != MllpProtocolConstants.START_OF_BLOCK) { + openMllpEnvelope(); + } - write(hl7Payload, offset, length); + write(hl7Payload, offset, length); - if (!hasCompleteEnvelope()) { + if (!hasCompleteEnvelope()) { + closeMllpEnvelope(); + } + } else { + openMllpEnvelope(); closeMllpEnvelope(); } - } else { - openMllpEnvelope(); - closeMllpEnvelope(); + } finally { + lock.unlock(); } } - public synchronized void reset() { - if (availableByteCount > 0) { - // TODO: May be able to get rid of this - Arrays.fill(buffer, (byte) 0); - } + public void reset() { + lock.lock(); + try { + if (availableByteCount > 0) { + // TODO: May be able to get rid of this + Arrays.fill(buffer, (byte) 0); + } - availableByteCount = 0; + availableByteCount = 0; - startOfBlockIndex = -1; - endOfBlockIndex = -1; + startOfBlockIndex = -1; + endOfBlockIndex = -1; + } finally { + lock.unlock(); + } } - public synchronized void readFrom(Socket socket) throws MllpSocketException, SocketTimeoutException { + public void readFrom(Socket socket) throws MllpSocketException, SocketTimeoutException { readFrom(socket, endpoint.getConfiguration().getReceiveTimeout(), endpoint.getConfiguration().getReadTimeout()); } - public synchronized void readFrom(Socket socket, int receiveTimeout, int readTimeout) + public void readFrom(Socket socket, int receiveTimeout, int readTimeout) throws MllpSocketException, SocketTimeoutException { - if (socket != null && socket.isConnected() && !socket.isClosed()) { - LOG.trace("readFrom({}, {}, {}) - entering", socket, receiveTimeout, readTimeout); - ensureCapacity(minBufferSize); + lock.lock(); + try { + if (socket != null && socket.isConnected() && !socket.isClosed()) { + LOG.trace("readFrom({}, {}, {}) - entering", socket, receiveTimeout, readTimeout); + ensureCapacity(minBufferSize); - try { - InputStream socketInputStream = socket.getInputStream(); + try { + InputStream socketInputStream = socket.getInputStream(); - socket.setSoTimeout(receiveTimeout); + socket.setSoTimeout(receiveTimeout); - readSocketInputStream(socketInputStream, socket); - if (!hasCompleteEnvelope()) { - socket.setSoTimeout(readTimeout); + readSocketInputStream(socketInputStream, socket); + if (!hasCompleteEnvelope()) { + socket.setSoTimeout(readTimeout); - while (!hasCompleteEnvelope()) { - ensureCapacity(Math.max(minBufferSize, socketInputStream.available())); - readSocketInputStream(socketInputStream, socket); + while (!hasCompleteEnvelope()) { + ensureCapacity(Math.max(minBufferSize, socketInputStream.available())); + readSocketInputStream(socketInputStream, socket); + } } - } - } catch (SocketTimeoutException timeoutEx) { - throw timeoutEx; - } catch (IOException ioEx) { - final String exceptionMessage - = String.format("readFrom(%s, %d, %d) - IOException encountered", socket, receiveTimeout, readTimeout); - resetSocket(socket, exceptionMessage); - throw new MllpSocketException(exceptionMessage, ioEx); - } finally { - if (size() > 0 && !hasCompleteEnvelope()) { - if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex < size() - 1) { - LOG.warn("readFrom({}, {}, {}) - exiting with partial payload {}", socket, receiveTimeout, readTimeout, - hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1)); + } catch (SocketTimeoutException timeoutEx) { + throw timeoutEx; + } catch (IOException ioEx) { + final String exceptionMessage + = String.format("readFrom(%s, %d, %d) - IOException encountered", socket, receiveTimeout, readTimeout); + resetSocket(socket, exceptionMessage); + throw new MllpSocketException(exceptionMessage, ioEx); + } finally { + if (size() > 0 && !hasCompleteEnvelope()) { + if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex < size() - 1) { + LOG.warn("readFrom({}, {}, {}) - exiting with partial payload {}", socket, receiveTimeout, readTimeout, + hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1)); + } } } + + } else { + LOG.warn("readFrom({}, {}, {}) - no data read because Socket is invalid", socket, receiveTimeout, readTimeout); } - } else { - LOG.warn("readFrom({}, {}, {}) - no data read because Socket is invalid", socket, receiveTimeout, readTimeout); + LOG.trace("readFrom({}, {}, {}) - exiting", socket, receiveTimeout, readTimeout); + } finally { + lock.unlock(); } - - LOG.trace("readFrom({}, {}, {}) - exiting", socket, receiveTimeout, readTimeout); } - public synchronized void writeTo(Socket socket) throws MllpSocketException { - if (socket != null && socket.isConnected() && !socket.isClosed()) { - LOG.trace("writeTo({}) - entering", socket); - if (!isEmpty()) { - try { - OutputStream socketOutputStream = socket.getOutputStream(); - if (hasStartOfBlock()) { - if (hasEndOfData()) { - socketOutputStream.write(buffer, startOfBlockIndex, endOfBlockIndex - startOfBlockIndex + 2); - } else if (hasEndOfBlock()) { - socketOutputStream.write(buffer, startOfBlockIndex, endOfBlockIndex - startOfBlockIndex + 1); - socketOutputStream.write(MllpProtocolConstants.END_OF_DATA); + public void writeTo(Socket socket) throws MllpSocketException { + lock.lock(); + try { + if (socket != null && socket.isConnected() && !socket.isClosed()) { + LOG.trace("writeTo({}) - entering", socket); + if (!isEmpty()) { + try { + OutputStream socketOutputStream = socket.getOutputStream(); + if (hasStartOfBlock()) { + if (hasEndOfData()) { + socketOutputStream.write(buffer, startOfBlockIndex, endOfBlockIndex - startOfBlockIndex + 2); + } else if (hasEndOfBlock()) { + socketOutputStream.write(buffer, startOfBlockIndex, endOfBlockIndex - startOfBlockIndex + 1); + socketOutputStream.write(MllpProtocolConstants.END_OF_DATA); + } else { + socketOutputStream.write(buffer, startOfBlockIndex, availableByteCount - startOfBlockIndex); + socketOutputStream.write(MllpProtocolConstants.PAYLOAD_TERMINATOR); + } } else { - socketOutputStream.write(buffer, startOfBlockIndex, availableByteCount - startOfBlockIndex); + socketOutputStream.write(MllpProtocolConstants.START_OF_BLOCK); + socketOutputStream.write(buffer, 0, availableByteCount); socketOutputStream.write(MllpProtocolConstants.PAYLOAD_TERMINATOR); } - } else { - socketOutputStream.write(MllpProtocolConstants.START_OF_BLOCK); - socketOutputStream.write(buffer, 0, availableByteCount); - socketOutputStream.write(MllpProtocolConstants.PAYLOAD_TERMINATOR); + socketOutputStream.flush(); + } catch (IOException ioEx) { + final String exceptionMessage = String.format("writeTo(%s) - IOException encountered", socket); + resetSocket(socket, exceptionMessage); + throw new MllpSocketException(exceptionMessage, ioEx); } - socketOutputStream.flush(); - } catch (IOException ioEx) { - final String exceptionMessage = String.format("writeTo(%s) - IOException encountered", socket); - resetSocket(socket, exceptionMessage); - throw new MllpSocketException(exceptionMessage, ioEx); + } else { + LOG.warn("writeTo({}) - no data written because buffer is empty", socket); } } else { - LOG.warn("writeTo({}) - no data written because buffer is empty", socket); + LOG.warn("writeTo({}) - no data written because Socket is invalid", socket); } - } else { - LOG.warn("writeTo({}) - no data written because Socket is invalid", socket); - } - LOG.trace("writeTo({}) - exiting", socket); + LOG.trace("writeTo({}) - exiting", socket); + } finally { + lock.unlock(); + } } - public synchronized byte[] toByteArray() { - if (availableByteCount > 0) { - return Arrays.copyOf(buffer, availableByteCount); - } + public byte[] toByteArray() { + lock.lock(); + try { + if (availableByteCount > 0) { + return Arrays.copyOf(buffer, availableByteCount); + } - return null; + return null; + } finally { + lock.unlock(); + } } - public synchronized byte[] toByteArrayAndReset() { - byte[] answer = toByteArray(); + public byte[] toByteArrayAndReset() { + lock.lock(); + try { + byte[] answer = toByteArray(); - reset(); + reset(); - return answer; + return answer; + } finally { + lock.unlock(); + } } @Override - public synchronized String toString() { - if (charset != null) { - return toString(charset); - } else { - return toString(endpoint.getComponent().getDefaultCharset()); + public String toString() { + lock.lock(); + try { + if (charset != null) { + return toString(charset); + } else { + return toString(endpoint.getComponent().getDefaultCharset()); + } + } finally { + lock.unlock(); } } - public synchronized String toString(Charset charset) { - if (availableByteCount > 0) { - return new String(buffer, 0, availableByteCount, charset); - } + public String toString(Charset charset) { + lock.lock(); + try { + if (availableByteCount > 0) { + return new String(buffer, 0, availableByteCount, charset); + } - return ""; + return ""; + } finally { + lock.unlock(); + } } - public synchronized String toString(String charsetName) { - if (availableByteCount > 0) { - try { - if (Charset.isSupported(charsetName)) { - return toString(Charset.forName(charsetName)); + public String toString(String charsetName) { + lock.lock(); + try { + if (availableByteCount > 0) { + try { + if (Charset.isSupported(charsetName)) { + return toString(Charset.forName(charsetName)); + } + } catch (Exception charsetEx) { + // ignore } - } catch (Exception charsetEx) { - // ignore } - } - return ""; + return ""; + } finally { + lock.unlock(); + } } /** @@ -311,12 +374,17 @@ public class MllpSocketBuffer { * * @return print-friendly String */ - public synchronized String toPrintFriendlyString() { - if (availableByteCount > 0) { - return hl7Util.convertToPrintFriendlyString(buffer, 0, availableByteCount); - } + public String toPrintFriendlyString() { + lock.lock(); + try { + if (availableByteCount > 0) { + return hl7Util.convertToPrintFriendlyString(buffer, 0, availableByteCount); + } - return ""; + return ""; + } finally { + lock.unlock(); + } } public String toPrintFriendlyStringAndReset() { @@ -327,40 +395,50 @@ public class MllpSocketBuffer { return answer; } - public synchronized String toHl7String() { + public String toHl7String() { return this.toHl7String(charset); } - public synchronized String toHl7String(String charsetName) { - if (charsetName != null && !charsetName.isEmpty()) { - try { - if (Charset.isSupported(charsetName)) { - return toHl7String(Charset.forName(charsetName)); + public String toHl7String(String charsetName) { + lock.lock(); + try { + if (charsetName != null && !charsetName.isEmpty()) { + try { + if (Charset.isSupported(charsetName)) { + return toHl7String(Charset.forName(charsetName)); + } + } catch (Exception charsetEx) { + // ignore } - } catch (Exception charsetEx) { - // ignore } - } - if (Charset.isSupported(endpoint.getComponent().getDefaultCharset())) { - return toHl7String(endpoint.getComponent().getDefaultCharset()); - } + if (Charset.isSupported(endpoint.getComponent().getDefaultCharset())) { + return toHl7String(endpoint.getComponent().getDefaultCharset()); + } - return ""; + return ""; + } finally { + lock.unlock(); + } } - public synchronized String toHl7String(Charset charset) { - if (hasCompleteEnvelope()) { - int offset = hasStartOfBlock() ? startOfBlockIndex + 1 : 1; - int length = hasEndOfBlock() ? endOfBlockIndex - offset : availableByteCount - startOfBlockIndex - 1; - if (length > 0) { - return new String(buffer, offset, length, charset); - } else { - return ""; + public String toHl7String(Charset charset) { + lock.lock(); + try { + if (hasCompleteEnvelope()) { + int offset = hasStartOfBlock() ? startOfBlockIndex + 1 : 1; + int length = hasEndOfBlock() ? endOfBlockIndex - offset : availableByteCount - startOfBlockIndex - 1; + if (length > 0) { + return new String(buffer, offset, length, charset); + } else { + return ""; + } } - } - return null; + return null; + } finally { + lock.unlock(); + } } /** @@ -369,138 +447,207 @@ public class MllpSocketBuffer { * * @return print-friendly String */ - public synchronized String toPrintFriendlyHl7String() { - if (hasCompleteEnvelope()) { - int startPosition = hasStartOfBlock() ? startOfBlockIndex + 1 : 1; - int endPosition = hasEndOfBlock() ? endOfBlockIndex : availableByteCount - 1; - return hl7Util.convertToPrintFriendlyString(buffer, startPosition, endPosition); - } + public String toPrintFriendlyHl7String() { + lock.lock(); + try { + if (hasCompleteEnvelope()) { + int startPosition = hasStartOfBlock() ? startOfBlockIndex + 1 : 1; + int endPosition = hasEndOfBlock() ? endOfBlockIndex : availableByteCount - 1; + return hl7Util.convertToPrintFriendlyString(buffer, startPosition, endPosition); + } - return ""; + return ""; + } finally { + lock.unlock(); + } } - public synchronized byte[] toMllpPayload() { - byte[] mllpPayload = null; + public byte[] toMllpPayload() { + lock.lock(); + try { + byte[] mllpPayload = null; - if (hasCompleteEnvelope()) { - int offset = hasStartOfBlock() ? startOfBlockIndex + 1 : 1; - int length = hasEndOfBlock() ? endOfBlockIndex - offset : availableByteCount - startOfBlockIndex - 1; + if (hasCompleteEnvelope()) { + int offset = hasStartOfBlock() ? startOfBlockIndex + 1 : 1; + int length = hasEndOfBlock() ? endOfBlockIndex - offset : availableByteCount - startOfBlockIndex - 1; - if (length > 0) { - mllpPayload = new byte[length]; - System.arraycopy(buffer, offset, mllpPayload, 0, length); - } else { - mllpPayload = new byte[0]; + if (length > 0) { + mllpPayload = new byte[length]; + System.arraycopy(buffer, offset, mllpPayload, 0, length); + } else { + mllpPayload = new byte[0]; + } } - } - return mllpPayload; + return mllpPayload; + } finally { + lock.unlock(); + } } - public synchronized int getStartOfBlockIndex() { - return startOfBlockIndex; + public int getStartOfBlockIndex() { + lock.lock(); + try { + return startOfBlockIndex; + } finally { + lock.unlock(); + } } - public synchronized int getEndOfBlockIndex() { - return endOfBlockIndex; + public int getEndOfBlockIndex() { + lock.lock(); + try { + return endOfBlockIndex; + } finally { + lock.unlock(); + } } - public synchronized boolean hasCompleteEnvelope() { - if (hasStartOfBlock()) { - if (isEndOfDataRequired()) { - return hasEndOfData(); - } else { - return hasEndOfBlock(); + public boolean hasCompleteEnvelope() { + lock.lock(); + try { + if (hasStartOfBlock()) { + if (isEndOfDataRequired()) { + return hasEndOfData(); + } else { + return hasEndOfBlock(); + } } - } - return false; + return false; + } finally { + lock.unlock(); + } } - public synchronized boolean hasStartOfBlock() { - return startOfBlockIndex >= 0; + public boolean hasStartOfBlock() { + lock.lock(); + try { + return startOfBlockIndex >= 0; + } finally { + lock.unlock(); + } } - public synchronized boolean hasEndOfBlock() { - return endOfBlockIndex >= 0; + public boolean hasEndOfBlock() { + lock.lock(); + try { + return endOfBlockIndex >= 0; + } finally { + lock.unlock(); + } } - public synchronized boolean hasEndOfData() { - if (hasEndOfBlock()) { - int potentialEndOfDataIndex = endOfBlockIndex + 1; - if (potentialEndOfDataIndex < availableByteCount - && buffer[potentialEndOfDataIndex] == MllpProtocolConstants.END_OF_DATA) { - return true; + public boolean hasEndOfData() { + lock.lock(); + try { + if (hasEndOfBlock()) { + int potentialEndOfDataIndex = endOfBlockIndex + 1; + if (potentialEndOfDataIndex < availableByteCount + && buffer[potentialEndOfDataIndex] == MllpProtocolConstants.END_OF_DATA) { + return true; + } } - } - return false; + return false; + } finally { + lock.unlock(); + } } - public synchronized boolean hasOutOfBandData() { - return hasLeadingOutOfBandData() || hasTrailingOutOfBandData(); + public boolean hasOutOfBandData() { + lock.lock(); + try { + return hasLeadingOutOfBandData() || hasTrailingOutOfBandData(); + } finally { + lock.unlock(); + } } - public synchronized boolean hasLeadingOutOfBandData() { - if (size() > 0) { - if (!hasStartOfBlock() || startOfBlockIndex > 0) { - return true; + public boolean hasLeadingOutOfBandData() { + lock.lock(); + try { + if (size() > 0) { + return !hasStartOfBlock() || startOfBlockIndex > 0; } - } - return false; + return false; + } finally { + lock.unlock(); + } } - public synchronized boolean hasTrailingOutOfBandData() { - if (size() > 0) { - if (hasEndOfData()) { - if (endOfBlockIndex + 1 < size() - 1) { - return true; - } - } else if (!isEndOfDataRequired()) { - if (hasEndOfBlock() && endOfBlockIndex < size() - 1) { - return true; + public boolean hasTrailingOutOfBandData() { + lock.lock(); + try { + if (size() > 0) { + if (hasEndOfData()) { + return endOfBlockIndex + 1 < size() - 1; + } else if (!isEndOfDataRequired()) { + return hasEndOfBlock() && endOfBlockIndex < size() - 1; } } - } - return false; + return false; + } finally { + lock.unlock(); + } } - public synchronized byte[] getLeadingOutOfBandData() { - byte[] outOfBandData = null; + public byte[] getLeadingOutOfBandData() { + lock.lock(); + try { + byte[] outOfBandData = null; - if (hasLeadingOutOfBandData()) { - outOfBandData = new byte[startOfBlockIndex == -1 ? availableByteCount : startOfBlockIndex]; - System.arraycopy(buffer, 0, outOfBandData, 0, outOfBandData.length); - } + if (hasLeadingOutOfBandData()) { + outOfBandData = new byte[startOfBlockIndex == -1 ? availableByteCount : startOfBlockIndex]; + System.arraycopy(buffer, 0, outOfBandData, 0, outOfBandData.length); + } - return outOfBandData; + return outOfBandData; + } finally { + lock.unlock(); + } } - public synchronized byte[] getTrailingOutOfBandData() { - byte[] outOfBandData = null; + public byte[] getTrailingOutOfBandData() { + lock.lock(); + try { + byte[] outOfBandData = null; - if (hasTrailingOutOfBandData()) { - int offset = hasEndOfData() ? endOfBlockIndex + 2 : endOfBlockIndex + 1; - int length = size() - offset; - outOfBandData = new byte[length]; - System.arraycopy(buffer, offset, outOfBandData, 0, length); - } + if (hasTrailingOutOfBandData()) { + int offset = hasEndOfData() ? endOfBlockIndex + 2 : endOfBlockIndex + 1; + int length = size() - offset; + outOfBandData = new byte[length]; + System.arraycopy(buffer, offset, outOfBandData, 0, length); + } - return outOfBandData; + return outOfBandData; + } finally { + lock.unlock(); + } } - public synchronized int size() { - return availableByteCount; + public int size() { + lock.lock(); + try { + return availableByteCount; + } finally { + lock.unlock(); + } } - public synchronized int capacity() { - if (buffer != null) { - return buffer.length - availableByteCount; - } + public int capacity() { + lock.lock(); + try { + if (buffer != null) { + return buffer.length - availableByteCount; + } - return -1; + return -1; + } finally { + lock.unlock(); + } } void ensureCapacity(int requiredAvailableCapacity) { diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java index 813dbbe6a85..8f26afa052a 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -41,6 +43,7 @@ public class AsyncInputStream implements ReadStream<Buffer> { private static final Logger LOG = LoggerFactory.getLogger(AsyncInputStream.class); private static final int DEFAULT_BUFFER_SIZE = 4096; + private final Lock lock = new ReentrantLock(); private final ReadableByteChannel channel; private final Vertx vertx; private final Context context; @@ -68,43 +71,68 @@ public class AsyncInputStream implements ReadStream<Buffer> { } @Override - public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) { - checkStreamClosed(); - this.endHandler = endHandler; - return this; + public AsyncInputStream endHandler(Handler<Void> endHandler) { + lock.lock(); + try { + checkStreamClosed(); + this.endHandler = endHandler; + return this; + } finally { + lock.unlock(); + } } @Override - public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) { - checkStreamClosed(); - this.exceptionHandler = exceptionHandler; - return this; + public AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) { + lock.lock(); + try { + checkStreamClosed(); + this.exceptionHandler = exceptionHandler; + return this; + } finally { + lock.unlock(); + } } @Override - public synchronized AsyncInputStream handler(Handler<Buffer> handler) { - checkStreamClosed(); - this.dataHandler = handler; - if (this.dataHandler != null && !this.closed) { - this.doRead(); - } else { - queue.clear(); + public AsyncInputStream handler(Handler<Buffer> handler) { + lock.lock(); + try { + checkStreamClosed(); + this.dataHandler = handler; + if (this.dataHandler != null && !this.closed) { + this.doRead(); + } else { + queue.clear(); + } + return this; + } finally { + lock.unlock(); } - return this; } @Override - public synchronized AsyncInputStream pause() { - checkStreamClosed(); - queue.pause(); - return this; + public AsyncInputStream pause() { + lock.lock(); + try { + checkStreamClosed(); + queue.pause(); + return this; + } finally { + lock.unlock(); + } } @Override - public synchronized AsyncInputStream resume() { - checkStreamClosed(); - queue.resume(); - return this; + public AsyncInputStream resume() { + lock.lock(); + try { + checkStreamClosed(); + queue.resume(); + return this; + } finally { + lock.unlock(); + } } @Override @@ -133,9 +161,14 @@ public class AsyncInputStream implements ReadStream<Buffer> { } } - private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) { - closed = true; - doClose(handler); + private void closeInternal(Handler<AsyncResult<Void>> handler) { + lock.lock(); + try { + closed = true; + doClose(handler); + } finally { + lock.unlock(); + } } private void doClose(Handler<AsyncResult<Void>> handler) { @@ -156,22 +189,27 @@ public class AsyncInputStream implements ReadStream<Buffer> { doRead(ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)); } - private synchronized void doRead(ByteBuffer buffer) { - if (!readInProgress) { - readInProgress = true; - Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE); - doRead(buff, 0, buffer, readPos, result -> { - if (result.succeeded()) { - readInProgress = false; - Buffer updatedBuffer = result.result(); - readPos += updatedBuffer.length(); - if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) { - doRead(buffer); + private void doRead(ByteBuffer buffer) { + lock.lock(); + try { + if (!readInProgress) { + readInProgress = true; + Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE); + doRead(buff, 0, buffer, readPos, result -> { + if (result.succeeded()) { + readInProgress = false; + Buffer updatedBuffer = result.result(); + readPos += updatedBuffer.length(); + if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) { + doRead(buffer); + } + } else { + handleException(result.cause()); } - } else { - handleException(result.cause()); - } - }); + }); + } + } finally { + lock.unlock(); } } @@ -210,8 +248,11 @@ public class AsyncInputStream implements ReadStream<Buffer> { private void handleData(Buffer buffer) { Handler<Buffer> handler; - synchronized (this) { + lock.lock(); + try { handler = this.dataHandler; + } finally { + lock.unlock(); } if (handler != null) { checkContext(); @@ -219,11 +260,14 @@ public class AsyncInputStream implements ReadStream<Buffer> { } } - private synchronized void handleEnd() { + private void handleEnd() { Handler<Void> endHandler; - synchronized (this) { + lock.lock(); + try { dataHandler = null; endHandler = this.endHandler; + } finally { + lock.unlock(); } if (endHandler != null) { checkContext(); diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java index 98553e9a23a..452b59caf76 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java @@ -17,6 +17,9 @@ package org.apache.camel.component.sjms.reply; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.jms.Destination; import jakarta.jms.ExceptionListener; @@ -110,8 +113,10 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { } private final class TemporaryReplyQueueDestinationResolver implements DestinationCreationStrategy { + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); private TemporaryQueue queue; - private final AtomicBoolean refreshWanted = new AtomicBoolean(); + private volatile boolean refreshWanted; @Override public Destination createDestination(Session session, String name, boolean topic) throws JMSException { @@ -120,32 +125,38 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { @Override public Destination createTemporaryDestination(Session session, boolean topic) throws JMSException { - synchronized (refreshWanted) { - if (queue == null || refreshWanted.get()) { - refreshWanted.set(false); + lock.lock(); + try { + if (queue == null || refreshWanted) { + refreshWanted = false; queue = session.createTemporaryQueue(); setReplyTo(queue); if (log.isDebugEnabled()) { log.debug("Refreshed Temporary ReplyTo Queue. New queue: {}", queue.getQueueName()); } - refreshWanted.notifyAll(); + condition.signalAll(); } + } finally { + lock.unlock(); } return queue; } public void scheduleRefresh() { - refreshWanted.set(true); + refreshWanted = true; } public void destinationReady() throws InterruptedException { - if (refreshWanted.get()) { - synchronized (refreshWanted) { + if (refreshWanted) { + lock.lock(); + try { //check if requestWanted is still true - if (refreshWanted.get()) { + if (refreshWanted) { log.debug("Waiting for new Temporary ReplyTo queue to be assigned before we can continue"); - refreshWanted.wait(); + condition.await(); } + } finally { + lock.unlock(); } } } diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppSplitter.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppSplitter.java index 36fbbf488e4..57068ed9900 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppSplitter.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppSplitter.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.smpp; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,15 +75,16 @@ public class SmppSplitter { protected static final int MAX_SEG_COUNT = 255; private static final Logger LOG = LoggerFactory.getLogger(SmppSplitter.class); + private static final Lock LOCK = new ReentrantLock(); /** * Current reference number. */ private static int refNum; - private int messageLength; - private int segmentLength; - private int currentLength; + private final int messageLength; + private final int segmentLength; + private final int currentLength; protected SmppSplitter(int messageLength, int segmentLength, int currentLength) { this.messageLength = messageLength; @@ -93,23 +97,38 @@ public class SmppSplitter { * * @return the reference number of the multipart message */ - protected static synchronized byte getReferenceNumber() { - refNum++; - if (refNum == 256) { - refNum = 1; + protected static byte getReferenceNumber() { + LOCK.lock(); + try { + refNum++; + if (refNum == 256) { + refNum = 1; + } + return (byte) refNum; + } finally { + LOCK.unlock(); } - return (byte) refNum; } - protected static synchronized byte getCurrentReferenceNumber() { - return (byte) refNum; + protected static byte getCurrentReferenceNumber() { + LOCK.lock(); + try { + return (byte) refNum; + } finally { + LOCK.unlock(); + } } /** * only needed for the unit tests */ - protected static synchronized void resetCurrentReferenceNumber() { - SmppSplitter.refNum = 0; + protected static void resetCurrentReferenceNumber() { + LOCK.lock(); + try { + SmppSplitter.refNum = 0; + } finally { + LOCK.unlock(); + } } public byte[][] split(byte[] message) { diff --git a/components/camel-thymeleaf/src/main/java/org/apache/camel/component/thymeleaf/ThymeleafEndpoint.java b/components/camel-thymeleaf/src/main/java/org/apache/camel/component/thymeleaf/ThymeleafEndpoint.java index 31bb060b0e9..7e5a2c7074e 100644 --- a/components/camel-thymeleaf/src/main/java/org/apache/camel/component/thymeleaf/ThymeleafEndpoint.java +++ b/components/camel-thymeleaf/src/main/java/org/apache/camel/component/thymeleaf/ThymeleafEndpoint.java @@ -241,39 +241,44 @@ public class ThymeleafEndpoint extends ResourceEndpoint { this.cacheable = cacheable; } - protected synchronized TemplateEngine getTemplateEngine() { - if (templateEngine == null) { - ITemplateResolver templateResolver; - - switch (resolver) { - case CLASS_LOADER -> { - templateResolver = classLoaderTemplateResolver(); - } - case DEFAULT -> { - templateResolver = defaultTemplateResolver(); - } - case FILE -> { - templateResolver = fileTemplateResolver(); - } - case STRING -> { - templateResolver = stringTemplateResolver(); - } - case URL -> { - templateResolver = urlTemplateResolver(); - } - case WEB_APP -> { - templateResolver = webApplicationTemplateResolver(); - } - default -> { - throw new RuntimeCamelException("cannot determine TemplateResolver for type " + resolver); + protected TemplateEngine getTemplateEngine() { + getInternalLock().lock(); + try { + if (templateEngine == null) { + ITemplateResolver templateResolver; + + switch (resolver) { + case CLASS_LOADER -> { + templateResolver = classLoaderTemplateResolver(); + } + case DEFAULT -> { + templateResolver = defaultTemplateResolver(); + } + case FILE -> { + templateResolver = fileTemplateResolver(); + } + case STRING -> { + templateResolver = stringTemplateResolver(); + } + case URL -> { + templateResolver = urlTemplateResolver(); + } + case WEB_APP -> { + templateResolver = webApplicationTemplateResolver(); + } + default -> { + throw new RuntimeCamelException("cannot determine TemplateResolver for type " + resolver); + } } + + templateEngine = new TemplateEngine(); + templateEngine.setTemplateResolver(templateResolver); } - templateEngine = new TemplateEngine(); - templateEngine.setTemplateResolver(templateResolver); + return templateEngine; + } finally { + getInternalLock().unlock(); } - - return templateEngine; } /** diff --git a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerComponent.java b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerComponent.java index 254d1177e0e..24ccf109f65 100644 --- a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerComponent.java +++ b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerComponent.java @@ -19,9 +19,9 @@ package org.apache.camel.component.timer; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; -import java.util.HashMap; import java.util.Map; import java.util.Timer; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; @@ -38,8 +38,7 @@ import org.apache.camel.support.DefaultComponent; */ @org.apache.camel.spi.annotations.Component("timer") public class TimerComponent extends DefaultComponent { - private final Map<String, Timer> timers = new HashMap<>(); - private final Map<String, AtomicInteger> refCounts = new HashMap<>(); + private final Map<String, TimerHolder> timers = new ConcurrentHashMap<>(); @Metadata private boolean includeMetadata; @@ -66,26 +65,16 @@ public class TimerComponent extends DefaultComponent { key = "nonDaemon:" + key; } - Timer answer; - synchronized (timers) { - answer = timers.get(key); - if (answer == null) { + return timers.compute(key, (k, v) -> { + if (v == null) { // the timer name is also the thread name, so lets resolve a name to be used String name = consumer.getEndpoint().getCamelContext().getExecutorServiceManager() .resolveThreadName("timer://" + consumer.getEndpoint().getTimerName()); - answer = new Timer(name, consumer.getEndpoint().isDaemon()); - timers.put(key, answer); - // store new reference counter - refCounts.put(key, new AtomicInteger(1)); - } else { - // increase reference counter - AtomicInteger counter = refCounts.get(key); - if (counter != null) { - counter.incrementAndGet(); - } + return new TimerHolder(new Timer(name, consumer.getEndpoint().isDaemon())); } - } - return answer; + v.refCount.incrementAndGet(); + return v; + }).timer; } public void removeTimer(TimerConsumer consumer) { @@ -93,19 +82,13 @@ public class TimerComponent extends DefaultComponent { if (!consumer.getEndpoint().isDaemon()) { key = "nonDaemon:" + key; } - - synchronized (timers) { - // decrease reference counter - AtomicInteger counter = refCounts.get(key); - if (counter != null && counter.decrementAndGet() <= 0) { - refCounts.remove(key); - // remove timer as its no longer in use - Timer timer = timers.remove(key); - if (timer != null) { - timer.cancel(); - } + timers.computeIfPresent(key, (k, v) -> { + if (v.refCount.decrementAndGet() == 0) { + v.timer.cancel(); + return null; } - } + return v; + }); } @Override @@ -136,11 +119,20 @@ public class TimerComponent extends DefaultComponent { @Override protected void doStop() throws Exception { - Collection<Timer> collection = timers.values(); - for (Timer timer : collection) { - timer.cancel(); + Collection<TimerHolder> collection = timers.values(); + for (TimerHolder holder : collection) { + holder.timer.cancel(); } timers.clear(); - refCounts.clear(); + } + + private static class TimerHolder { + private final Timer timer; + private final AtomicInteger refCount; + + private TimerHolder(Timer timer) { + this.timer = timer; + this.refCount = new AtomicInteger(1); + } } } diff --git a/components/camel-twilio/src/main/java/org/apache/camel/component/twilio/internal/TwilioPropertiesHelper.java b/components/camel-twilio/src/main/java/org/apache/camel/component/twilio/internal/TwilioPropertiesHelper.java index 8dff6d6ffab..6d44d274c98 100644 --- a/components/camel-twilio/src/main/java/org/apache/camel/component/twilio/internal/TwilioPropertiesHelper.java +++ b/components/camel-twilio/src/main/java/org/apache/camel/component/twilio/internal/TwilioPropertiesHelper.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.twilio.internal; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.camel.CamelContext; import org.apache.camel.component.twilio.TwilioConfiguration; import org.apache.camel.support.component.ApiMethodPropertiesHelper; @@ -25,16 +28,22 @@ import org.apache.camel.support.component.ApiMethodPropertiesHelper; */ public final class TwilioPropertiesHelper extends ApiMethodPropertiesHelper<TwilioConfiguration> { + private static final Lock LOCK = new ReentrantLock(); private static TwilioPropertiesHelper helper; private TwilioPropertiesHelper(CamelContext context) { super(context, TwilioConfiguration.class, TwilioConstants.PROPERTY_PREFIX); } - public static synchronized TwilioPropertiesHelper getHelper(CamelContext context) { - if (helper == null) { - helper = new TwilioPropertiesHelper(context); + public static TwilioPropertiesHelper getHelper(CamelContext context) { + LOCK.lock(); + try { + if (helper == null) { + helper = new TwilioPropertiesHelper(context); + } + return helper; + } finally { + LOCK.unlock(); } - return helper; } } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java index 5ecf217fd58..f5c900e2ab6 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java @@ -17,6 +17,8 @@ package org.apache.camel.component.undertow; import java.net.URI; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.servlet.ServletException; @@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory; public class DefaultUndertowHost implements UndertowHost { private static final Logger LOG = LoggerFactory.getLogger(DefaultUndertowHost.class); + private final Lock lock = new ReentrantLock(); private final UndertowHostKey key; private final UndertowHostOptions options; private final CamelRootHandler rootHandler; @@ -65,68 +68,73 @@ public class DefaultUndertowHost implements UndertowHost { } @Override - public synchronized HttpHandler registerHandler( + public HttpHandler registerHandler( UndertowConsumer consumer, HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler) { - if (undertow == null) { - Undertow.Builder builder = Undertow.builder(); - if (key.getSslContext() != null) { - builder.addHttpsListener(key.getPort(), key.getHost(), key.getSslContext()); - } else { - builder.addHttpListener(key.getPort(), key.getHost()); - } - - if (options != null) { - if (options.getIoThreads() != null) { - builder.setIoThreads(options.getIoThreads()); - } - if (options.getWorkerThreads() != null) { - builder.setWorkerThreads(options.getWorkerThreads()); + lock.lock(); + try { + if (undertow == null) { + Undertow.Builder builder = Undertow.builder(); + if (key.getSslContext() != null) { + builder.addHttpsListener(key.getPort(), key.getHost(), key.getSslContext()); + } else { + builder.addHttpListener(key.getPort(), key.getHost()); } - if (options.getBufferSize() != null) { - builder.setBufferSize(options.getBufferSize()); + + if (options != null) { + if (options.getIoThreads() != null) { + builder.setIoThreads(options.getIoThreads()); + } + if (options.getWorkerThreads() != null) { + builder.setWorkerThreads(options.getWorkerThreads()); + } + if (options.getBufferSize() != null) { + builder.setBufferSize(options.getBufferSize()); + } + if (options.getDirectBuffers() != null) { + builder.setDirectBuffers(options.getDirectBuffers()); + } + if (options.getHttp2Enabled() != null) { + builder.setServerOption(UndertowOptions.ENABLE_HTTP2, options.getHttp2Enabled()); + } } - if (options.getDirectBuffers() != null) { - builder.setDirectBuffers(options.getDirectBuffers()); + + if (consumer != null && consumer.isRest()) { + // use the rest handler as its a rest consumer + undertow = registerHandler(consumer, builder, restHandler); + } else { + undertow = registerHandler(consumer, builder, rootHandler); } - if (options.getHttp2Enabled() != null) { - builder.setServerOption(UndertowOptions.ENABLE_HTTP2, options.getHttp2Enabled()); + LOG.info("Starting Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), + key.getPort()); + + try { + // If there is an exception while starting up, Undertow wraps it + // as RuntimeException which leaves the consumer in an inconsistent + // state as a subsequent start if the route (i.e. manually) won't + // start the Undertow instance as undertow is not null. + undertow.start(); + } catch (RuntimeException e) { + LOG.warn("Failed to start Undertow server on {}://{}:{}, reason: {}", + key.getSslContext() != null ? "https" : "http", key.getHost(), key.getPort(), e.getMessage()); + + // Cleanup any resource that may have been created during start + // and reset the instance so a subsequent start will trigger the + // initialization again. + undertow.stop(); + undertow = null; + + throw e; } } - if (consumer != null && consumer.isRest()) { - // use the rest handler as its a rest consumer - undertow = registerHandler(consumer, builder, restHandler); + restHandler.addConsumer(consumer); + return restHandler; } else { - undertow = registerHandler(consumer, builder, rootHandler); - } - LOG.info("Starting Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), - key.getPort()); - - try { - // If there is an exception while starting up, Undertow wraps it - // as RuntimeException which leaves the consumer in an inconsistent - // state as a subsequent start if the route (i.e. manually) won't - // start the Undertow instance as undertow is not null. - undertow.start(); - } catch (RuntimeException e) { - LOG.warn("Failed to start Undertow server on {}://{}:{}, reason: {}", - key.getSslContext() != null ? "https" : "http", key.getHost(), key.getPort(), e.getMessage()); - - // Cleanup any resource that may have been created during start - // and reset the instance so a subsequent start will trigger the - // initialization again. - undertow.stop(); - undertow = null; - - throw e; + return rootHandler.add(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), + registrationInfo.isMatchOnUriPrefix(), handler); } - } - if (consumer != null && consumer.isRest()) { - restHandler.addConsumer(consumer); - return restHandler; - } else { - return rootHandler.add(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), - registrationInfo.isMatchOnUriPrefix(), handler); + } finally { + lock.unlock(); } } @@ -163,29 +171,34 @@ public class DefaultUndertowHost implements UndertowHost { } @Override - public synchronized void unregisterHandler(UndertowConsumer consumer, HttpHandlerRegistrationInfo registrationInfo) { - if (undertow == null) { - return; - } + public void unregisterHandler(UndertowConsumer consumer, HttpHandlerRegistrationInfo registrationInfo) { + lock.lock(); + try { + if (undertow == null) { + return; + } - boolean stop; - if (consumer != null && consumer.isRest()) { - restHandler.removeConsumer(consumer); - stop = restHandler.consumers() <= 0; - } else { - rootHandler.remove(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), - registrationInfo.isMatchOnUriPrefix()); - stop = rootHandler.isEmpty(); - } - if (deploymentManager != null) { - deploymentManager.undeploy(); - } + boolean stop; + if (consumer != null && consumer.isRest()) { + restHandler.removeConsumer(consumer); + stop = restHandler.consumers() <= 0; + } else { + rootHandler.remove(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), + registrationInfo.isMatchOnUriPrefix()); + stop = rootHandler.isEmpty(); + } + if (deploymentManager != null) { + deploymentManager.undeploy(); + } - if (stop) { - LOG.info("Stopping Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), - key.getPort()); - undertow.stop(); - undertow = null; + if (stop) { + LOG.info("Stopping Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), + key.getPort()); + undertow.stop(); + undertow = null; + } + } finally { + lock.unlock(); } } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java index 9f995a33142..90ac4bac0fa 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java @@ -18,6 +18,8 @@ package org.apache.camel.component.undertow.handlers; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; @@ -38,6 +40,7 @@ public class CamelMethodHandler implements HttpHandler { DEFAULT_METHODS = new String[] { DEFAULT_HANDLER_KEY }; } + private final Lock methodMapLock = new ReentrantLock(); private final Map<String, MethodEntry> methodMap = new ConcurrentHashMap<>(); private String handlerString; @@ -66,11 +69,14 @@ public class CamelMethodHandler implements HttpHandler { public HttpHandler add(String methods, HttpHandler handler) { HttpHandler result = null; - synchronized (methodMap) { // we lock on methodMap to get a reliable sum of refCounts in remove(String) + methodMapLock.lock(); // we lock to get a reliable sum of refCounts in remove(String) + try { for (String method : splitMethods(methods)) { MethodEntry en = methodMap.computeIfAbsent(method, m -> new MethodEntry()); result = en.addRef(handler, method); } + } finally { + methodMapLock.unlock(); } handlerString = null; return result; @@ -78,7 +84,8 @@ public class CamelMethodHandler implements HttpHandler { public boolean remove(String methods) { boolean result; - synchronized (methodMap) { // we lock on methodMap to get a reliable sum of refCounts + methodMapLock.lock(); // we lock to get a reliable sum of refCounts + try { for (String method : splitMethods(methods)) { final MethodEntry en = methodMap.get(method); if (en != null) { @@ -86,6 +93,8 @@ public class CamelMethodHandler implements HttpHandler { } } result = methodMap.values().stream().mapToInt(en -> en.refCount).sum() == 0; + } finally { + methodMapLock.unlock(); } handlerString = null; return result; diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathHandler.java index 52c9d4c71fe..ee96bac375d 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathHandler.java @@ -18,6 +18,8 @@ package org.apache.camel.component.undertow.handlers; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import io.undertow.server.HttpHandler; import io.undertow.server.handlers.PathHandler; @@ -26,6 +28,7 @@ import io.undertow.server.handlers.PathHandler; * Extended PathHandler to monitor add/remove handlers. */ public class CamelPathHandler extends PathHandler { + private final Lock lock = new ReentrantLock(); private final Map<String, HttpHandler> handlers = new HashMap<>(); private String handlerString; @@ -34,35 +37,55 @@ public class CamelPathHandler extends PathHandler { } @Override - public synchronized PathHandler addPrefixPath(final String path, final HttpHandler handler) { - super.addPrefixPath(path, handler); - handlers.put(path, handler); - handlerString = null; - return this; + public PathHandler addPrefixPath(final String path, final HttpHandler handler) { + lock.lock(); + try { + super.addPrefixPath(path, handler); + handlers.put(path, handler); + handlerString = null; + return this; + } finally { + lock.unlock(); + } } @Override - public synchronized PathHandler addExactPath(final String path, final HttpHandler handler) { - super.addExactPath(path, handler); - handlers.put(path, handler); - handlerString = null; - return this; + public PathHandler addExactPath(final String path, final HttpHandler handler) { + lock.lock(); + try { + super.addExactPath(path, handler); + handlers.put(path, handler); + handlerString = null; + return this; + } finally { + lock.unlock(); + } } @Override - public synchronized PathHandler removePrefixPath(final String path) { - super.removePrefixPath(path); - handlers.remove(path); - handlerString = null; - return this; + public PathHandler removePrefixPath(final String path) { + lock.lock(); + try { + super.removePrefixPath(path); + handlers.remove(path); + handlerString = null; + return this; + } finally { + lock.unlock(); + } } @Override - public synchronized PathHandler removeExactPath(final String path) { - super.removeExactPath(path); - handlers.remove(path); - handlerString = null; - return this; + public PathHandler removeExactPath(final String path) { + lock.lock(); + try { + super.removeExactPath(path); + handlers.remove(path); + handlerString = null; + return this; + } finally { + lock.unlock(); + } } public HttpHandler getHandler(String path) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathTemplateHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathTemplateHandler.java index cf0b8d40e49..83f79e22ae8 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathTemplateHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelPathTemplateHandler.java @@ -18,6 +18,8 @@ package org.apache.camel.component.undertow.handlers; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; @@ -27,6 +29,7 @@ import io.undertow.server.handlers.PathTemplateHandler; * Extended PathTemplateHandler to monitor add/remove handlers. Also this enables hot swapping a default handler. */ public class CamelPathTemplateHandler implements HttpHandler { + private final Lock lock = new ReentrantLock(); private final Map<String, CamelMethodHandler> handlers = new HashMap<>(); private final Wrapper defaultHandlerWrapper = new Wrapper(); private final PathTemplateHandler delegate; @@ -42,18 +45,28 @@ public class CamelPathTemplateHandler implements HttpHandler { delegate.handleRequest(exchange); } - public synchronized CamelPathTemplateHandler add(final String uriTemplate, final CamelMethodHandler handler) { - delegate.add(uriTemplate, handler); - handlers.put(uriTemplate, handler); - handlerString = null; - return this; + public CamelPathTemplateHandler add(final String uriTemplate, final CamelMethodHandler handler) { + lock.lock(); + try { + delegate.add(uriTemplate, handler); + handlers.put(uriTemplate, handler); + handlerString = null; + return this; + } finally { + lock.unlock(); + } } - public synchronized CamelPathTemplateHandler remove(final String uriTemplate) { - delegate.remove(uriTemplate); - handlers.remove(uriTemplate); - handlerString = null; - return this; + public CamelPathTemplateHandler remove(final String uriTemplate) { + lock.lock(); + try { + delegate.remove(uriTemplate); + handlers.remove(uriTemplate); + handlerString = null; + return this; + } finally { + lock.unlock(); + } } public CamelMethodHandler get(final String uriTemplate) { @@ -64,13 +77,23 @@ public class CamelPathTemplateHandler implements HttpHandler { return handlers.isEmpty(); } - public synchronized CamelMethodHandler getDefault() { - return this.defaultHandlerWrapper.get(); + public CamelMethodHandler getDefault() { + lock.lock(); + try { + return this.defaultHandlerWrapper.get(); + } finally { + lock.unlock(); + } } - public synchronized void setDefault(final CamelMethodHandler defaultHandler) { - this.defaultHandlerWrapper.set(defaultHandler); - handlerString = null; + public void setDefault(final CamelMethodHandler defaultHandler) { + lock.lock(); + try { + this.defaultHandlerWrapper.set(defaultHandler); + handlerString = null; + } finally { + lock.unlock(); + } } @Override diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java index 96b0bd54cc6..80e43d33662 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.undertow.handlers; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; import io.undertow.util.PathTemplate; @@ -27,6 +30,7 @@ import io.undertow.util.URLUtils; * @see RestRootHandler */ public class CamelRootHandler implements HttpHandler { + private final Lock lock = new ReentrantLock(); private final CamelPathHandler pathHandler; public CamelRootHandler(HttpHandler defaultHandler) { @@ -38,50 +42,55 @@ public class CamelRootHandler implements HttpHandler { pathHandler.handleRequest(exchange); } - public synchronized HttpHandler add(String path, String methods, boolean prefixMatch, HttpHandler handler) { - String basePath = getBasePath(path); - HttpHandler basePathHandler = pathHandler.getHandler(basePath); - - CamelMethodHandler targetHandler; - if (path.contains("{")) { - // Adding a handler for the template path - String relativePath = path.substring(basePath.length()); - if (basePathHandler instanceof CamelPathTemplateHandler) { - CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; - targetHandler = templateHandler.get(relativePath); - if (targetHandler == null) { - targetHandler = new CamelMethodHandler(); - templateHandler.add(relativePath, targetHandler); - } - } else { - targetHandler = add(basePathHandler, relativePath, basePath); - } + public HttpHandler add(String path, String methods, boolean prefixMatch, HttpHandler handler) { + lock.lock(); + try { + String basePath = getBasePath(path); + HttpHandler basePathHandler = pathHandler.getHandler(basePath); - } else { - // Adding a handler for the static path - if (basePathHandler instanceof CamelPathTemplateHandler) { - CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; - if (!prefixMatch) { - targetHandler = templateHandler.getDefault(); + CamelMethodHandler targetHandler; + if (path.contains("{")) { + // Adding a handler for the template path + String relativePath = path.substring(basePath.length()); + if (basePathHandler instanceof CamelPathTemplateHandler) { + CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; + targetHandler = templateHandler.get(relativePath); + if (targetHandler == null) { + targetHandler = new CamelMethodHandler(); + templateHandler.add(relativePath, targetHandler); + } } else { - throw new IllegalArgumentException(String.format("Duplicate handlers on a path '%s'", path)); + targetHandler = add(basePathHandler, relativePath, basePath); } + } else { - if (basePathHandler instanceof CamelMethodHandler) { - targetHandler = (CamelMethodHandler) basePathHandler; - } else if (basePathHandler == null) { - targetHandler = new CamelMethodHandler(); - if (prefixMatch) { - pathHandler.addPrefixPath(basePath, targetHandler); + // Adding a handler for the static path + if (basePathHandler instanceof CamelPathTemplateHandler) { + CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; + if (!prefixMatch) { + targetHandler = templateHandler.getDefault(); } else { - pathHandler.addExactPath(basePath, targetHandler); + throw new IllegalArgumentException(String.format("Duplicate handlers on a path '%s'", path)); } } else { - throw new IllegalArgumentException(String.format("Unsupported handler '%s' was found", basePathHandler)); + if (basePathHandler instanceof CamelMethodHandler) { + targetHandler = (CamelMethodHandler) basePathHandler; + } else if (basePathHandler == null) { + targetHandler = new CamelMethodHandler(); + if (prefixMatch) { + pathHandler.addPrefixPath(basePath, targetHandler); + } else { + pathHandler.addExactPath(basePath, targetHandler); + } + } else { + throw new IllegalArgumentException(String.format("Unsupported handler '%s' was found", basePathHandler)); + } } } + return targetHandler.add(methods, handler); + } finally { + lock.unlock(); } - return targetHandler.add(methods, handler); } private CamelMethodHandler add(HttpHandler basePathHandler, String relativePath, String basePath) { @@ -101,52 +110,62 @@ public class CamelRootHandler implements HttpHandler { return targetHandler; } - public synchronized void remove(String path, String methods, boolean prefixMatch) { - String basePath = getBasePath(path); - HttpHandler basePathHandler = pathHandler.getHandler(basePath); - if (basePathHandler == null) { - return; - } - - if (path.contains("{")) { - // Removing a handler for the template path - String relativePath = path.substring(basePath.length()); - CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; - CamelMethodHandler targetHandler = templateHandler.get(relativePath); - if (targetHandler.remove(methods)) { - templateHandler.remove(relativePath); - if (templateHandler.isEmpty()) { - pathHandler.removePrefixPath(basePath); - } + public void remove(String path, String methods, boolean prefixMatch) { + lock.lock(); + try { + String basePath = getBasePath(path); + HttpHandler basePathHandler = pathHandler.getHandler(basePath); + if (basePathHandler == null) { + return; } - } else { - // Removing a handler for the static path - if (basePathHandler instanceof CamelPathTemplateHandler) { + if (path.contains("{")) { + // Removing a handler for the template path String relativePath = path.substring(basePath.length()); CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; - CamelMethodHandler targetHandler = templateHandler.getDefault(); + CamelMethodHandler targetHandler = templateHandler.get(relativePath); if (targetHandler.remove(methods)) { templateHandler.remove(relativePath); if (templateHandler.isEmpty()) { pathHandler.removePrefixPath(basePath); } } + } else { - CamelMethodHandler targetHandler = (CamelMethodHandler) basePathHandler; - if (targetHandler.remove(methods)) { - if (prefixMatch) { - pathHandler.removePrefixPath(basePath); - } else { - pathHandler.removeExactPath(basePath); + // Removing a handler for the static path + if (basePathHandler instanceof CamelPathTemplateHandler) { + String relativePath = path.substring(basePath.length()); + CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; + CamelMethodHandler targetHandler = templateHandler.getDefault(); + if (targetHandler.remove(methods)) { + templateHandler.remove(relativePath); + if (templateHandler.isEmpty()) { + pathHandler.removePrefixPath(basePath); + } + } + } else { + CamelMethodHandler targetHandler = (CamelMethodHandler) basePathHandler; + if (targetHandler.remove(methods)) { + if (prefixMatch) { + pathHandler.removePrefixPath(basePath); + } else { + pathHandler.removeExactPath(basePath); + } } } } + } finally { + lock.unlock(); } } - public synchronized boolean isEmpty() { - return pathHandler.isEmpty(); + public boolean isEmpty() { + lock.lock(); + try { + return pathHandler.isEmpty(); + } finally { + lock.unlock(); + } } @Override diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java index abeab7b5407..00feeac6a64 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -69,7 +71,7 @@ public class CamelWebSocketHandler implements HttpHandler { private UndertowConsumer consumer; - private final Object consumerLock = new Object(); + private final Lock consumerLock = new ReentrantLock(); private final WebSocketProtocolHandshakeHandler delegate; @@ -167,28 +169,32 @@ public class CamelWebSocketHandler implements HttpHandler { * @param consumer the {@link UndertowConsumer} to set */ public void setConsumer(UndertowConsumer consumer) { - synchronized (consumerLock) { + consumerLock.lock(); + try { if (consumer != null && this.consumer != null) { throw new IllegalStateException( "Cannot call " + getClass().getName() + ".setConsumer(UndertowConsumer) with a non-null consumer before unsetting it via setConsumer(null)"); } this.consumer = consumer; + } finally { + consumerLock.unlock(); } } void sendEventNotificationIfNeeded( String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) { - synchronized (consumerLock) { - synchronized (consumerLock) { - if (consumer != null) { - if (consumer.getEndpoint().isFireWebSocketChannelEvents()) { - consumer.sendEventNotification(connectionKey, transportExchange, channel, eventType); - } - } else { - LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType); + consumerLock.lock(); + try { + if (consumer != null) { + if (consumer.getEndpoint().isFireWebSocketChannelEvents()) { + consumer.sendEventNotification(connectionKey, transportExchange, channel, eventType); } + } else { + LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType); } + } finally { + consumerLock.unlock(); } } @@ -200,7 +206,7 @@ public class CamelWebSocketHandler implements HttpHandler { private final Exchange camelExchange; private Map<String, Throwable> errors; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); /** * Initially, this set contains all peers where we plan to send the message. Then the peers are removed one by * one as we are notified via {@link #complete(WebSocketChannel, Void)} or @@ -212,28 +218,32 @@ public class CamelWebSocketHandler implements HttpHandler { public MultiCallback(Collection<WebSocketChannel> peers, AsyncCallback camelCallback, Exchange camelExchange) { this.camelCallback = camelCallback; this.camelExchange = camelExchange; - synchronized (lock) { - this.peers = new HashSet<>(peers); - } + this.peers = new HashSet<>(peers); } @Override public void closedBeforeSent(WebSocketChannel channel) { - synchronized (lock) { + lock.lock(); + try { peers.remove(channel); if (peers.isEmpty()) { finish(); } + } finally { + lock.unlock(); } } @Override public void complete(WebSocketChannel channel, Void context) { - synchronized (lock) { + lock.lock(); + try { peers.remove(channel); if (peers.isEmpty()) { finish(); } + } finally { + lock.unlock(); } } @@ -261,7 +271,8 @@ public class CamelWebSocketHandler implements HttpHandler { @Override public void onError(WebSocketChannel channel, Void context, Throwable throwable) { - synchronized (lock) { + lock.lock(); + try { peers.remove(channel); final String connectionKey = (String) channel.getAttribute(UndertowConstants.CONNECTION_KEY); if (connectionKey == null) { @@ -276,6 +287,8 @@ public class CamelWebSocketHandler implements HttpHandler { if (peers.isEmpty()) { finish(); } + } finally { + lock.unlock(); } } @@ -310,13 +323,16 @@ public class CamelWebSocketHandler implements HttpHandler { buffer.get(bytes, offset, increment); offset += increment; } - synchronized (consumerLock) { + consumerLock.lock(); + try { if (consumer != null) { final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new ByteArrayInputStream(bytes) : bytes; consumer.sendMessage(connectionKey, channel, outMsg); } else { LOG.debug("No consumer to handle message received: {}", message); } + } finally { + consumerLock.unlock(); } } finally { data.free(); @@ -333,13 +349,16 @@ public class CamelWebSocketHandler implements HttpHandler { UndertowConstants.CONNECTION_KEY + " attribute not found on " + WebSocketChannel.class.getSimpleName() + " " + channel); } - synchronized (consumerLock) { + consumerLock.lock(); + try { if (consumer != null) { final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new StringReader(text) : text; consumer.sendMessage(connectionKey, channel, outMsg); } else { LOG.debug("No consumer to handle message received: {}", message); } + } finally { + consumerLock.unlock(); } } diff --git a/components/camel-univocity-parsers/src/main/java/org/apache/camel/dataformat/univocity/Marshaller.java b/components/camel-univocity-parsers/src/main/java/org/apache/camel/dataformat/univocity/Marshaller.java index e23452f5da7..9ab3719070d 100644 --- a/components/camel-univocity-parsers/src/main/java/org/apache/camel/dataformat/univocity/Marshaller.java +++ b/components/camel-univocity-parsers/src/main/java/org/apache/camel/dataformat/univocity/Marshaller.java @@ -20,6 +20,8 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.univocity.parsers.common.AbstractWriter; import org.apache.camel.Exchange; @@ -35,6 +37,7 @@ import static org.apache.camel.support.ExchangeHelper.convertToType; * @param <W> Writer class */ final class Marshaller<W extends AbstractWriter<?>> { + private final Lock headersLock = new ReentrantLock(); private final LinkedHashSet<String> headers = new LinkedHashSet<>(); private final boolean adaptHeaders; @@ -85,11 +88,14 @@ final class Marshaller<W extends AbstractWriter<?>> { private void writeRow(Exchange exchange, Object row, W writer) throws NoTypeConversionAvailableException { Map<?, ?> map = convertToMandatoryType(exchange, Map.class, row); if (adaptHeaders) { - synchronized (headers) { + headersLock.lock(); + try { for (Object key : map.keySet()) { headers.add(convertToMandatoryType(exchange, String.class, key)); } writeRow(map, writer); + } finally { + headersLock.unlock(); } } else { writeRow(map, writer); diff --git a/components/camel-velocity/src/main/java/org/apache/camel/component/velocity/VelocityEndpoint.java b/components/camel-velocity/src/main/java/org/apache/camel/component/velocity/VelocityEndpoint.java index 3c5745b526a..425ef61f76a 100644 --- a/components/camel-velocity/src/main/java/org/apache/camel/component/velocity/VelocityEndpoint.java +++ b/components/camel-velocity/src/main/java/org/apache/camel/component/velocity/VelocityEndpoint.java @@ -82,46 +82,51 @@ public class VelocityEndpoint extends ResourceEndpoint { return "velocity:" + getResourceUri(); } - private synchronized VelocityEngine getVelocityEngine() throws Exception { - if (velocityEngine == null) { - velocityEngine = new VelocityEngine(); - - // set the class resolver as a property so we can access it from CamelVelocityClasspathResourceLoader - velocityEngine.addProperty("CamelClassResolver", getCamelContext().getClassResolver()); - - // set default properties - Properties properties = new Properties(); - properties.setProperty(RuntimeConstants.FILE_RESOURCE_LOADER_CACHE, isLoaderCache() ? "true" : "false"); - properties.setProperty(RuntimeConstants.RESOURCE_LOADERS, "file, class"); - properties.setProperty("resource.loader.class.description", "Camel Velocity Classpath Resource Loader"); - properties.setProperty("resource.loader.class.class", CamelVelocityClasspathResourceLoader.class.getName()); - final Logger velocityLogger = LoggerFactory.getLogger("org.apache.camel.maven.Velocity"); - properties.setProperty(RuntimeConstants.RUNTIME_LOG_NAME, velocityLogger.getName()); - - // load the velocity properties from property file which may overrides the default ones - if (ObjectHelper.isNotEmpty(getPropertiesFile())) { - InputStream reader - = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), getPropertiesFile()); + private VelocityEngine getVelocityEngine() throws Exception { + getInternalLock().lock(); + try { + if (velocityEngine == null) { + velocityEngine = new VelocityEngine(); + + // set the class resolver as a property so we can access it from CamelVelocityClasspathResourceLoader + velocityEngine.addProperty("CamelClassResolver", getCamelContext().getClassResolver()); + + // set default properties + Properties properties = new Properties(); + properties.setProperty(RuntimeConstants.FILE_RESOURCE_LOADER_CACHE, isLoaderCache() ? "true" : "false"); + properties.setProperty(RuntimeConstants.RESOURCE_LOADERS, "file, class"); + properties.setProperty("resource.loader.class.description", "Camel Velocity Classpath Resource Loader"); + properties.setProperty("resource.loader.class.class", CamelVelocityClasspathResourceLoader.class.getName()); + final Logger velocityLogger = LoggerFactory.getLogger("org.apache.camel.maven.Velocity"); + properties.setProperty(RuntimeConstants.RUNTIME_LOG_NAME, velocityLogger.getName()); + + // load the velocity properties from property file which may overrides the default ones + if (ObjectHelper.isNotEmpty(getPropertiesFile())) { + InputStream reader + = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), getPropertiesFile()); + try { + properties.load(reader); + log.info("Loaded the velocity configuration file {}", getPropertiesFile()); + } finally { + IOHelper.close(reader, getPropertiesFile(), log); + } + } + + log.debug("Initializing VelocityEngine with properties {}", properties); + // help the velocityEngine to load the CamelVelocityClasspathResourceLoader + ClassLoader old = Thread.currentThread().getContextClassLoader(); try { - properties.load(reader); - log.info("Loaded the velocity configuration file {}", getPropertiesFile()); + ClassLoader delegate = new CamelVelocityDelegateClassLoader(old); + Thread.currentThread().setContextClassLoader(delegate); + velocityEngine.init(properties); } finally { - IOHelper.close(reader, getPropertiesFile(), log); + Thread.currentThread().setContextClassLoader(old); } } - - log.debug("Initializing VelocityEngine with properties {}", properties); - // help the velocityEngine to load the CamelVelocityClasspathResourceLoader - ClassLoader old = Thread.currentThread().getContextClassLoader(); - try { - ClassLoader delegate = new CamelVelocityDelegateClassLoader(old); - Thread.currentThread().setContextClassLoader(delegate); - velocityEngine.init(properties); - } finally { - Thread.currentThread().setContextClassLoader(old); - } + return velocityEngine; + } finally { + getInternalLock().unlock(); } - return velocityEngine; } public void setVelocityEngine(VelocityEngine velocityEngine) { diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketResultHandler.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketResultHandler.java index 3daecca36e7..c5c6115daac 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketResultHandler.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketResultHandler.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelExecutionException; @@ -33,7 +35,7 @@ class VertxWebsocketResultHandler { private final AsyncCallback callback; private final Set<String> connectionKeys; private final Map<String, Throwable> errors = new HashMap<>(); - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); VertxWebsocketResultHandler(Exchange exchange, AsyncCallback callback, Set<String> connectionKeys) { this.exchange = exchange; @@ -42,17 +44,23 @@ class VertxWebsocketResultHandler { } void onResult(String connectionKey) { - synchronized (lock) { + lock.lock(); + try { connectionKeys.remove(connectionKey); if (connectionKeys.isEmpty()) { onComplete(); } + } finally { + lock.unlock(); } } void onError(String connectionKey, Throwable cause) { - synchronized (lock) { + lock.lock(); + try { errors.put(connectionKey, cause); + } finally { + lock.unlock(); } } diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java index 74b450c3f88..b3133c50dc1 100644 --- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java +++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.camel.RuntimeCamelException; @@ -39,6 +41,7 @@ public final class LogWriter implements AutoCloseable { public static final int DEFAULT_CAPACITY = 1024 * 512; private static final Logger LOG = LoggerFactory.getLogger(LogWriter.class); + private final Lock lock = new ReentrantLock(); private final FileChannel fileChannel; private final LogSupervisor flushPolicy; @@ -89,12 +92,15 @@ public final class LogWriter implements AutoCloseable { fileChannel.force(true); } - private synchronized void tryFlush() { + private void tryFlush() { + lock.lock(); try { flush(); } catch (IOException e) { LOG.error("Unable to save record: {}", e.getMessage(), e); throw new RuntimeException(e); + } finally { + lock.unlock(); } } diff --git a/components/camel-wasm/src/main/java/org/apache/camel/wasm/WasmFunction.java b/components/camel-wasm/src/main/java/org/apache/camel/wasm/WasmFunction.java index 5d6db59ded3..fc2acb4b139 100644 --- a/components/camel-wasm/src/main/java/org/apache/camel/wasm/WasmFunction.java +++ b/components/camel-wasm/src/main/java/org/apache/camel/wasm/WasmFunction.java @@ -17,13 +17,15 @@ package org.apache.camel.wasm; import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.dylibso.chicory.runtime.ExportFunction; import com.dylibso.chicory.runtime.Instance; import com.dylibso.chicory.wasm.WasmModule; public class WasmFunction implements AutoCloseable { - private final Object lock; + private final Lock lock; private final WasmModule module; private final String functionName; @@ -34,7 +36,7 @@ public class WasmFunction implements AutoCloseable { private final ExportFunction dealloc; public WasmFunction(WasmModule module, String functionName) { - this.lock = new Object(); + this.lock = new ReentrantLock(); this.module = Objects.requireNonNull(module); this.functionName = Objects.requireNonNull(functionName); @@ -57,7 +59,8 @@ public class WasmFunction implements AutoCloseable { // Wasm execution is not thread safe so we must put a // synchronization guard around the function execution // - synchronized (lock) { + lock.lock(); + try { try { inPtr = (int) alloc.apply(inSize)[0]; instance.memory().write(inPtr, in); @@ -86,6 +89,8 @@ public class WasmFunction implements AutoCloseable { dealloc.apply(outPtr, outSize); } } + } finally { + lock.unlock(); } } diff --git a/components/camel-xchange/src/main/java/org/apache/camel/component/xchange/XChangeComponent.java b/components/camel-xchange/src/main/java/org/apache/camel/component/xchange/XChangeComponent.java index f8b4f2200da..b08e58f2d12 100644 --- a/components/camel-xchange/src/main/java/org/apache/camel/component/xchange/XChangeComponent.java +++ b/components/camel-xchange/src/main/java/org/apache/camel/component/xchange/XChangeComponent.java @@ -16,8 +16,8 @@ */ package org.apache.camel.component.xchange; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.camel.Endpoint; import org.apache.camel.spi.annotations.Component; @@ -29,7 +29,7 @@ import org.knowm.xchange.utils.Assert; @Component("xchange") public class XChangeComponent extends DefaultComponent { - private final Map<String, XChange> xchanges = new HashMap<>(); + private final Map<String, XChange> xchanges = new ConcurrentHashMap<>(); @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { @@ -64,7 +64,7 @@ public class XChangeComponent extends DefaultComponent { return ExchangeFactory.INSTANCE.createExchange(exchangeClass); } - private synchronized XChange getOrCreateXChange(String name) { + private XChange getOrCreateXChange(String name) { return xchanges.computeIfAbsent(name, xc -> { Class<? extends Exchange> exchangeClass = XChangeHelper.loadXChangeClass(getCamelContext(), name); Assert.notNull(exchangeClass, "XChange not supported: " + name); diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java index 0c39c2723f9..804ef4e1c3c 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java @@ -178,59 +178,64 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg return "xmpp://" + host + ":" + port + "/" + getParticipant() + "?serviceName=" + serviceName; } - public synchronized XMPPTCPConnection createConnection() + public XMPPTCPConnection createConnection() throws InterruptedException, IOException, SmackException, XMPPException { - if (connection != null && connection.isConnected()) { - // use existing working connection - return connection; - } + lock.lock(); + try { + if (connection != null && connection.isConnected()) { + // use existing working connection + return connection; + } - // prepare for creating new connection - connection = null; + // prepare for creating new connection + connection = null; - LOG.trace("Creating new connection ..."); - XMPPTCPConnection newConnection = createConnectionInternal(); + LOG.trace("Creating new connection ..."); + XMPPTCPConnection newConnection = createConnectionInternal(); - newConnection.connect(); + newConnection.connect(); - newConnection.addSyncStanzaListener(new XmppLogger("INBOUND"), stanza -> true); - newConnection.addSyncStanzaListener(new XmppLogger("OUTBOUND"), stanza -> true); + newConnection.addSyncStanzaListener(new XmppLogger("INBOUND"), stanza -> true); + newConnection.addSyncStanzaListener(new XmppLogger("OUTBOUND"), stanza -> true); - if (!newConnection.isAuthenticated()) { - if (user != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Logging in to XMPP as user: {} on connection: {}", user, getConnectionMessage(newConnection)); - } - if (password == null) { - LOG.warn("No password configured for user: {} on connection: {}", user, - getConnectionMessage(newConnection)); - } + if (!newConnection.isAuthenticated()) { + if (user != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Logging in to XMPP as user: {} on connection: {}", user, getConnectionMessage(newConnection)); + } + if (password == null) { + LOG.warn("No password configured for user: {} on connection: {}", user, + getConnectionMessage(newConnection)); + } - if (createAccount) { - AccountManager accountManager = AccountManager.getInstance(newConnection); - accountManager.createAccount(Localpart.from(user), password); - } - if (login) { - if (resource != null) { - newConnection.login(user, password, Resourcepart.from(resource)); - } else { - newConnection.login(user, password); + if (createAccount) { + AccountManager accountManager = AccountManager.getInstance(newConnection); + accountManager.createAccount(Localpart.from(user), password); } + if (login) { + if (resource != null) { + newConnection.login(user, password, Resourcepart.from(resource)); + } else { + newConnection.login(user, password); + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Logging in anonymously to XMPP on connection: {}", getConnectionMessage(newConnection)); + } + newConnection.login(); } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Logging in anonymously to XMPP on connection: {}", getConnectionMessage(newConnection)); - } - newConnection.login(); + + // presence is not needed to be sent after login } - // presence is not needed to be sent after login + // okay new connection was created successfully so assign it as the connection + LOG.debug("Created new connection successfully: {}", newConnection); + connection = newConnection; + return connection; + } finally { + lock.unlock(); } - - // okay new connection was created successfully so assign it as the connection - LOG.debug("Created new connection successfully: {}", newConnection); - connection = newConnection; - return connection; } private XMPPTCPConnection createConnectionInternal() throws UnknownHostException, XmppStringprepException { diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java index c6ad065fe2c..ab9c4552ac5 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java @@ -98,12 +98,17 @@ public class XmppGroupChatProducer extends DefaultProducer { } } - private synchronized void reconnect() throws InterruptedException, IOException, SmackException, XMPPException { - if (!connection.isConnected()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection)); + private void reconnect() throws InterruptedException, IOException, SmackException, XMPPException { + lock.lock(); + try { + if (!connection.isConnected()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection)); + } + connection.connect(); } - connection.connect(); + } finally { + lock.unlock(); } } @@ -130,20 +135,25 @@ public class XmppGroupChatProducer extends DefaultProducer { super.doStart(); } - protected synchronized void initializeChat() + protected void initializeChat() throws InterruptedException, SmackException, XMPPException, XmppStringprepException { - if (chat == null) { - room = endpoint.resolveRoom(connection); - String roomPassword = endpoint.getRoomPassword(); - MultiUserChatManager chatManager = MultiUserChatManager.getInstanceFor(connection); - chat = chatManager.getMultiUserChat(JidCreate.entityBareFrom(room)); - MucEnterConfiguration.Builder mucc = chat.getEnterConfigurationBuilder(Resourcepart.from(endpoint.getNickname())) - .requestNoHistory(); - if (roomPassword != null) { - mucc.withPassword(roomPassword); + lock.lock(); + try { + if (chat == null) { + room = endpoint.resolveRoom(connection); + String roomPassword = endpoint.getRoomPassword(); + MultiUserChatManager chatManager = MultiUserChatManager.getInstanceFor(connection); + chat = chatManager.getMultiUserChat(JidCreate.entityBareFrom(room)); + MucEnterConfiguration.Builder mucc = chat.getEnterConfigurationBuilder(Resourcepart.from(endpoint.getNickname())) + .requestNoHistory(); + if (roomPassword != null) { + mucc.withPassword(roomPassword); + } + chat.join(mucc.build()); + LOG.info("Joined room: {} as: {}", room, endpoint.getNickname()); } - chat.join(mucc.build()); - LOG.info("Joined room: {} as: {}", room, endpoint.getNickname()); + } finally { + lock.unlock(); } } diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java index bcd36d4f6d2..fee149fcd52 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java @@ -110,12 +110,17 @@ public class XmppPrivateChatProducer extends DefaultProducer { return chatManager.chatWith(JidCreate.entityBareFrom(participant)); } - private synchronized void reconnect() throws InterruptedException, IOException, SmackException, XMPPException { - if (!connection.isConnected()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection)); + private void reconnect() throws InterruptedException, IOException, SmackException, XMPPException { + lock.lock(); + try { + if (!connection.isConnected()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection)); + } + connection.connect(); } - connection.connect(); + } finally { + lock.unlock(); } } diff --git a/components/camel-xpath/src/main/java/org/apache/camel/language/xpath/XPathBuilder.java b/components/camel-xpath/src/main/java/org/apache/camel/language/xpath/XPathBuilder.java index 8dbc7087b32..5562d372565 100644 --- a/components/camel-xpath/src/main/java/org/apache/camel/language/xpath/XPathBuilder.java +++ b/components/camel-xpath/src/main/java/org/apache/camel/language/xpath/XPathBuilder.java @@ -26,6 +26,8 @@ import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.xml.namespace.QName; import javax.xml.parsers.ParserConfigurationException; @@ -99,6 +101,7 @@ public class XPathBuilder extends ServiceSupport private static volatile XPathFactory defaultXPathFactory; private CamelContext camelContext; + private final Lock lock = new ReentrantLock(); private final Queue<XPathExpression> pool = new ConcurrentLinkedQueue<>(); private final Queue<XPathExpression> poolLogNamespaces = new ConcurrentLinkedQueue<>(); private final String text; @@ -1073,39 +1076,49 @@ public class XPathBuilder extends ServiceSupport * This implementation must be synchronized to ensure thread safety, as this XPathBuilder instance may not have been * started prior to being used. */ - protected synchronized XPathExpression createXPathExpression() + protected XPathExpression createXPathExpression() throws XPathExpressionException { - // ensure we are started + lock.lock(); try { - start(); - } catch (Exception e) { - throw new RuntimeExpressionException("Error starting XPathBuilder", e); - } + // ensure we are started + try { + start(); + } catch (Exception e) { + throw new RuntimeExpressionException("Error starting XPathBuilder", e); + } - // XPathFactory is not thread safe - XPath xPath = getXPathFactory().newXPath(); + // XPathFactory is not thread safe + XPath xPath = getXPathFactory().newXPath(); - if (!logNamespaces && LOG.isTraceEnabled()) { - LOG.trace("Creating new XPath expression in pool. Namespaces on XPath expression: {}", getNamespaceContext()); - } else if (logNamespaces && LOG.isInfoEnabled()) { - LOG.info("Creating new XPath expression in pool. Namespaces on XPath expression: {}", getNamespaceContext()); - } - xPath.setNamespaceContext(getNamespaceContext()); - xPath.setXPathVariableResolver(getVariableResolver()); + if (!logNamespaces && LOG.isTraceEnabled()) { + LOG.trace("Creating new XPath expression in pool. Namespaces on XPath expression: {}", getNamespaceContext()); + } else if (logNamespaces && LOG.isInfoEnabled()) { + LOG.info("Creating new XPath expression in pool. Namespaces on XPath expression: {}", getNamespaceContext()); + } + xPath.setNamespaceContext(getNamespaceContext()); + xPath.setXPathVariableResolver(getVariableResolver()); - XPathFunctionResolver parentResolver = getFunctionResolver(); - if (parentResolver == null) { - parentResolver = xPath.getXPathFunctionResolver(); + XPathFunctionResolver parentResolver = getFunctionResolver(); + if (parentResolver == null) { + parentResolver = xPath.getXPathFunctionResolver(); + } + xPath.setXPathFunctionResolver(createDefaultFunctionResolver(parentResolver)); + return xPath.compile(text); + } finally { + lock.unlock(); } - xPath.setXPathFunctionResolver(createDefaultFunctionResolver(parentResolver)); - return xPath.compile(text); } - protected synchronized XPathExpression createTraceNamespaceExpression() + protected XPathExpression createTraceNamespaceExpression() throws XPathExpressionException { - // XPathFactory is not thread safe - XPath xPath = getXPathFactory().newXPath(); - return xPath.compile(OBTAIN_ALL_NS_XPATH); + lock.lock(); + try { + // XPathFactory is not thread safe + XPath xPath = getXPathFactory().newXPath(); + return xPath.compile(OBTAIN_ALL_NS_XPATH); + } finally { + lock.unlock(); + } } protected DefaultNamespaceContext createNamespaceContext(XPathFactory factory) { @@ -1307,47 +1320,52 @@ public class XPathBuilder extends ServiceSupport poolLogNamespaces.clear(); } - protected synchronized XPathFactory createXPathFactory() throws XPathFactoryConfigurationException { - if (objectModelUri != null) { - String xpathFactoryClassName = factoryClassName; - if (objectModelUri.equals(SAXON_OBJECT_MODEL_URI) - && (xpathFactoryClassName == null || SAXON_FACTORY_CLASS_NAME.equals(xpathFactoryClassName))) { - // from Saxon 9.7 onwards you should favour to create the class - // directly - // https://www.saxonica.com/html/documentation/xpath-api/jaxp-xpath/factory.html - try { - if (camelContext != null) { - Class<XPathFactory> clazz - = camelContext.getClassResolver().resolveClass(SAXON_FACTORY_CLASS_NAME, XPathFactory.class); - if (clazz != null) { - LOG.debug("Creating Saxon XPathFactory using class: {})", clazz); - xpathFactory = camelContext.getInjector().newInstance(clazz); - LOG.info("Created Saxon XPathFactory: {}", xpathFactory); + protected XPathFactory createXPathFactory() throws XPathFactoryConfigurationException { + lock.lock(); + try { + if (objectModelUri != null) { + String xpathFactoryClassName = factoryClassName; + if (objectModelUri.equals(SAXON_OBJECT_MODEL_URI) + && (xpathFactoryClassName == null || SAXON_FACTORY_CLASS_NAME.equals(xpathFactoryClassName))) { + // from Saxon 9.7 onwards you should favour to create the class + // directly + // https://www.saxonica.com/html/documentation/xpath-api/jaxp-xpath/factory.html + try { + if (camelContext != null) { + Class<XPathFactory> clazz + = camelContext.getClassResolver().resolveClass(SAXON_FACTORY_CLASS_NAME, XPathFactory.class); + if (clazz != null) { + LOG.debug("Creating Saxon XPathFactory using class: {})", clazz); + xpathFactory = camelContext.getInjector().newInstance(clazz); + LOG.info("Created Saxon XPathFactory: {}", xpathFactory); + } } + } catch (Exception e) { + LOG.warn("Attempted to create Saxon XPathFactory by creating a new instance of {}" + + " failed. Will fallback and create XPathFactory using JDK API. This exception is ignored (stacktrace in DEBUG logging level).", + SAXON_FACTORY_CLASS_NAME); + LOG.debug("Error creating Saxon XPathFactory. This exception is ignored.", e); } - } catch (Exception e) { - LOG.warn("Attempted to create Saxon XPathFactory by creating a new instance of {}" + - " failed. Will fallback and create XPathFactory using JDK API. This exception is ignored (stacktrace in DEBUG logging level).", - SAXON_FACTORY_CLASS_NAME); - LOG.debug("Error creating Saxon XPathFactory. This exception is ignored.", e); } - } - if (xpathFactory == null) { - LOG.debug("Creating XPathFactory from objectModelUri: {}", objectModelUri); - xpathFactory = ObjectHelper.isEmpty(xpathFactoryClassName) - ? XPathFactory.newInstance(objectModelUri) - : XPathFactory.newInstance(objectModelUri, xpathFactoryClassName, null); - LOG.info("Created XPathFactory: {} from objectModelUri: {}", xpathFactory, objectModelUri); - } + if (xpathFactory == null) { + LOG.debug("Creating XPathFactory from objectModelUri: {}", objectModelUri); + xpathFactory = ObjectHelper.isEmpty(xpathFactoryClassName) + ? XPathFactory.newInstance(objectModelUri) + : XPathFactory.newInstance(objectModelUri, xpathFactoryClassName, null); + LOG.info("Created XPathFactory: {} from objectModelUri: {}", xpathFactory, objectModelUri); + } - return xpathFactory; - } + return xpathFactory; + } - if (defaultXPathFactory == null) { - defaultXPathFactory = createDefaultXPathFactory(); + if (defaultXPathFactory == null) { + defaultXPathFactory = createDefaultXPathFactory(); + } + return defaultXPathFactory; + } finally { + lock.unlock(); } - return defaultXPathFactory; } protected static XPathFactory createDefaultXPathFactory() throws XPathFactoryConfigurationException { diff --git a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java index 23675babc61..b765efdd7f6 100644 --- a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java +++ b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.xml.transform.ErrorListener; import javax.xml.transform.Result; @@ -75,7 +77,7 @@ public class XsltBuilder implements Processor { private XsltMessageLogger xsltMessageLogger; private final XMLConverterHelper converter = new XMLConverterHelper(); - private final Object sourceHandlerFactoryLock = new Object(); + private final Lock sourceHandlerFactoryLock = new ReentrantLock(); public XsltBuilder() { } @@ -298,12 +300,15 @@ public class XsltBuilder implements Processor { public SourceHandlerFactory getSourceHandlerFactory() { if (this.sourceHandlerFactory == null) { - synchronized (this.sourceHandlerFactoryLock) { + sourceHandlerFactoryLock.lock(); + try { if (this.sourceHandlerFactory == null) { final XmlSourceHandlerFactoryImpl xmlSourceHandlerFactory = createXmlSourceHandlerFactoryImpl(); xmlSourceHandlerFactory.setFailOnNullBody(isFailOnNullBody()); this.sourceHandlerFactory = xmlSourceHandlerFactory; } + } finally { + sourceHandlerFactoryLock.unlock(); } } diff --git a/components/camel-zendesk/src/main/java/org/apache/camel/component/zendesk/internal/ZendeskPropertiesHelper.java b/components/camel-zendesk/src/main/java/org/apache/camel/component/zendesk/internal/ZendeskPropertiesHelper.java index 5288c79b32d..3363349bee7 100644 --- a/components/camel-zendesk/src/main/java/org/apache/camel/component/zendesk/internal/ZendeskPropertiesHelper.java +++ b/components/camel-zendesk/src/main/java/org/apache/camel/component/zendesk/internal/ZendeskPropertiesHelper.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.zendesk.internal; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.camel.CamelContext; import org.apache.camel.component.zendesk.ZendeskConfiguration; import org.apache.camel.support.component.ApiMethodPropertiesHelper; @@ -25,16 +28,22 @@ import org.apache.camel.support.component.ApiMethodPropertiesHelper; */ public final class ZendeskPropertiesHelper extends ApiMethodPropertiesHelper<ZendeskConfiguration> { + private static final Lock LOCK = new ReentrantLock(); private static ZendeskPropertiesHelper helper; private ZendeskPropertiesHelper(CamelContext context) { super(context, ZendeskConfiguration.class, ZendeskConstants.PROPERTY_PREFIX); } - public static synchronized ZendeskPropertiesHelper getHelper(CamelContext context) { - if (helper == null) { - helper = new ZendeskPropertiesHelper(context); + public static ZendeskPropertiesHelper getHelper(CamelContext context) { + LOCK.lock(); + try { + if (helper == null) { + helper = new ZendeskPropertiesHelper(context); + } + return helper; + } finally { + LOCK.unlock(); } - return helper; } }