Hi Oleg, thanks for merging!
Reading code like: ioSession.lock().unlock(); is quite confusing, how about: ioSession.getLock().unlock(); ? Gary On Tue, Nov 20, 2018 at 12:39 AM <[email protected]> wrote: > Repository: httpcomponents-core > Updated Branches: > refs/heads/master b6b71d90d -> 0a31b7216 > > > HTTP/1.1 and HTTP/2 async protocol handlers to use I/O session lock for > output synchronization > > > Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo > Commit: > http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/0a31b721 > Tree: > http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/0a31b721 > Diff: > http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/0a31b721 > > Branch: refs/heads/master > Commit: 0a31b7216933795836a6f057886674d24843615a > Parents: e7d0a16 > Author: Oleg Kalnichevski <[email protected]> > Authored: Mon Nov 19 15:10:21 2018 +0100 > Committer: Oleg Kalnichevski <[email protected]> > Committed: Tue Nov 20 08:37:03 2018 +0100 > > ---------------------------------------------------------------------- > .../nio/AbstractHttp2StreamMultiplexer.java | 40 +++++++++----------- > .../impl/nio/AbstractHttp1StreamDuplexer.java | 40 ++++++++------------ > 2 files changed, 33 insertions(+), 47 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/0a31b721/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java > ---------------------------------------------------------------------- > diff --git > a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java > b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java > index e0b37f0..6d2cd3c 100644 > --- > a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java > +++ > 
b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java > @@ -41,8 +41,6 @@ import java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.ConcurrentLinkedDeque; > import java.util.concurrent.ConcurrentLinkedQueue; > import java.util.concurrent.atomic.AtomicInteger; > -import java.util.concurrent.locks.Lock; > -import java.util.concurrent.locks.ReentrantLock; > > import javax.net.ssl.SSLSession; > > @@ -117,7 +115,6 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > private final Queue<AsyncPingHandler> pingHandlers; > private final AtomicInteger connInputWindow; > private final AtomicInteger connOutputWindow; > - private final Lock outputLock; > private final AtomicInteger outputRequests; > private final AtomicInteger lastStreamId; > private final Http2StreamListener streamListener; > @@ -153,7 +150,6 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, > this.localConfig.getMaxFrameSize()); > this.outputQueue = new ConcurrentLinkedDeque<>(); > this.pingHandlers = new ConcurrentLinkedQueue<>(); > - this.outputLock = new ReentrantLock(); > this.outputRequests = new AtomicInteger(0); > this.lastStreamId = new AtomicInteger(0); > this.hPackEncoder = new > HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig)); > @@ -246,11 +242,11 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > > private void commitFrame(final RawFrame frame) throws IOException { > Args.notNull(frame, "Frame"); > - outputLock.lock(); > + ioSession.lock().lock(); > try { > commitFrameInternal(frame); > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @@ -434,7 +430,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > } > > public final void onOutput() throws HttpException, IOException { > - 
outputLock.lock(); > + ioSession.lock().lock(); > try { > if (!outputBuffer.isEmpty()) { > outputBuffer.flush(ioSession.channel()); > @@ -451,7 +447,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > } > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > > final int connWinSize = connInputWindow.get(); > @@ -483,7 +479,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > } > } > } > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (!outputPending && outputBuffer.isEmpty() && > outputQueue.isEmpty() > && > outputRequests.compareAndSet(pendingOutputRequests, 0)) { > @@ -492,7 +488,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > outputRequests.addAndGet(-pendingOutputRequests); > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @@ -513,13 +509,13 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > } > } > if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (outputBuffer.isEmpty() && outputQueue.isEmpty()) { > ioSession.close(); > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > } > @@ -1307,7 +1303,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > > @Override > public void submit(final List<Header> headers, final boolean > endStream) throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (headers == null || headers.isEmpty()) { > throw new > H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are > missing"); > @@ -1321,7 +1317,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > localEndStream = true; > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @@ -1342,7 +1338,7 @@ abstract 
class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > final Http2Stream stream = new Http2Stream(channel, > streamHandler, false); > streamMap.put(promisedStreamId, stream); > > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (localEndStream) { > stream.releaseResources(); > @@ -1351,7 +1347,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > commitPushPromise(id, promisedStreamId, headers); > idle = false; > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @@ -1365,20 +1361,20 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > > @Override > public int write(final ByteBuffer payload) throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (localEndStream) { > return 0; > } > return streamData(id, outputWindow, payload); > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @Override > public void endStream(final List<? 
extends Header> trailers) > throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (localEndStream) { > return; > @@ -1391,7 +1387,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > commitFrameInternal(frame); > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @@ -1431,7 +1427,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > } > > boolean localReset(final int code) throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (localEndStream) { > return false; > @@ -1445,7 +1441,7 @@ abstract class AbstractHttp2StreamMultiplexer > implements Identifiable, HttpConne > } > return false; > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > > > http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/0a31b721/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java > ---------------------------------------------------------------------- > diff --git > a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java > b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java > index 5b549ba..585cbed 100644 > --- > a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java > +++ > b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java > @@ -36,8 +36,6 @@ import java.nio.channels.SelectionKey; > import java.nio.channels.WritableByteChannel; > import java.util.List; > import java.util.concurrent.atomic.AtomicInteger; > -import java.util.concurrent.locks.Lock; > -import java.util.concurrent.locks.ReentrantLock; > > import javax.net.ssl.SSLSession; > > @@ -97,7 +95,6 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > private final ContentLengthStrategy 
outgoingContentStrategy; > private final AtomicInteger inputWindow; > private final ByteBuffer contentBuffer; > - private final Lock outputLock; > private final AtomicInteger outputRequests; > > private volatile Message<IncomingMessage, ContentDecoder> > incomingMessage; > @@ -134,7 +131,6 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > DefaultContentLengthStrategy.INSTANCE; > this.inputWindow = new AtomicInteger(0); > this.contentBuffer = > ByteBuffer.allocate(this.h1Config.getBufferSize()); > - this.outputLock = new ReentrantLock(); > this.outputRequests = new AtomicInteger(0); > this.connState = ConnectionState.READY; > } > @@ -362,7 +358,7 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > } > > public final void onOutput() throws IOException, HttpException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (outbuf.hasData()) { > final int bytesWritten = > outbuf.flush(ioSession.channel()); > @@ -371,29 +367,23 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > } > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) { > produceOutput(); > final int pendingOutputRequests = outputRequests.get(); > final boolean outputPending = isOutputReady(); > - outputLock.lock(); > + final boolean outputEnd; > + ioSession.lock().lock(); > try { > if (!outputPending && !outbuf.hasData() && > outputRequests.compareAndSet(pendingOutputRequests, 0)) { > ioSession.clearEvent(SelectionKey.OP_WRITE); > } else { > outputRequests.addAndGet(-pendingOutputRequests); > } > - } finally { > - outputLock.unlock(); > - } > - > - outputLock.lock(); > - final boolean outputEnd; > - try { > outputEnd = outgoingMessage == null && !outbuf.hasData(); > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > if (outputEnd) { > outputEnd(); > @@ -469,7 +459,7 @@ abstract class > 
AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > final OutgoingMessage messageHead, > final boolean endStream, > final FlushMode flushMode) throws HttpException, IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > outgoingMessageWriter.write(messageHead, outbuf); > updateOutputMetrics(messageHead, connMetrics); > @@ -491,7 +481,7 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > } > ioSession.setEvent(EventMask.WRITE); > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > @@ -513,7 +503,7 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > } > > void suspendSessionOutput() throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (outbuf.hasData()) { > outbuf.flush(ioSession.channel()); > @@ -521,12 +511,12 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > ioSession.clearEvent(SelectionKey.OP_WRITE); > } > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > int streamOutput(final ByteBuffer src) throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (outgoingMessage == null) { > throw new ClosedChannelException(); > @@ -538,14 +528,14 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > } > return bytesWritten; > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD} > > MessageDelineation endOutputStream(final List<? extends Header> > trailers) throws IOException { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (outgoingMessage == null) { > return MessageDelineation.NONE; > @@ -558,12 +548,12 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > ? 
MessageDelineation.CHUNK_CODED > : MessageDelineation.MESSAGE_HEAD; > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > boolean isOutputCompleted() { > - outputLock.lock(); > + ioSession.lock().lock(); > try { > if (outgoingMessage == null) { > return true; > @@ -571,7 +561,7 @@ abstract class > AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, > final ContentEncoder contentEncoder = > outgoingMessage.getBody(); > return contentEncoder.isCompleted(); > } finally { > - outputLock.unlock(); > + ioSession.lock().unlock(); > } > } > > >
