This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 3bc67c05d97c172eb7a9def2e54e1e1c6f082a82 Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon Jun 3 14:15:59 2019 +0100 Refactor Stream / Connection flow control window allocation --- .../apache/coyote/http2/Http2UpgradeHandler.java | 78 +++------ .../apache/coyote/http2/LocalStrings.properties | 11 +- java/org/apache/coyote/http2/Stream.java | 50 +++--- .../coyote/http2/WindowAllocationManager.java | 189 +++++++++++++++++++++ 4 files changed, 243 insertions(+), 85 deletions(-) diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index 8abd92b..cbfbd3b 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -37,12 +37,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.servlet.http.WebConnection; -import org.apache.coyote.ActionCode; import org.apache.coyote.Adapter; import org.apache.coyote.CloseNowException; import org.apache.coyote.ProtocolException; import org.apache.coyote.Request; -import org.apache.coyote.Response; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http2.HpackDecoder.HeaderEmitter; import org.apache.coyote.http2.HpackEncoder.State; @@ -751,11 +749,10 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU int reserveWindowSize(Stream stream, int reservation, boolean block) throws IOException { - // Need to be holding the connection allocation lock so releaseBacklog() - // can't notify this thread until after this thread enters wait() + // Need to be holding the stream lock so releaseBacklog() can't notify + // this thread until after this thread enters wait() int allocation = 0; - Object connectionAllocationLock = stream.getConnectionAllocationLock(); - synchronized (connectionAllocationLock) { + synchronized (stream) { do { synchronized (this) { if (!stream.canWrite()) { @@ -808,38 +805,35 @@ public 
class Http2UpgradeHandler extends AbstractStream implements InternalHttpU // request is for a stream, use the connection // timeout long writeTimeout = protocol.getWriteTimeout(); - if (writeTimeout < 0) { - connectionAllocationLock.wait(); - } else { - connectionAllocationLock.wait(writeTimeout); - // Has this stream been granted an allocation - // Note: If the stream in not in this Map then the - // requested write has been fully allocated - BacklogTracker tracker; - // Ensure allocations made in other threads are visible - synchronized (this) { - tracker = backLogStreams.get(stream); + stream.waitForConnectionAllocation(writeTimeout); + // Has this stream been granted an allocation + // Note: If the stream in not in this Map then the + // requested write has been fully allocated + BacklogTracker tracker; + // Ensure allocations made in other threads are visible + synchronized (this) { + tracker = backLogStreams.get(stream); + } + if (tracker != null && tracker.getUnusedAllocation() == 0) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.noAllocation", + connectionId, stream.getIdentifier())); } - if (tracker != null && tracker.getUnusedAllocation() == 0) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.noAllocation", - connectionId, stream.getIdentifier())); - } - // No allocation - // Close the connection. Do this first since - // closing the stream will raise an exception - close(); - // Close the stream (in app code so need to - // signal to app stream is closing) - stream.doWriteTimeout(); + // No allocation + // Close the connection. 
Do this first since + // closing the stream will raise an exception + close(); + // Close the stream (in app code so need to + // signal to app stream is closing) + stream.doWriteTimeout(); } - } } catch (InterruptedException e) { throw new IOException(sm.getString( "upgradeHandler.windowSizeReservationInterrupted", connectionId, stream.getIdentifier(), Integer.toString(reservation)), e); } } else { + stream.waitForConnectionAllocationNonBlocking(); return 0; } } @@ -876,29 +870,7 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU if (this == stream) { continue; } - Response coyoteResponse = ((Stream) stream).getCoyoteResponse(); - if (coyoteResponse.getWriteListener() == null) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.notify", - connectionId, stream.getIdentifier())); - } - // Blocking, so use notify to release StreamOutputBuffer - Object connectionAllocationLock = ((Stream) stream).getConnectionAllocationLock(); - synchronized (connectionAllocationLock) { - connectionAllocationLock.notify(); - } - } else { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.dispatchWrite", - connectionId, stream.getIdentifier())); - } - // Non-blocking so dispatch - coyoteResponse.action(ActionCode.DISPATCH_WRITE, null); - // Need to explicitly execute dispatches on the - // StreamProcessor as this thread is being processed by an - // UpgradeProcessor which won't see this dispatch - coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null); - } + ((Stream) stream).notifyConnection(); } } } diff --git a/java/org/apache/coyote/http2/LocalStrings.properties b/java/org/apache/coyote/http2/LocalStrings.properties index dc3bd1c..47d8c3a 100644 --- a/java/org/apache/coyote/http2/LocalStrings.properties +++ b/java/org/apache/coyote/http2/LocalStrings.properties @@ -118,7 +118,6 @@ upgradeHandler.allocate.left=Connection [{0}], Stream [{1}], [{2}] bytes unalloc upgradeHandler.allocate.recipient=Connection 
[{0}], Stream [{1}], potential recipient [{2}] with weight [{3}] upgradeHandler.connectionError=Connection error upgradeHandler.dependency.invalid=Connection [{0}], Stream [{1}], Streams may not depend on themselves -upgradeHandler.dispatchWrite=Connection [{0}], Stream [{1}], Dispatching to container thread for async write upgradeHandler.goaway.debug=Connection [{0}], Goaway, Last stream [{1}], Error code [{2}], Debug data [{3}] upgradeHandler.init=Connection [{0}], State [{1}] upgradeHandler.initialWindowSize.invalid=Connection [{0}], Illegal value of [{1}] ignored for initial window size @@ -126,7 +125,6 @@ upgradeHandler.invalidPreface=Connection [{0}], Invalid connection preface upgradeHandler.ioerror=Connection [{0}] upgradeHandler.noAllocation=Connection [{0}], Stream [{1}], Timeout waiting for allocation upgradeHandler.noNewStreams=Connection [{0}], Stream [{1}], Stream ignored as no new streams are permitted on this connection -upgradeHandler.notify=Connection [{0}], Stream [{1}], notify() called to release StreamOutputBuffer upgradeHandler.pause.entry=Connection [{0}] Pausing upgradeHandler.prefaceReceived=Connection [{0}], Connection preface received from client upgradeHandler.pingFailed=Connection [{0}] Failed to send ping to client @@ -156,5 +154,14 @@ upgradeHandler.writeBody=Connection [{0}], Stream [{1}], Data length [{2}] upgradeHandler.writeHeaders=Connection [{0}], Stream [{1}] upgradeHandler.writePushHeaders=Connection [{0}], Stream [{1}], Pushed stream [{2}], EndOfStream [{3}] +windowAllocationManager.dispatched=Connection [{0}], Stream [{1}], Dispatched +windowAllocationManager.notify=Connection [{0}], Stream [{1}], Waiting type [{2}], Notify type [{3}] +windowAllocationManager.notified=Connection [{0}], Stream [{1}], Notified +windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], Waiting for Connection flow control window (blocking) with timeout [{3}] +windowAllocationManager.waitFor.stream=Connection [{0}], Stream [{1}], 
Waiting for Stream flow control window (blocking) with timeout [{3}] +windowAllocationManager.waitFor.ise=Connection [{0}], Stream [{1}], Already waiting +windowAllocationManager.waitForNonBlocking.connection=Connection [{0}], Stream [{1}], Waiting for Connection flow control window (non-blocking) +windowAllocationManager.waitForNonBlocking.stram=Connection [{0}], Stream [{1}], Waiting for Stream flow control window (non-blocking) + writeStateMachine.endWrite.ise=It is illegal to specify [{0}] for the new state once a write has completed writeStateMachine.ise=It is illegal to call [{0}()] in state [{1}] diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index 7f783fd..cc76200 100644 --- a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -69,7 +69,7 @@ public class Stream extends AbstractStream implements HeaderEmitter { private final Http2UpgradeHandler handler; private final StreamStateMachine state; - private final Object connectionAllocationLock = new Object(); + private final WindowAllocationManager allocationManager = new WindowAllocationManager(this); // State machine would be too much overhead private int headerState = HEADER_STATE_START; @@ -234,14 +234,7 @@ public class Stream extends AbstractStream implements HeaderEmitter { final void cancelAllocationRequests() { - // Cancel wait on stream allocation request (if any) - synchronized (this) { - this.notify(); - } - // Cancel wait on connection allocation request (if any) - synchronized (connectionAllocationLock) { - connectionAllocationLock.notify(); - } + allocationManager.notifyAny(); } @@ -259,17 +252,7 @@ public class Stream extends AbstractStream implements HeaderEmitter { boolean notify = getWindowSize() < 1; super.incrementWindowSize(windowSizeIncrement); if (notify && getWindowSize() > 0) { - if (coyoteResponse.getWriteListener() == null) { - // Blocking, so use notify to release StreamOutputBuffer - notify(); - 
} else { - // Non-blocking so dispatch - coyoteResponse.action(ActionCode.DISPATCH_WRITE, null); - // Need to explicitly execute dispatches on the StreamProcessor - // as this thread is being processed by an UpgradeProcessor - // which won't see this dispatch - coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null); - } + allocationManager.notifyStream(); } } @@ -284,11 +267,7 @@ public class Stream extends AbstractStream implements HeaderEmitter { if (block) { try { long writeTimeout = handler.getProtocol().getStreamWriteTimeout(); - if (writeTimeout < 0) { - wait(); - } else { - wait(writeTimeout); - } + allocationManager.waitForStream(writeTimeout); windowSize = getWindowSize(); if (windowSize == 0) { doWriteTimeout(); @@ -300,6 +279,7 @@ public class Stream extends AbstractStream implements HeaderEmitter { throw new IOException(e); } } else { + allocationManager.waitForStreamNonBlocking(); return 0; } } @@ -329,6 +309,21 @@ public class Stream extends AbstractStream implements HeaderEmitter { } + void waitForConnectionAllocation(long timeout) throws InterruptedException { + allocationManager.waitForConnection(timeout); + } + + + void waitForConnectionAllocationNonBlocking() { + allocationManager.waitForConnectionNonBlocking(); + } + + + void notifyConnection() { + allocationManager.notifyConnection(); + } + + @Override @Deprecated protected synchronized void doNotifyAll() { @@ -728,11 +723,6 @@ public class Stream extends AbstractStream implements HeaderEmitter { } - Object getConnectionAllocationLock() { - return connectionAllocationLock; - } - - private static void push(final Http2UpgradeHandler handler, final Request request, final Stream stream) throws IOException { if (org.apache.coyote.Constants.IS_SECURITY_ENABLED) { diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java b/java/org/apache/coyote/http2/WindowAllocationManager.java new file mode 100644 index 0000000..7626bf3 --- /dev/null +++ 
b/java/org/apache/coyote/http2/WindowAllocationManager.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http2; + +import org.apache.coyote.ActionCode; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.res.StringManager; + +/** + * Tracks whether the stream is waiting for an allocation to the stream flow + * control window, to the connection flow control window or not waiting for an + * allocation and only issues allocation notifications when the stream is known + * to be waiting for the notification. + * + * It is possible for a stream to be waiting for a connection allocation when + * a stream allocation is made. Therefore this class tracks the type of + * allocation that the stream is waiting for to ensure that notifications are + * correctly triggered. + * + * With the implementation at the time of writing, it is not possible for a + * stream to receive an unexpected connection notification as these are only + * issued to streams in the backlog and a stream must be waiting for a + * connection allocation in order to be placed on the backlog.
However, as a + * precaution, this class protects against unexpected connection notifications. + * + * It is important for asynchronous processing not to notify unless a + * notification is expected else a dispatch will be performed unnecessarily + * which may lead to unexpected results. + * + * A previous implementation used separate locks for the stream and connection + * notifications. However, correct handling of allocation waiting requires + * holding the stream lock when making the decision to wait. Therefore both + * allocations need to wait on the Stream. + */ +class WindowAllocationManager { + + private static final Log log = LogFactory.getLog(WindowAllocationManager.class); + private static final StringManager sm = StringManager.getManager(WindowAllocationManager.class); + + private static final int NONE = 0; + private static final int STREAM = 1; + private static final int CONNECTION = 2; + + private final Stream stream; + + private int waitingFor = NONE; + + WindowAllocationManager(Stream stream) { + this.stream = stream; + } + + void waitForStream(long timeout) throws InterruptedException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitFor.stream", + stream.getConnectionId(), stream.getIdentifier(), Long.toString(timeout))); + } + + waitFor(STREAM, timeout); + } + + + void waitForConnection(long timeout) throws InterruptedException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitFor.connection", + stream.getConnectionId(), stream.getIdentifier(), Long.toString(timeout))); + } + + waitFor(CONNECTION, timeout); + } + + + void waitForStreamNonBlocking() { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.stream", + stream.getConnectionId(), stream.getIdentifier())); + } + + waitForNonBlocking(STREAM); + } + + + void waitForConnectionNonBlocking() { + if (log.isDebugEnabled()) { + 
log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.connection", + stream.getConnectionId(), stream.getIdentifier())); + } + + waitForNonBlocking(CONNECTION); + } + + + void notifyStream() { + notify(STREAM); + } + + + void notifyConnection() { + notify(CONNECTION); + } + + + void notifyAny() { + notify(STREAM | CONNECTION); + } + + + private void waitFor(int waitTarget, long timeout) throws InterruptedException { + synchronized (stream) { + if (waitingFor != 0) { + throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise", + stream.getConnectionId(), stream.getIdentifier())); + } + + waitingFor = waitTarget; + + if (timeout < 0) { + stream.wait(); + } else { + stream.wait(timeout); + } + + waitingFor = 0; + } + } + + + private void waitForNonBlocking(int waitTarget) { + synchronized (stream) { + if (waitingFor == 0) { + waitingFor = waitTarget; + } else if (waitingFor == waitTarget) { + // NO-OP + // Non-blocking post-processing may attempt to flush + } else { + throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise", + stream.getConnectionId(), stream.getIdentifier())); + } + + } + } + + + private void notify(int notifyTarget) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.notify", stream.getConnectionId(), + stream.getIdentifier(), Integer.toString(waitingFor), Integer.toString(notifyTarget))); + } + + synchronized (stream) { + if ((notifyTarget & waitingFor) > 0) { + if (stream.getCoyoteResponse().getWriteListener() == null) { + // Blocking, so use notify to release StreamOutputBuffer + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.notified", + stream.getConnectionId(), stream.getIdentifier())); + } + stream.notify(); + } else { + waitingFor = 0; + // Non-blocking so dispatch + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.dispatched", + stream.getConnectionId(), stream.getIdentifier())); + } + 
stream.getCoyoteResponse().action(ActionCode.DISPATCH_WRITE, null); + // Need to explicitly execute dispatches on the StreamProcessor + // as this thread is being processed by an UpgradeProcessor + // which won't see this dispatch + stream.getCoyoteResponse().action(ActionCode.DISPATCH_EXECUTE, null); + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org