Author: markt Date: Thu May 4 10:11:24 2017 New Revision: 1793763 URL: http://svn.apache.org/viewvc?rev=1793763&view=rev Log: More refactoring to reduce duplication prior to adding trailer header support.
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1793763&r1=1793762&r2=1793763&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Thu May 4 10:11:24 2017 @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import org.apache.coyote.Adapter; import org.apache.coyote.ProtocolException; import org.apache.coyote.Request; -import org.apache.coyote.http2.HpackEncoder.State; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode; @@ -135,67 +134,13 @@ public class Http2AsyncUpgradeHandler ex @Override void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException { + doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize); + } - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId, - stream.getIdentifier(), Integer.valueOf(pushedStreamId), - Boolean.valueOf(endOfStream))); - } - - if (!stream.canWrite()) { - return; - } - - byte[] pushedStreamIdBytes = null; - if (pushedStreamId > 0) { - pushedStreamIdBytes = new byte[4]; - ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId); - } - boolean first = true; - State state = null; - ArrayList<ByteBuffer> bufs = new ArrayList<>(); - - while (state != State.COMPLETE) { - byte[] header = new byte[9]; - ByteBuffer payload = ByteBuffer.allocate(payloadSize); - if (first && pushedStreamIdBytes != null) { - payload.put(pushedStreamIdBytes); - } - state = getHpackEncoder().encode(mimeHeaders, payload); - payload.flip(); - if (state == State.COMPLETE || payload.limit() > 0) { - ByteUtil.setThreeBytes(header, 0, payload.limit()); - if (first) { - first = false; - if (pushedStreamIdBytes == null) { - header[3] = FrameType.HEADERS.getIdByte(); - } else { - header[3] = FrameType.PUSH_PROMISE.getIdByte(); - } - if (endOfStream) { - header[4] = FLAG_END_OF_STREAM; - } - } else { - header[3] = FrameType.CONTINUATION.getIdByte(); - } - if (state == State.COMPLETE) { - header[4] += FLAG_END_OF_HEADERS; - } - if (log.isDebugEnabled()) { - log.debug(payload.limit() + " bytes"); - } - ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); - bufs.add(ByteBuffer.wrap(header)); - bufs.add(payload); - } else if (state == State.UNDERFLOW) { - payloadSize = payloadSize * 2; - } - } - socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, - null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion, - bufs.toArray(BYTEBUFFER_ARRAY)); - handleAsyncException(); + @Override + protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) { + return new AsyncHeaderFrameBuffers(initialPayloadSize); } @@ -315,4 +260,53 @@ public class Http2AsyncUpgradeHandler ex } + + private class AsyncHeaderFrameBuffers implements HeaderFrameBuffers { + + int payloadSize; + + private byte[] header; + private ByteBuffer payload; + + private final ArrayList<ByteBuffer> bufs = new ArrayList<>(); + + public AsyncHeaderFrameBuffers(int initialPayloadSize) { + this.payloadSize = initialPayloadSize; + } + + @Override + public void startFrame() { + header = new byte[9]; + payload = ByteBuffer.allocate(payloadSize); + } + + @Override + public void endFrame() throws IOException { + bufs.add(ByteBuffer.wrap(header)); + bufs.add(payload); + } + + @Override + public void endHeaders() throws IOException { + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion, + bufs.toArray(BYTEBUFFER_ARRAY)); + handleAsyncException(); + } + + @Override + public byte[] getHeader() { + return header; + } + + @Override + public ByteBuffer getPayload() { + return payload; + } + + @Override + public void expandPayload() { + payloadSize = payloadSize * 2; + } + } } Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1793763&r1=1793762&r2=1793763&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 4 10:11:24 2017 @@ -524,6 +524,19 @@ class Http2UpgradeHandler extends Abstra void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException { + // This ensures the Stream processing thread has control of the socket. + synchronized (socketWrapper) { + doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize); + } + } + + + /* + * Separate method to allow Http2AsyncUpgradeHandler to call this code + * without synchronizing on socketWrapper since it doesn't need to. + */ + void doWriteHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, + boolean endOfStream, int payloadSize) throws IOException { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId, @@ -535,8 +548,7 @@ class Http2UpgradeHandler extends Abstra return; } - byte[] header = new byte[9]; - ByteBuffer payload = ByteBuffer.allocate(payloadSize); + HeaderFrameBuffers headerFrameBuffers = getHeaderFrameBuffers(payloadSize); byte[] pushedStreamIdBytes = null; if (pushedStreamId > 0) { @@ -547,49 +559,46 @@ class Http2UpgradeHandler extends Abstra boolean first = true; State state = null; - // This ensures the Stream processing thread has control of the socket. - synchronized (socketWrapper) { - while (state != State.COMPLETE) { - if (first && pushedStreamIdBytes != null) { - payload.put(pushedStreamIdBytes); - } - state = getHpackEncoder().encode(mimeHeaders, payload); - payload.flip(); - if (state == State.COMPLETE || payload.limit() > 0) { - ByteUtil.setThreeBytes(header, 0, payload.limit()); - if (first) { - first = false; - if (pushedStreamIdBytes == null) { - header[3] = FrameType.HEADERS.getIdByte(); - } else { - header[3] = FrameType.PUSH_PROMISE.getIdByte(); - } - if (endOfStream) { - header[4] = FLAG_END_OF_STREAM; - } + while (state != State.COMPLETE) { + headerFrameBuffers.startFrame(); + if (first && pushedStreamIdBytes != null) { + headerFrameBuffers.getPayload().put(pushedStreamIdBytes); + } + state = getHpackEncoder().encode(mimeHeaders, headerFrameBuffers.getPayload()); + headerFrameBuffers.getPayload().flip(); + if (state == State.COMPLETE || headerFrameBuffers.getPayload().limit() > 0) { + ByteUtil.setThreeBytes(headerFrameBuffers.getHeader(), 0, headerFrameBuffers.getPayload().limit()); + if (first) { + first = false; + if (pushedStreamIdBytes == null) { + headerFrameBuffers.getHeader()[3] = FrameType.HEADERS.getIdByte(); } else { - header[3] = FrameType.CONTINUATION.getIdByte(); + headerFrameBuffers.getHeader()[3] = FrameType.PUSH_PROMISE.getIdByte(); } - if (state == State.COMPLETE) { - header[4] += FLAG_END_OF_HEADERS; - } - if (log.isDebugEnabled()) { - log.debug(payload.limit() + " bytes"); + if (endOfStream) { + headerFrameBuffers.getHeader()[4] = FLAG_END_OF_STREAM; } - ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); - try { - socketWrapper.write(true, header, 0, header.length); - socketWrapper.write(true, payload); - socketWrapper.flush(true); - } catch (IOException ioe) { - handleAppInitiatedIOException(ioe); - } - payload.clear(); - } else if (state == State.UNDERFLOW) { - payload = ByteBuffer.allocate(payload.capacity() * 2); + } else { + headerFrameBuffers.getHeader()[3] = FrameType.CONTINUATION.getIdByte(); } + if (state == State.COMPLETE) { + headerFrameBuffers.getHeader()[4] += FLAG_END_OF_HEADERS; + } + if (log.isDebugEnabled()) { + log.debug(headerFrameBuffers.getPayload().limit() + " bytes"); + } + ByteUtil.set31Bits(headerFrameBuffers.getHeader(), 5, stream.getIdentifier().intValue()); + headerFrameBuffers.endFrame(); + } else if (state == State.UNDERFLOW) { + headerFrameBuffers.expandPayload(); } } + headerFrameBuffers.endHeaders(); + } + + + protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) { + return new DefaultHeaderFrameBuffers(initialPayloadSize); } @@ -1570,7 +1579,7 @@ class Http2UpgradeHandler extends Abstra } - private enum ConnectionState { + private static enum ConnectionState { NEW(true), CONNECTED(true), @@ -1587,5 +1596,65 @@ class Http2UpgradeHandler extends Abstra public boolean isNewStreamAllowed() { return newStreamsAllowed; } - } + } + + + protected static interface HeaderFrameBuffers { + public void startFrame(); + public void endFrame() throws IOException; + public void endHeaders() throws IOException; + public byte[] getHeader(); + public ByteBuffer getPayload(); + public void expandPayload(); + } + + + private class DefaultHeaderFrameBuffers implements HeaderFrameBuffers { + + private final byte[] header; + private ByteBuffer payload; + + public DefaultHeaderFrameBuffers(int initialPayloadSize) { + header = new byte[9]; + payload = ByteBuffer.allocate(initialPayloadSize); + } + + @Override + public void startFrame() { + // NO-OP + } + + + @Override + public void endFrame() throws IOException { + try { + socketWrapper.write(true, header, 0, header.length); + socketWrapper.write(true, payload); + socketWrapper.flush(true); + } catch (IOException ioe) { + handleAppInitiatedIOException(ioe); + } + payload.clear(); + } + + @Override + public void endHeaders() { + // NO-OP + } + + @Override + public byte[] getHeader() { + return header; + } + + @Override + public ByteBuffer getPayload() { + return payload; + } + + @Override + public void expandPayload() { + payload = ByteBuffer.allocate(payload.capacity() * 2); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org