This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 10.1.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/10.1.x by this push:
     new e633e73d61 Refactor to reduce pinning in HTTP/2 code when using 
virtual threads
e633e73d61 is described below

commit e633e73d616196fa26da69458fca26b31c5d436a
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Jul 27 15:37:59 2023 +0100

    Refactor to reduce pinning in HTTP/2 code when using virtual threads
---
 java/org/apache/coyote/http2/AbstractStream.java   |  84 ++++---
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 241 +++++++++++----------
 java/org/apache/coyote/http2/RecycledStream.java   |   1 -
 java/org/apache/coyote/http2/Stream.java           |  84 +++----
 .../coyote/http2/WindowAllocationManager.java      |  27 ++-
 5 files changed, 250 insertions(+), 187 deletions(-)

diff --git a/java/org/apache/coyote/http2/AbstractStream.java 
b/java/org/apache/coyote/http2/AbstractStream.java
index f332b8c593..d6fb8d8280 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -16,6 +16,10 @@
  */
 package org.apache.coyote.http2;
 
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.res.StringManager;
@@ -33,6 +37,8 @@ abstract class AbstractStream {
     private final String idAsString;
 
     private long windowSize = 
ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
+    protected final Lock windowAllocationLock = new ReentrantLock();
+    protected final Condition windowAllocationAvailable = 
windowAllocationLock.newCondition();
 
     private volatile int connectionAllocationRequested = 0;
     private volatile int connectionAllocationMade = 0;
@@ -59,13 +65,23 @@ abstract class AbstractStream {
     }
 
 
-    final synchronized void setWindowSize(long windowSize) {
-        this.windowSize = windowSize;
+    final void setWindowSize(long windowSize) {
+        windowAllocationLock.lock();
+        try {
+            this.windowSize = windowSize;
+        } finally {
+            windowAllocationLock.unlock();
+        }
     }
 
 
-    final synchronized long getWindowSize() {
-        return windowSize;
+    final long getWindowSize() {
+        windowAllocationLock.lock();
+        try {
+            return windowSize;
+        } finally {
+            windowAllocationLock.unlock();
+        }
     }
 
 
@@ -76,37 +92,47 @@ abstract class AbstractStream {
      *
      * @throws Http2Exception If the window size is now higher than the 
maximum allowed
      */
-    synchronized void incrementWindowSize(int increment) throws Http2Exception 
{
-        // No need for overflow protection here.
-        // Increment can't be more than Integer.MAX_VALUE and once windowSize
-        // goes beyond 2^31-1 an error is triggered.
-        windowSize += increment;
-
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("abstractStream.windowSizeInc", 
getConnectionId(), getIdAsString(),
-                    Integer.toString(increment), Long.toString(windowSize)));
-        }
+    void incrementWindowSize(int increment) throws Http2Exception {
+        windowAllocationLock.lock();
+        try {
+            // No need for overflow protection here.
+            // Increment can't be more than Integer.MAX_VALUE and once 
windowSize
+            // goes beyond 2^31-1 an error is triggered.
+            windowSize += increment;
+
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("abstractStream.windowSizeInc", 
getConnectionId(), getIdAsString(),
+                        Integer.toString(increment), 
Long.toString(windowSize)));
+            }
 
-        if (windowSize > ConnectionSettingsBase.MAX_WINDOW_SIZE) {
-            String msg = sm.getString("abstractStream.windowSizeTooBig", 
getConnectionId(), identifier,
-                    Integer.toString(increment), Long.toString(windowSize));
-            if (identifier.intValue() == 0) {
-                throw new ConnectionException(msg, 
Http2Error.FLOW_CONTROL_ERROR);
-            } else {
-                throw new StreamException(msg, Http2Error.FLOW_CONTROL_ERROR, 
identifier.intValue());
+            if (windowSize > ConnectionSettingsBase.MAX_WINDOW_SIZE) {
+                String msg = sm.getString("abstractStream.windowSizeTooBig", 
getConnectionId(), identifier,
+                        Integer.toString(increment), 
Long.toString(windowSize));
+                if (identifier.intValue() == 0) {
+                    throw new ConnectionException(msg, 
Http2Error.FLOW_CONTROL_ERROR);
+                } else {
+                    throw new StreamException(msg, 
Http2Error.FLOW_CONTROL_ERROR, identifier.intValue());
+                }
             }
+        } finally {
+            windowAllocationLock.unlock();
         }
     }
 
 
-    final synchronized void decrementWindowSize(int decrement) {
-        // No need for overflow protection here. Decrement can never be larger
-        // the Integer.MAX_VALUE and once windowSize goes negative no further
-        // decrements are permitted
-        windowSize -= decrement;
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("abstractStream.windowSizeDec", 
getConnectionId(), getIdAsString(),
-                    Integer.toString(decrement), Long.toString(windowSize)));
+    final void decrementWindowSize(int decrement) {
+        windowAllocationLock.lock();
+        try {
+            // No need for overflow protection here. Decrement can never be 
larger
+            // the Integer.MAX_VALUE and once windowSize goes negative no 
further
+            // decrements are permitted
+            windowSize -= decrement;
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("abstractStream.windowSizeDec", 
getConnectionId(), getIdAsString(),
+                        Integer.toString(decrement), 
Long.toString(windowSize)));
+            }
+        } finally {
+            windowAllocationLock.unlock();
         }
     }
 
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 811737bf83..bf8be7cf83 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -929,8 +929,10 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // Need to be holding the stream lock so releaseBacklog() can't notify
         // this thread until after this thread enters wait()
         int allocation = 0;
-        synchronized (stream) {
-            synchronized (this) {
+        stream.windowAllocationLock.lock();
+        try {
+            windowAllocationLock.lock();
+            try {
                 if (!stream.canWrite()) {
                     stream.doStreamCancel(
                             sm.getString("upgradeHandler.stream.notWritable", 
stream.getConnectionId(),
@@ -955,6 +957,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
                     allocation = reservation;
                     decrementWindowSize(allocation);
                 }
+            } finally {
+                windowAllocationLock.unlock();
             }
             if (allocation == 0) {
                 if (block) {
@@ -1001,18 +1005,19 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                     return 0;
                 }
             }
+        } finally {
+            stream.windowAllocationLock.unlock();
         }
         return allocation;
     }
 
 
-    @SuppressWarnings("sync-override") // notify() needs to be outside sync
-                                       // to avoid deadlock
     @Override
     protected void incrementWindowSize(int increment) throws Http2Exception {
         Set<AbstractStream> streamsToNotify = null;
 
-        synchronized (this) {
+        windowAllocationLock.lock();
+        try {
             long windowSize = getWindowSize();
             if (windowSize < 1 && windowSize + increment > 0) {
                 // Connection window is exhausted. Assume there will be streams
@@ -1021,6 +1026,8 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             } else {
                 super.incrementWindowSize(increment);
             }
+        } finally {
+            windowAllocationLock.unlock();
         }
 
         if (streamsToNotify != null) {
@@ -1053,134 +1060,144 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
     }
 
 
-    private synchronized Set<AbstractStream> releaseBackLog(int increment) 
throws Http2Exception {
-        Set<AbstractStream> result = new HashSet<>();
-        if (backLogSize < increment) {
-            // Can clear the whole backlog
-            for (AbstractStream stream : backLogStreams) {
-                if (stream.getConnectionAllocationRequested() > 0) {
-                    
stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
-                    stream.setConnectionAllocationRequested(0);
-                    result.add(stream);
+    private Set<AbstractStream> releaseBackLog(int increment) throws 
Http2Exception {
+        windowAllocationLock.lock();
+        try {
+            Set<AbstractStream> result = new HashSet<>();
+            if (backLogSize < increment) {
+                // Can clear the whole backlog
+                for (AbstractStream stream : backLogStreams) {
+                    if (stream.getConnectionAllocationRequested() > 0) {
+                        
stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
+                        stream.setConnectionAllocationRequested(0);
+                        result.add(stream);
+                    }
                 }
-            }
-            // Cast is safe due to test above
-            int remaining = increment - (int) backLogSize;
-            backLogSize = 0;
-            super.incrementWindowSize(remaining);
+                // Cast is safe due to test above
+                int remaining = increment - (int) backLogSize;
+                backLogSize = 0;
+                super.incrementWindowSize(remaining);
 
-            backLogStreams.clear();
-        } else {
-            // Can't clear the whole backlog.
-            // Need streams in priority order
-            Set<Stream> orderedStreams = new 
ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency)
-                    
.thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt));
-            orderedStreams.addAll(backLogStreams);
-
-            // Iteration 1. Need to work out how much we can clear.
-            long urgencyWhereAllocationIsExhausted = 0;
-            long requestedAllocationForIncrementalStreams = 0;
-            int remaining = increment;
-            Iterator<Stream> orderedStreamsIterator = 
orderedStreams.iterator();
-            while (orderedStreamsIterator.hasNext()) {
-                Stream s = orderedStreamsIterator.next();
-                if (urgencyWhereAllocationIsExhausted < s.getUrgency()) {
-                    if (remaining < 1) {
-                        break;
+                backLogStreams.clear();
+            } else {
+                // Can't clear the whole backlog.
+                // Need streams in priority order
+                Set<Stream> orderedStreams = new 
ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency)
+                        
.thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt));
+                orderedStreams.addAll(backLogStreams);
+
+                // Iteration 1. Need to work out how much we can clear.
+                long urgencyWhereAllocationIsExhausted = 0;
+                long requestedAllocationForIncrementalStreams = 0;
+                int remaining = increment;
+                Iterator<Stream> orderedStreamsIterator = 
orderedStreams.iterator();
+                while (orderedStreamsIterator.hasNext()) {
+                    Stream s = orderedStreamsIterator.next();
+                    if (urgencyWhereAllocationIsExhausted < s.getUrgency()) {
+                        if (remaining < 1) {
+                            break;
+                        }
+                        requestedAllocationForIncrementalStreams = 0;
                     }
-                    requestedAllocationForIncrementalStreams = 0;
-                }
-                urgencyWhereAllocationIsExhausted = s.getUrgency();
-                if (s.getIncremental()) {
-                    requestedAllocationForIncrementalStreams += 
s.getConnectionAllocationRequested();
-                    remaining -= s.getConnectionAllocationRequested();
-                } else {
-                    remaining -= s.getConnectionAllocationRequested();
-                    if (remaining < 1) {
-                        break;
+                    urgencyWhereAllocationIsExhausted = s.getUrgency();
+                    if (s.getIncremental()) {
+                        requestedAllocationForIncrementalStreams += 
s.getConnectionAllocationRequested();
+                        remaining -= s.getConnectionAllocationRequested();
+                    } else {
+                        remaining -= s.getConnectionAllocationRequested();
+                        if (remaining < 1) {
+                            break;
+                        }
                     }
                 }
-            }
 
-            // Iteration 2. Allocate.
-            // Reset for second iteration
-            remaining = increment;
-            orderedStreamsIterator = orderedStreams.iterator();
-            while (orderedStreamsIterator.hasNext()) {
-                Stream s = orderedStreamsIterator.next();
-                if (s.getUrgency() < urgencyWhereAllocationIsExhausted) {
-                    // Can fully allocate
-                    remaining = allocate(s, remaining);
-                    result.add(s);
-                    orderedStreamsIterator.remove();
-                    backLogStreams.remove(s);
-                } else if (requestedAllocationForIncrementalStreams == 0) {
-                    // Allocation ran out in non-incremental streams so fully
-                    // allocate in iterator order until allocation is exhausted
-                    remaining = allocate(s, remaining);
-                    result.add(s);
-                    if (s.getConnectionAllocationRequested() == 0) {
-                        // Fully allocated
+                // Iteration 2. Allocate.
+                // Reset for second iteration
+                remaining = increment;
+                orderedStreamsIterator = orderedStreams.iterator();
+                while (orderedStreamsIterator.hasNext()) {
+                    Stream s = orderedStreamsIterator.next();
+                    if (s.getUrgency() < urgencyWhereAllocationIsExhausted) {
+                        // Can fully allocate
+                        remaining = allocate(s, remaining);
+                        result.add(s);
                         orderedStreamsIterator.remove();
                         backLogStreams.remove(s);
-                    }
-                    if (remaining < 1) {
-                        break;
-                    }
-                } else {
-                    // Allocation ran out in incremental streams. Distribute
-                    // remaining allocation between the incremental streams at
-                    // this urgency level.
-                    if (s.getUrgency() != urgencyWhereAllocationIsExhausted) {
-                        break;
-                    }
+                    } else if (requestedAllocationForIncrementalStreams == 0) {
+                        // Allocation ran out in non-incremental streams so 
fully
+                        // allocate in iterator order until allocation is 
exhausted
+                        remaining = allocate(s, remaining);
+                        result.add(s);
+                        if (s.getConnectionAllocationRequested() == 0) {
+                            // Fully allocated
+                            orderedStreamsIterator.remove();
+                            backLogStreams.remove(s);
+                        }
+                        if (remaining < 1) {
+                            break;
+                        }
+                    } else {
+                        // Allocation ran out in incremental streams. 
Distribute
+                        // remaining allocation between the incremental 
streams at
+                        // this urgency level.
+                        if (s.getUrgency() != 
urgencyWhereAllocationIsExhausted) {
+                            break;
+                        }
 
-                    int share = (int) (s.getConnectionAllocationRequested() * 
remaining /
-                            requestedAllocationForIncrementalStreams);
-                    if (share == 0) {
-                        share = 1;
-                    }
-                    allocate(s, share);
-                    result.add(s);
-                    if (s.getConnectionAllocationRequested() == 0) {
-                        // Fully allocated (unlikely but possible due to
-                        // rounding if only a few bytes required).
-                        orderedStreamsIterator.remove();
-                        backLogStreams.remove(s);
+                        int share = (int) 
(s.getConnectionAllocationRequested() * remaining /
+                                requestedAllocationForIncrementalStreams);
+                        if (share == 0) {
+                            share = 1;
+                        }
+                        allocate(s, share);
+                        result.add(s);
+                        if (s.getConnectionAllocationRequested() == 0) {
+                            // Fully allocated (unlikely but possible due to
+                            // rounding if only a few bytes required).
+                            orderedStreamsIterator.remove();
+                            backLogStreams.remove(s);
+                        }
                     }
                 }
             }
+            return result;
+        } finally {
+            windowAllocationLock.unlock();
         }
-        return result;
     }
 
 
-    private synchronized int allocate(AbstractStream stream, int allocation) {
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("upgradeHandler.allocate.debug", 
getConnectionId(), stream.getIdAsString(),
-                    Integer.toString(allocation)));
-        }
+    private int allocate(AbstractStream stream, int allocation) {
+        windowAllocationLock.lock();
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("upgradeHandler.allocate.debug", 
getConnectionId(), stream.getIdAsString(),
+                        Integer.toString(allocation)));
+            }
 
-        int leftToAllocate = allocation;
+            int leftToAllocate = allocation;
 
-        if (stream.getConnectionAllocationRequested() > 0) {
-            int allocatedThisTime;
-            if (allocation >= stream.getConnectionAllocationRequested()) {
-                allocatedThisTime = stream.getConnectionAllocationRequested();
-            } else {
-                allocatedThisTime = allocation;
+            if (stream.getConnectionAllocationRequested() > 0) {
+                int allocatedThisTime;
+                if (allocation >= stream.getConnectionAllocationRequested()) {
+                    allocatedThisTime = 
stream.getConnectionAllocationRequested();
+                } else {
+                    allocatedThisTime = allocation;
+                }
+                
stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested()
 - allocatedThisTime);
+                
stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + 
allocatedThisTime);
+                leftToAllocate = leftToAllocate - allocatedThisTime;
             }
-            
stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested()
 - allocatedThisTime);
-            
stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + 
allocatedThisTime);
-            leftToAllocate = leftToAllocate - allocatedThisTime;
-        }
 
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("upgradeHandler.allocate.left", 
getConnectionId(), stream.getIdAsString(),
-                    Integer.toString(leftToAllocate)));
-        }
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("upgradeHandler.allocate.left", 
getConnectionId(), stream.getIdAsString(),
+                        Integer.toString(leftToAllocate)));
+            }
 
-        return leftToAllocate;
+            return leftToAllocate;
+        } finally {
+            windowAllocationLock.unlock();
+        }
     }
 
 
diff --git a/java/org/apache/coyote/http2/RecycledStream.java 
b/java/org/apache/coyote/http2/RecycledStream.java
index 143ae1d371..c4c180ac1f 100644
--- a/java/org/apache/coyote/http2/RecycledStream.java
+++ b/java/org/apache/coyote/http2/RecycledStream.java
@@ -40,7 +40,6 @@ class RecycledStream extends AbstractNonZeroStream {
     }
 
 
-    @SuppressWarnings("sync-override")
     @Override
     void incrementWindowSize(int increment) throws Http2Exception {
         // NO-OP
diff --git a/java/org/apache/coyote/http2/Stream.java 
b/java/org/apache/coyote/http2/Stream.java
index b0dd8ad844..6b32049b82 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -219,52 +219,62 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
 
 
     @Override
-    final synchronized void incrementWindowSize(int windowSizeIncrement) 
throws Http2Exception {
-        // If this is zero then any thread that has been trying to write for
-        // this stream will be waiting. Notify that thread it can continue. Use
-        // notify all even though only one thread is waiting to be on the safe
-        // side.
-        boolean notify = getWindowSize() < 1;
-        super.incrementWindowSize(windowSizeIncrement);
-        if (notify && getWindowSize() > 0) {
-            allocationManager.notifyStream();
+    final void incrementWindowSize(int windowSizeIncrement) throws 
Http2Exception {
+        windowAllocationLock.lock();
+        try {
+            // If this is zero then any thread that has been trying to write 
for
+            // this stream will be waiting. Notify that thread it can 
continue. Use
+            // notify all even though only one thread is waiting to be on the 
safe
+            // side.
+            boolean notify = getWindowSize() < 1;
+            super.incrementWindowSize(windowSizeIncrement);
+            if (notify && getWindowSize() > 0) {
+                allocationManager.notifyStream();
+            }
+        } finally {
+            windowAllocationLock.unlock();
         }
     }
 
 
-    final synchronized int reserveWindowSize(int reservation, boolean block) 
throws IOException {
-        long windowSize = getWindowSize();
-        while (windowSize < 1) {
-            if (!canWrite()) {
-                throw new CloseNowException(sm.getString("stream.notWritable", 
getConnectionId(), getIdAsString()));
-            }
-            if (block) {
-                try {
-                    long writeTimeout = 
handler.getProtocol().getStreamWriteTimeout();
-                    allocationManager.waitForStream(writeTimeout);
-                    windowSize = getWindowSize();
-                    if (windowSize == 0) {
-                        doStreamCancel(sm.getString("stream.writeTimeout"), 
Http2Error.ENHANCE_YOUR_CALM);
+    final int reserveWindowSize(int reservation, boolean block) throws 
IOException {
+        windowAllocationLock.lock();
+        try {
+            long windowSize = getWindowSize();
+            while (windowSize < 1) {
+                if (!canWrite()) {
+                    throw new 
CloseNowException(sm.getString("stream.notWritable", getConnectionId(), 
getIdAsString()));
+                }
+                if (block) {
+                    try {
+                        long writeTimeout = 
handler.getProtocol().getStreamWriteTimeout();
+                        allocationManager.waitForStream(writeTimeout);
+                        windowSize = getWindowSize();
+                        if (windowSize == 0) {
+                            
doStreamCancel(sm.getString("stream.writeTimeout"), 
Http2Error.ENHANCE_YOUR_CALM);
+                        }
+                    } catch (InterruptedException e) {
+                        // Possible shutdown / rst or similar. Use an 
IOException to
+                        // signal to the client that further I/O isn't 
possible for this
+                        // Stream.
+                        throw new IOException(e);
                     }
-                } catch (InterruptedException e) {
-                    // Possible shutdown / rst or similar. Use an IOException 
to
-                    // signal to the client that further I/O isn't possible 
for this
-                    // Stream.
-                    throw new IOException(e);
+                } else {
+                    allocationManager.waitForStreamNonBlocking();
+                    return 0;
                 }
+            }
+            int allocation;
+            if (windowSize < reservation) {
+                allocation = (int) windowSize;
             } else {
-                allocationManager.waitForStreamNonBlocking();
-                return 0;
+                allocation = reservation;
             }
+            decrementWindowSize(allocation);
+            return allocation;
+        } finally {
+            windowAllocationLock.unlock();
         }
-        int allocation;
-        if (windowSize < reservation) {
-            allocation = (int) windowSize;
-        } else {
-            allocation = reservation;
-        }
-        decrementWindowSize(allocation);
-        return allocation;
     }
 
 
diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java 
b/java/org/apache/coyote/http2/WindowAllocationManager.java
index e784c4083c..811fe1821e 100644
--- a/java/org/apache/coyote/http2/WindowAllocationManager.java
+++ b/java/org/apache/coyote/http2/WindowAllocationManager.java
@@ -129,14 +129,18 @@ class WindowAllocationManager {
 
 
     private boolean isWaitingFor(int waitTarget) {
-        synchronized (stream) {
+        stream.windowAllocationLock.lock();
+        try {
             return (waitingFor & waitTarget) > 0;
+        } finally {
+            stream.windowAllocationLock.unlock();
         }
     }
 
 
     private void waitFor(int waitTarget, final long timeout) throws 
InterruptedException {
-        synchronized (stream) {
+        stream.windowAllocationLock.lock();
+        try {
             if (waitingFor != NONE) {
                 throw new 
IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
                         stream.getConnectionId(), stream.getIdAsString()));
@@ -148,7 +152,7 @@ class WindowAllocationManager {
             // Loop to handle spurious wake-ups
             do {
                 if (timeout < 0) {
-                    stream.wait();
+                    stream.windowAllocationAvailable.await();
                 } else {
                     long timeoutRemaining;
                     if (startNanos == -1) {
@@ -164,15 +168,18 @@ class WindowAllocationManager {
                             return;
                         }
                     }
-                    stream.wait(timeoutRemaining);
+                    stream.windowAllocationAvailable.await(timeoutRemaining, 
TimeUnit.MILLISECONDS);
                 }
             } while (waitingFor != NONE);
+        } finally {
+            stream.windowAllocationLock.unlock();
         }
     }
 
 
     private void waitForNonBlocking(int waitTarget) {
-        synchronized (stream) {
+        stream.windowAllocationLock.lock();
+        try {
             if (waitingFor == NONE) {
                 waitingFor = waitTarget;
             } else if (waitingFor == waitTarget) {
@@ -182,14 +189,16 @@ class WindowAllocationManager {
                 throw new 
IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
                         stream.getConnectionId(), stream.getIdAsString()));
             }
-
+        } finally {
+            stream.windowAllocationLock.unlock();
         }
     }
 
 
     private void notify(int notifyTarget) {
 
-        synchronized (stream) {
+        stream.windowAllocationLock.lock();
+        try {
             if (log.isDebugEnabled()) {
                 log.debug(sm.getString("windowAllocationManager.notify", 
stream.getConnectionId(),
                         stream.getIdAsString(), Integer.toString(waitingFor), 
Integer.toString(notifyTarget)));
@@ -210,7 +219,7 @@ class WindowAllocationManager {
                             
log.debug(sm.getString("windowAllocationManager.notified", 
stream.getConnectionId(),
                                     stream.getIdAsString()));
                         }
-                        stream.notify();
+                        stream.windowAllocationAvailable.signal();
                     } else {
                         // Non-blocking so dispatch
                         if (log.isDebugEnabled()) {
@@ -225,6 +234,8 @@ class WindowAllocationManager {
                     }
                 }
             }
+        } finally {
+            stream.windowAllocationLock.unlock();
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to