This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push:
new 0a94801588 Refactor to remove syncs on SocketWrapper to support Loom
experiments
0a94801588 is described below
commit 0a9480158874ea910a4d629d24f31d69d6cc5f96
Author: Mark Thomas <[email protected]>
AuthorDate: Mon May 23 17:27:24 2022 +0100
Refactor to remove syncs on SocketWrapper to support Loom experiments
---
java/org/apache/coyote/AbstractProcessor.java | 7 ++-
.../apache/coyote/http2/Http2UpgradeHandler.java | 73 +++++++++++++++-------
.../tomcat/util/net/SocketProcessorBase.java | 7 ++-
.../apache/tomcat/util/net/SocketWrapperBase.java | 7 +++
webapps/docs/changelog.xml | 9 +++
5 files changed, 80 insertions(+), 23 deletions(-)
diff --git a/java/org/apache/coyote/AbstractProcessor.java
b/java/org/apache/coyote/AbstractProcessor.java
index 699a935eb6..50743ffece 100644
--- a/java/org/apache/coyote/AbstractProcessor.java
+++ b/java/org/apache/coyote/AbstractProcessor.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import jakarta.servlet.RequestDispatcher;
import jakarta.servlet.ServletConnection;
@@ -865,7 +866,9 @@ public abstract class AbstractProcessor extends
AbstractProcessorLight implement
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
Iterator<DispatchType> dispatches = getIteratorAndClearDispatches();
if (socketWrapper != null) {
- synchronized (socketWrapper) {
+ Lock lock = socketWrapper.getLock();
+ lock.lock();
+ try {
/*
* This method is called when non-blocking IO is initiated by
defining
* a read and/or write listener in a non-container thread. It
is called
@@ -888,6 +891,8 @@ public abstract class AbstractProcessor extends
AbstractProcessorLight implement
DispatchType dispatchType = dispatches.next();
socketWrapper.processSocket(dispatchType.getSocketStatus(), false);
}
+ } finally {
+ lock.unlock();
}
}
}
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 64ad129b40..c143149c3c 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -335,12 +335,15 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
try {
switch(status) {
case OPEN_READ:
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
if (!socketWrapper.canWrite()) {
// Only send a ping if there is no other data waiting
to be sent.
// Ping manager will ensure they aren't sent too
frequently.
pingManager.sendPing(false);
}
+ } finally {
+ socketWrapper.getLock().unlock();
}
try {
// There is data to read so use the read timeout while
@@ -567,12 +570,15 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
// may see out of order RST frames which may hard to follow if
// the client is unaware the RST frames may be received out of
// order.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
if (state != null) {
state.sendReset();
}
socketWrapper.write(true, rstFrame, 0, rstFrame.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
@@ -658,7 +664,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
byte[] payloadLength = new byte[3];
ByteUtil.setThreeBytes(payloadLength, 0, len);
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
socketWrapper.write(true, payloadLength, 0, payloadLength.length);
socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
socketWrapper.write(true, fixedPayload, 0, 8);
@@ -666,14 +673,19 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
socketWrapper.write(true, debugMsg, 0, debugMsg.length);
}
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders
mimeHeaders,
boolean endOfStream, int payloadSize) throws IOException {
// This ensures the Stream processing thread has control of the socket.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream,
payloadSize);
+ } finally {
+ socketWrapper.getLock().unlock();
}
stream.sentHeaders();
if (endOfStream) {
@@ -790,17 +802,18 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
}
if (writable) {
ByteUtil.set31Bits(header, 5, stream.getIdAsInt());
- synchronized (socketWrapper) {
- try {
- socketWrapper.write(true, header, 0, header.length);
- int orgLimit = data.limit();
- data.limit(data.position() + len);
- socketWrapper.write(true, data);
- data.limit(orgLimit);
- socketWrapper.flush(true);
- } catch (IOException ioe) {
- handleAppInitiatedIOException(ioe);
- }
+ socketWrapper.getLock().lock();
+ try {
+ socketWrapper.write(true, header, 0, header.length);
+ int orgLimit = data.limit();
+ data.limit(data.position() + len);
+ socketWrapper.write(true, data);
+ data.limit(orgLimit);
+ socketWrapper.flush(true);
+ } catch (IOException ioe) {
+ handleAppInitiatedIOException(ioe);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
}
@@ -832,7 +845,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
log.debug(sm.getString("upgradeHandler.windowUpdateConnection",
getConnectionId(), Integer.valueOf(increment)));
}
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
// Build window update frame for stream 0
byte[] frame = new byte[13];
ByteUtil.setThreeBytes(frame, 0, 4);
@@ -867,12 +881,15 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
if (needFlush) {
socketWrapper.flush(true);
}
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
protected void processWrites() throws IOException {
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
if (socketWrapper.flush(false)) {
socketWrapper.registerWriteInterest();
} else {
@@ -880,6 +897,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
// Ping manager will ensure they aren't sent too frequently.
pingManager.sendPing(false);
}
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
@@ -1400,10 +1419,13 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
// Synchronized since PUSH_PROMISE frames have to be sent in order.
Once
// the stream has been created we need to ensure that the PUSH_PROMISE
// is sent before the next stream is created for a PUSH_PROMISE.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
pushStream = createLocalStream(request);
writeHeaders(associatedStream, pushStream.getIdAsInt(),
request.getMimeHeaders(),
false, Constants.DEFAULT_HEADERS_FRAME_SIZE);
+ } finally {
+ socketWrapper.getLock().unlock();
}
pushStream.sentPushPromise();
@@ -1784,9 +1806,12 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
"upgradeHandler.unexpectedAck", connectionId,
getIdAsString()));
}
} else {
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
socketWrapper.write(true, SETTINGS_ACK, 0,
SETTINGS_ACK.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
}
@@ -1911,7 +1936,8 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
if (force || now - lastPingNanoTime > pingIntervalNano) {
lastPingNanoTime = now;
byte[] payload = new byte[8];
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
int sentSequence = ++sequence;
PingRecord pingRecord = new PingRecord(sentSequence, now);
inflightPings.add(pingRecord);
@@ -1919,6 +1945,8 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
socketWrapper.write(true, PING, 0, PING.length);
socketWrapper.write(true, payload, 0, payload.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock(); // release the lock taken before the try; lock() here would leak it
}
}
}
@@ -1949,10 +1977,13 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
} else {
// Client originated ping. Echo it back.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
socketWrapper.write(true, payload, 0, payload.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock(); // release the lock taken before the try; lock() here would leak it
}
}
}
diff --git a/java/org/apache/tomcat/util/net/SocketProcessorBase.java
b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
index 1207ab0e20..138da2e7b1 100644
--- a/java/org/apache/tomcat/util/net/SocketProcessorBase.java
+++ b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
@@ -17,6 +17,7 @@
package org.apache.tomcat.util.net;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
public abstract class SocketProcessorBase<S> implements Runnable {
@@ -37,7 +38,9 @@ public abstract class SocketProcessorBase<S> implements
Runnable {
@Override
public final void run() {
- synchronized (socketWrapper) {
+ Lock lock = socketWrapper.getLock();
+ lock.lock();
+ try {
// It is possible that processing may be triggered for read and
// write at the same time. The sync above makes sure that
processing
// does not occur in parallel. The test below ensures that if the
@@ -47,6 +50,8 @@ public abstract class SocketProcessorBase<S> implements
Runnable {
return;
}
doRun();
+ } finally {
+ lock.unlock();
}
}
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index e32525ba0d..e671f632f8 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import jakarta.servlet.ServletConnection;
@@ -59,6 +61,7 @@ public abstract class SocketWrapperBase<E> {
private E socket;
private final AbstractEndpoint<E,?> endpoint;
+ private final Lock lock = new ReentrantLock();
protected final AtomicBoolean closed = new AtomicBoolean(false);
@@ -155,6 +158,10 @@ public abstract class SocketWrapperBase<E> {
return endpoint;
}
+ public Lock getLock() {
+ return lock;
+ }
+
public Object getCurrentProcessor() {
return currentProcessor.get();
}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 747b252b3c..fd61fd1f1d 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -126,6 +126,15 @@
</fix>
</changelog>
</subsection>
+ <subsection name="Coyote">
+ <changelog>
+ <scode>
+ Refactor synchronization blocks locking on <code>SocketWrapper</code>
to
+ use <code>ReentrantLock</code> to support users wishing to experiment
+ with project Loom. (markt)
+ </scode>
+ </changelog>
+ </subsection>
<subsection name="Jasper">
<changelog>
<fix>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]