HDFS-8671 Add client support for HTTP/2 stream channels
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8c0e0313 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8c0e0313 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8c0e0313 Branch: refs/heads/HDFS-7966 Commit: 8c0e0313de97b67f5d199df6230421dc7ea9e5be Parents: c24a64f Author: zhangduo <zhang...@wandoujia.com> Authored: Sat Oct 17 11:40:08 2015 +0800 Committer: zhangduo <zhang...@wandoujia.com> Committed: Sat Oct 17 11:40:08 2015 +0800 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/pom.xml | 39 +++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 7 + .../web/PortUnificationServerHandler.java | 5 +- .../web/http2/AbstractHttp2EventListener.java | 129 ++++++++ .../http2/ByteBufferReadableInputStream.java | 31 ++ .../web/http2/ClientHttp2ConnectionHandler.java | 146 +++++++++ .../web/http2/ClientHttp2EventListener.java | 42 +++ .../hdfs/web/http2/Http2DataReceiver.java | 326 +++++++++++++++++++ .../hdfs/web/http2/Http2StreamBootstrap.java | 160 +++++++++ .../hdfs/web/http2/Http2StreamChannel.java | 161 +++++++-- .../apache/hadoop/hdfs/web/http2/Http2Util.java | 85 +++++ .../web/http2/ServerHttp2ConnectionHandler.java | 52 ++- .../web/http2/ServerHttp2EventListener.java | 87 +---- .../hdfs/web/http2/StartHttp2StreamRequest.java | 48 +++ .../datanode/web/dtp/Http2ResponseHandler.java | 63 ---- .../server/datanode/web/dtp/TestDtpHttp2.java | 98 +++--- .../hdfs/web/http2/AbstractTestHttp2Client.java | 132 ++++++++ .../hadoop/hdfs/web/http2/TestHttp2Client.java | 65 ++++ .../web/http2/TestHttp2ClientMultiThread.java | 100 ++++++ .../hdfs/web/http2/TestHttp2DataReceiver.java | 111 +++++++ .../hadoop/hdfs/web/http2/TestHttp2Server.java | 5 +- .../web/http2/TestHttp2ServerMultiThread.java | 9 +- hadoop-project/pom.xml | 14 +- 23 files changed, 1637 insertions(+), 278 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 2cb1716..de6be26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -199,6 +199,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <scope>test</scope> </dependency> <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-server</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-kms</artifactId> <classifier>classes</classifier> @@ -221,6 +226,40 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>mockserver-netty</artifactId> <version>3.9.2</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec-socks</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </exclusion> + </exclusions> </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index cb05fa9..0366ba5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1180,4 +1180,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { @Deprecated public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; + + // http2 related configs + public static final String DFS_HTTP2_INITIAL_WINDOW_SIZE = "dfs.http2.initial.windows.size"; + public static final int DFS_HTTP2_INITIAL_WINDOW_SIZE_DEFAULT = 64 * 1024; + + public static final String DFS_HTTP2_WINDOW_UPDATE_RATIO = "dfs.http2.window.update.ratio"; + public static final float DFS_HTTP2_WINDOW_UPDATE_RATIO_DEFAULT = 0.5f; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java index e5c5256..ce8a2cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.stream.ChunkedWriteHandler; import java.net.InetSocketAddress; @@ -66,7 +67,7 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder { new URLDispatcher(proxyHost, conf, confForCreate)); } - private void configureHttp2(ChannelHandlerContext ctx) { + private void configureHttp2(ChannelHandlerContext ctx) throws Http2Exception { ctx.pipeline().addLast( ServerHttp2ConnectionHandler.create(ctx.channel(), new ChannelInitializer<Http2StreamChannel>() { @@ -75,7 +76,7 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder { protected void initChannel(Http2StreamChannel ch) throws Exception { ch.pipeline().addLast(new DtpChannelHandler()); } - })); + }, conf)); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java new file mode 100644 index 0000000..2ab2358 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2Connection.PropertyKey; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2EventAdapter; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2Stream; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Base class of an HTTP/2 FrameListener and EventListener to manage + * {@link Http2StreamChannel}s. + * <p> + * We do not handle onRstStreamRead here, a stream that being reset will also + * call onStreamClosed. The upper layer should not rely on a reset event. + */ +@InterfaceAudience.Private +public abstract class AbstractHttp2EventListener extends Http2EventAdapter { + + protected final Channel parentChannel; + + protected final Http2Connection conn; + + protected final PropertyKey subChannelPropKey; + + protected final AtomicInteger numActiveStreams = new AtomicInteger(0); + + protected AbstractHttp2EventListener(Channel parentChannel, + Http2Connection conn) { + this.parentChannel = parentChannel; + this.conn = conn; + this.subChannelPropKey = conn.newKey(); + } + + protected abstract void initChannelOnStreamActive( + Http2StreamChannel subChannel); + + @Override + public void onStreamActive(final Http2Stream stream) { + numActiveStreams.incrementAndGet(); + Http2StreamChannel subChannel = + new Http2StreamChannel(parentChannel, stream); + stream.setProperty(subChannelPropKey, subChannel); + initChannelOnStreamActive(subChannel); + } + + @Override + public void onStreamClosed(Http2Stream stream) { + numActiveStreams.decrementAndGet(); + Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey); + if (subChannel != null && subChannel.isRegistered()) { + subChannel.setClosed(); + } + } + + private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception { + Http2StreamChannel subChannel = + conn.stream(streamId).getProperty(subChannelPropKey); + if (subChannel == null) { + throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR, + "No sub channel found"); + } + return subChannel; + } + + private void writeInbound(int streamId, Object msg, boolean endOfStream, + int pendingBytes) throws Http2Exception { + Http2StreamChannel subChannel = getSubChannel(streamId); + subChannel.writeInbound(msg, pendingBytes); + if (endOfStream) { + subChannel.writeInbound(LastHttp2Message.get(), 0); + } + if (subChannel.config().isAutoRead()) { + subChannel.read(); + } + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endOfStream) + throws Http2Exception { + writeInbound(streamId, headers, endOfStream, 0); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, + boolean exclusive, int padding, boolean endOfStream) + throws Http2Exception { + onHeadersRead(ctx, streamId, headers, padding, endOfStream); + } + + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endOfStream) throws Http2Exception { + int pendingBytes = data.readableBytes() + padding; + writeInbound(streamId, data.retain(), endOfStream, pendingBytes); + return 0; + } + + public int numActiveStreams() { + return numActiveStreams.get(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java new file mode 100644 index 0000000..a54715a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ByteBufferReadable; + +/** + * A place holder. + */ +@InterfaceAudience.Private +public abstract class ByteBufferReadableInputStream extends InputStream + implements ByteBufferReadable { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java new file mode 100644 index 0000000..42be10d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelPromiseAggregator; +import io.netty.handler.codec.UnsupportedMessageTypeException; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.logging.LogLevel; +import io.netty.util.concurrent.Promise; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +/** + * An {@link Http2ConnectionHandler} used at client side. + */ +@InterfaceAudience.Private +public class ClientHttp2ConnectionHandler extends Http2ConnectionHandler { + + private static final Log LOG = LogFactory + .getLog(ClientHttp2ConnectionHandler.class); + + private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger( + LogLevel.INFO, ClientHttp2ConnectionHandler.class); + + private AtomicInteger nextStreamId = new AtomicInteger(3); + + private final ClientHttp2EventListener listener; + + private ClientHttp2ConnectionHandler(Http2ConnectionDecoder decoder, + Http2ConnectionEncoder encoder) { + super(decoder, encoder); + this.listener = (ClientHttp2EventListener) decoder.listener(); + } + + private int nextStreamId() { + return nextStreamId.getAndAdd(2); + } + + private void writeHeaders(ChannelHandlerContext ctx, + Http2ConnectionEncoder encoder, final int streamId, Http2Headers headers, + boolean endStream, ChannelPromise promise, + final Promise<Http2StreamChannel> callback) { + encoder.writeHeaders(ctx, streamId, headers, 0, endStream, promise) + .addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + callback + .setSuccess(connection().stream(streamId) + .<Http2StreamChannel> getProperty( + listener.subChannelPropKey)); + } else { + callback.setFailure(future.cause()); + } + } + }); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { + if (msg instanceof StartHttp2StreamRequest) { + final StartHttp2StreamRequest request = (StartHttp2StreamRequest) msg; + final int streamId = nextStreamId(); + Http2ConnectionEncoder encoder = encoder(); + if (request.data.isReadable()) { + ChannelPromiseAggregator aggregator = + new ChannelPromiseAggregator(promise); + ChannelPromise headerPromise = ctx.newPromise(); + aggregator.add(headerPromise); + writeHeaders(ctx, encoder, streamId, request.headers, false, + headerPromise, request.promise); + ChannelPromise dataPromise = ctx.newPromise(); + aggregator.add(dataPromise); + encoder.writeData(ctx, streamId, request.data, 0, request.endStream, + dataPromise); + } else { + writeHeaders(ctx, encoder, streamId, request.headers, + request.endStream, promise, request.promise); + } + } else { + throw new UnsupportedMessageTypeException(msg, + StartHttp2StreamRequest.class); + } + } + + public int numActiveStreams() { + return listener.numActiveStreams(); + } + + public int currentStreamId() { + return nextStreamId.get(); + } + + private static final Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler> FACTORY = + new Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler>() { + + @Override + public ClientHttp2ConnectionHandler create( + Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) { + return new ClientHttp2ConnectionHandler(decoder, encoder); + } + }; + + public static ClientHttp2ConnectionHandler create(Channel channel, + Configuration conf) throws Http2Exception { + Http2Connection conn = new DefaultHttp2Connection(false); + ClientHttp2EventListener listener = + new ClientHttp2EventListener(channel, conn); + return Http2Util.create(conf, conn, listener, FACTORY, + LOG.isDebugEnabled() ? FRAME_LOGGER : null); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java new file mode 100644 index 0000000..f4bbdba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.channel.Channel; +import io.netty.handler.codec.http2.Http2Connection; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * An HTTP/2 FrameListener and EventListener to manage + * {@link Http2StreamChannel}s for client. + */ +@InterfaceAudience.Private +public class ClientHttp2EventListener extends AbstractHttp2EventListener { + + public ClientHttp2EventListener(Channel parentChannel, Http2Connection conn) { + super(parentChannel, conn); + } + + @Override + protected void initChannelOnStreamActive(Http2StreamChannel subChannel) { + // disable read until pipeline initialized + subChannel.config().setAutoRead(false); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java new file mode 100644 index 0000000..7cfbcce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A helper class that wrapper the HTTP/2 data frame as an {@link InputStream}. + * <p> + * Notice that, this classes can be used together with + * {@link ReadTimeoutHandler} to limit the waiting time when reading data. + */ +@InterfaceAudience.Private +public class Http2DataReceiver extends ChannelInboundHandlerAdapter { + + private static final Component END_OF_STREAM = new Component(null, 0); + + private static final EOFException EOF = new EOFException(); + + private static final class Component { + + public final ByteBuf buf; + + public final int length; + + public Component(ByteBuf buf) { + this(buf, buf.readableBytes()); + } + + public Component(ByteBuf buf, int length) { + this.buf = buf; + this.length = length; + } + + } + + private final Deque<Component> queue = new ArrayDeque<Component>(); + + private int queuedBytes; + + private Channel channel; + + private Throwable error; + + private Http2Headers headers; + + private final ByteBufferReadableInputStream contentInput = + new ByteBufferReadableInputStream() { + + @Override + public int read() throws IOException { + Component comp = peekUntilAvailable(); + if (comp == END_OF_STREAM) { + return -1; + } + int b = comp.buf.readByte() & 0xFF; + if (!comp.buf.isReadable()) { + removeHead(); + } + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + Component comp = peekUntilAvailable(); + if (comp == END_OF_STREAM) { + return -1; + } + int bufReadableBytes = comp.buf.readableBytes(); + if (len >= bufReadableBytes) { + comp.buf.readBytes(b, off, bufReadableBytes); + removeHead(); + return bufReadableBytes; + } else { + comp.buf.readBytes(b, off, len); + return len; + } + } + + @Override + public long skip(long n) throws IOException { + Component comp = peekUntilAvailable(); + if (comp == END_OF_STREAM) { + return 0L; + } + int bufReadableBytes = comp.buf.readableBytes(); + if (n >= bufReadableBytes) { + removeHead(); + return bufReadableBytes; + } else { + comp.buf.skipBytes((int) n); + return n; + } + } + + @Override + public int read(ByteBuffer bb) throws IOException { + Component comp = peekUntilAvailable(); + if (comp == END_OF_STREAM) { + return -1; + } + int bbRemaining = bb.remaining(); + int bufReadableBytes = comp.buf.readableBytes(); + if (bbRemaining >= bufReadableBytes) { + int toRestoredLimit = bb.limit(); + bb.limit(bb.position() + bufReadableBytes); + comp.buf.readBytes(bb); + bb.limit(toRestoredLimit); + removeHead(); + return bufReadableBytes; + } else { + comp.buf.readBytes(bb); + return bbRemaining; + } + } + + private boolean closed = false; + + @Override + public void close() throws IOException { + if (closed) { + return; + } + synchronized (queue) { + if (error == null) { + error = EOF; + } + } + channel.close().addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + synchronized (queue) { + for (Component c; (c = queue.peek()) != null;) { + if (c == END_OF_STREAM) { + return; + } + c.buf.release(); + queue.remove(); + } + } + } + }); + } + + }; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (msg == LastHttp2Message.get()) { + enqueue(END_OF_STREAM); + } else if (msg instanceof Http2Headers) { + synchronized (queue) { + headers = (Http2Headers) msg; + queue.notifyAll(); + } + } else if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (buf.isReadable()) { + enqueue(new Component(buf)); + } else { + buf.release(); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + synchronized (queue) { + if (error == null) { + error = cause; + queue.notifyAll(); + } + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + synchronized (queue) { + if (error != null) { + return; + } + Component lastComp = queue.peekLast(); + if (lastComp == END_OF_STREAM) { + return; + } + error = EOF; + notifyAll(); + } + } + + private void enqueue(Component comp) { + synchronized (queue) { + queuedBytes += comp.length; + if (queuedBytes >= channel.config().getWriteBufferHighWaterMark()) { + channel.config().setAutoRead(false); + } + queue.add(comp); + queue.notifyAll(); + } + } + + private Component peekUntilAvailable() throws IOException { + Throwable cause; + synchronized (queue) { + for (;;) { + if (!queue.isEmpty()) { + return queue.peek(); + } + if (error != null) { + cause = error; + break; + } + try { + queue.wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + } + propagate(cause); + return null; + } + + private void removeHead() { + Component comp; + synchronized (queue) { + comp = queue.remove(); + queuedBytes -= comp.length; + ChannelConfig config = channel.config(); + if (!config.isAutoRead() + && queuedBytes < config.getWriteBufferLowWaterMark()) { + config.setAutoRead(true); + } + } + comp.buf.release(); + } + + private void propagate(Throwable cause) throws IOException { + if (cause == ReadTimeoutException.INSTANCE) { + throw new IOException("Read timeout"); + } else if (cause == EOF) { + throw new IOException("Stream reset by peer: " + channel.remoteAddress()); + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new IOException(cause); + } + } + + public Http2Headers waitForResponse() throws IOException { + Throwable cause; + synchronized (queue) { + for (;;) { + if (error != null) { + cause = error; + break; + } + if (headers != null) { + return headers; + } + try { + queue.wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + } + propagate(cause); + return null; + } + + /** + * The returned stream is not thread safe. + */ + public ByteBufferReadableInputStream content() { + return contentInput; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java new file mode 100644 index 0000000..4d0b32b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.base.Preconditions; + +/** + * A {@link Http2StreamBootstrap} that makes it easy to bootstrap a + * {@link Http2StreamChannel} to use for clients. + * <p> + * Call connect when you finish set up other things to establish an + * {@link Http2StreamChannel}. + */ +@InterfaceAudience.Private +public class Http2StreamBootstrap { + private Channel channel; + + private Http2Headers headers; + + private ByteBuf data = Unpooled.EMPTY_BUFFER; + + private boolean endStream; + + private ChannelHandler handler; + + /** + * Set the {@link Channel} this HTTP/2 stream is running on. + */ + public Http2StreamBootstrap channel(Channel channel) { + this.channel = channel; + return this; + } + + /** + * Set the request headers. + */ + public Http2StreamBootstrap headers(Http2Headers headers) { + this.headers = headers; + return this; + } + + /** + * Set the request data. + * <p> + * This is used to avoid one context-switch if you only need to send a small + * piece of data. + */ + public Http2StreamBootstrap data(ByteBuf data) { + this.data = data; + return this; + } + + /** + * Set whether there is no data after the headers being sent. + * <p> + * Default is <tt>false </tt>which means you could still send data using the + * returned {@link Http2StreamChannel}. + */ + public Http2StreamBootstrap endStream(boolean endStream) { + this.endStream = endStream; + return this; + } + + /** + * The {@link ChannelHandler} which is used to serve request. + * <p> + * Typically, you should use a {@link ChannelInitializer} here. + */ + public Http2StreamBootstrap handler(ChannelHandler handler) { + this.handler = handler; + return this; + } + + /** + * Establish the {@link Http2StreamChannel}. You can get it with the returned + * {@link Future}. + */ + public Future<Http2StreamChannel> connect() { + Preconditions.checkNotNull(headers); + Preconditions.checkNotNull(handler); + final Promise<Http2StreamChannel> registeredPromise = + channel.eventLoop().<Http2StreamChannel> newPromise(); + + final StartHttp2StreamRequest request = + new StartHttp2StreamRequest(headers, data, endStream, channel + .eventLoop().<Http2StreamChannel> newPromise() + .addListener(new FutureListener<Http2StreamChannel>() { + + @Override + public void operationComplete(Future<Http2StreamChannel> future) + throws Exception { + if (future.isSuccess()) { + final Http2StreamChannel subChannel = future.get(); + subChannel.pipeline().addFirst(handler); + channel.eventLoop().register(subChannel) + .addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + if (future.isSuccess()) { + subChannel.config().setAutoRead(true); + registeredPromise.setSuccess(subChannel); + } else { + registeredPromise.setFailure(future.cause()); + } + } + }); + } else { + registeredPromise.setFailure(future.cause()); + } + } + + })); + EventLoop eventLoop = channel.eventLoop(); + if (eventLoop.inEventLoop()) { + channel.writeAndFlush(request); + } else { + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + channel.writeAndFlush(request); + } + }); + } + + return registeredPromise; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java index 658ffe4..ffaf3ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hdfs.web.http2; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; @@ -32,8 +35,11 @@ import io.netty.handler.codec.UnsupportedMessageTypeException; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.InternalThreadLocalMap; import java.net.SocketAddress; @@ -64,10 +70,33 @@ public class Http2StreamChannel extends AbstractChannel { private static final int MAX_READER_STACK_DEPTH = 8; private final ChannelHandlerContext http2ConnHandlerCtx; + private final Http2Stream stream; + + private final Http2LocalFlowController localFlowController; + private final Http2ConnectionEncoder encoder; + private final DefaultChannelConfig config; - private final Queue<Object> inboundMessageQueue = new ArrayDeque<>(); + + private static final class InboundMessage { + + public final Object msg; + + public final int length; + + public InboundMessage(Object msg, int length) { + this.msg = msg; + this.length = length; + } + + } + + private final Queue<InboundMessage> inboundMessageQueue = new ArrayDeque<>(); + + private boolean writePending = false; + + private int pendingOutboundBytes; private enum State { OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED @@ -82,10 +111,16 @@ public class Http2StreamChannel extends AbstractChannel { Http2ConnectionHandler connHandler = (Http2ConnectionHandler) http2ConnHandlerCtx.handler(); this.stream = stream; + this.localFlowController = + connHandler.connection().local().flowController(); this.encoder = connHandler.encoder(); this.config = new DefaultChannelConfig(this); } + public Http2Stream stream() { + return stream; + } + @Override public ChannelConfig config() { return config; @@ -98,8 +133,6 @@ public class Http2StreamChannel extends AbstractChannel { @Override public boolean isActive() { - // we create this channel after HTTP/2 stream active, so we do not have a - // separated 'active' state. return isOpen(); } @@ -115,6 +148,10 @@ public class Http2StreamChannel extends AbstractChannel { SocketAddress localAddress, ChannelPromise promise) { throw new UnsupportedOperationException(); } + + public void forceFlush() { + super.flush0(); + } } @Override @@ -149,7 +186,11 @@ public class Http2StreamChannel extends AbstractChannel { @Override protected void doClose() throws Exception { - if (stream.state() != Http2Stream.State.CLOSED) { + for (InboundMessage msg; (msg = inboundMessageQueue.poll()) != null;) { + ReferenceCountUtil.release(msg.msg); + localFlowController.consumeBytes(stream, msg.length); + } + if (state != State.PRE_CLOSED) { encoder.writeRstStream(http2ConnHandlerCtx, stream.id(), Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise()); } @@ -157,36 +198,44 @@ public class Http2StreamChannel extends AbstractChannel { } private final Runnable readTask = new Runnable() { - @Override public void run() { ChannelPipeline pipeline = pipeline(); - int maxMessagesPerRead = config().getMaxMessagesPerRead(); - for (int i = 0; i < maxMessagesPerRead; i++) { - Object m = inboundMessageQueue.poll(); - if (m == null) { - break; - } - if (m == LastHttp2Message.get()) { + for (InboundMessage m; (m = inboundMessageQueue.poll()) != null;) { + if (m.msg == LastHttp2Message.get()) { state = state == State.HALF_CLOSED_LOCAL ? State.PRE_CLOSED : State.HALF_CLOSED_REMOTE; } - pipeline.fireChannelRead(m); + try { + if (m.length > 0 + && localFlowController.consumeBytes(stream, m.length)) { + http2ConnHandlerCtx.channel().flush(); + } + } catch (Http2Exception e) { + // an Http2Exception at least means the stream is broken(maybe the + // whole connection), so we are out. + http2ConnHandlerCtx.pipeline().fireExceptionCaught(e); + return; + } + pipeline.fireChannelRead(m.msg); } pipeline.fireChannelReadComplete(); + if (state == State.PRE_CLOSED) { + close(); + } } }; @Override protected void doBeginRead() throws Exception { - State currentState = this.state; - if (currentState == State.CLOSED) { - throw new ClosedChannelException(); - } if (inboundMessageQueue.isEmpty()) { return; } + State currentState = this.state; + if (remoteSideClosed(currentState)) { + throw new ClosedChannelException(); + } final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final Integer stackDepth = threadLocals.localChannelReaderStackDepth(); if (stackDepth < MAX_READER_STACK_DEPTH) { @@ -201,14 +250,25 @@ public class Http2StreamChannel extends AbstractChannel { } } + private void resumeWrite() { + writePending = false; + ((Http2Unsafe) unsafe()).forceFlush(); + } + @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { State currentState = this.state; - if (currentState == State.CLOSED) { + if (localSideClosed(currentState)) { throw new ClosedChannelException(); } + int writeBufferHighWaterMark = config().getWriteBufferHighWaterMark(); + boolean flush = false; for (;;) { + if (pendingOutboundBytes >= writeBufferHighWaterMark) { + writePending = true; + break; + } Object msg = in.current(); if (msg == null) { break; @@ -217,15 +277,30 @@ public class Http2StreamChannel extends AbstractChannel { this.state = currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED : State.HALF_CLOSED_LOCAL; - encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx - .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise()); + encoder.writeData(http2ConnHandlerCtx, stream.id(), + Unpooled.EMPTY_BUFFER, 0, true, http2ConnHandlerCtx.newPromise()); } else if (msg instanceof Http2Headers) { encoder.writeHeaders(http2ConnHandlerCtx, stream.id(), (Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise()); } else if (msg instanceof ByteBuf) { ByteBuf data = (ByteBuf) msg; + final int pendingBytes = data.readableBytes(); + pendingOutboundBytes += pendingBytes; encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0, - false, http2ConnHandlerCtx.newPromise()); + false, http2ConnHandlerCtx.newPromise()).addListener( + new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + pendingOutboundBytes -= pendingBytes; + if (writePending + && pendingOutboundBytes <= config() + .getWriteBufferLowWaterMark()) { + resumeWrite(); + } + } + }); } else { throw new UnsupportedMessageTypeException(msg, Http2Headers.class, ByteBuf.class); @@ -236,33 +311,53 @@ public class Http2StreamChannel extends AbstractChannel { if (flush) { http2ConnHandlerCtx.channel().flush(); } + if (state == State.PRE_CLOSED) { + close(); + } } - /** - * Append a message to the inbound queue of this channel. You need to call - * {@link #read()} if you want to pass the message to handlers. - */ - void writeInbound(Object msg) { - inboundMessageQueue.add(msg); + public void writeInbound(Object msg, int length) { + inboundMessageQueue.add(new InboundMessage(msg, length)); } private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES = ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED); - /** - * @return true if remote side finishes sending data to us. - */ public boolean remoteSideClosed() { + return remoteSideClosed(state); + } + + private boolean remoteSideClosed(State state) { return REMOTE_SIDE_CLOSED_STATES.contains(state); } private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES = ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED); - /** - * @return true if we finish sending data to remote side. - */ public boolean localSideClosed() { + return localSideClosed(state); + } + + private boolean localSideClosed(State state) { return LOCAL_SIDE_CLOSED_STATES.contains(state); } + + public void setClosed() { + State currentState = this.state; + if (!remoteSideClosed(currentState)) { + writeInbound(LastHttp2Message.get(), 0); + if (config().isAutoRead()) { + read(); + } + } + currentState = this.state; + if (!localSideClosed(currentState)) { + this.state = + currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED + : State.HALF_CLOSED_LOCAL; + if (currentState == State.PRE_CLOSED) { + close(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java new file mode 100644 index 0000000..dfe9c3e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; +import io.netty.handler.codec.http2.DefaultHttp2FrameReader; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2EventAdapter; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2FrameReader; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2InboundFrameLogger; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; + +/** + * + */ +@InterfaceAudience.Private +public class Http2Util { + public interface Http2ConnectionHandlerFactory<T extends Http2ConnectionHandler> { + T create(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder); + } + + @SuppressWarnings("resource") + static <T extends Http2ConnectionHandler> T create(Configuration conf, + Http2Connection conn, Http2EventAdapter listener, + Http2ConnectionHandlerFactory<T> handlerFactory, + Http2FrameLogger frameLogger) throws Http2Exception { + conn.addListener(listener); + DefaultHttp2FrameReader rawFrameReader = new DefaultHttp2FrameReader(); + DefaultHttp2FrameWriter rawFrameWriter = new DefaultHttp2FrameWriter(); + rawFrameWriter.maxFrameSize(Http2CodecUtil.MAX_FRAME_SIZE_UPPER_BOUND); + Http2FrameReader frameReader; + Http2FrameWriter frameWriter; + if (frameLogger != null) { + frameReader = new Http2InboundFrameLogger(rawFrameReader, frameLogger); + frameWriter = new Http2OutboundFrameLogger(rawFrameWriter, frameLogger); + } else { + frameReader = rawFrameReader; + frameWriter = rawFrameWriter; + } + DefaultHttp2LocalFlowController localFlowController = + new DefaultHttp2LocalFlowController(conn, frameWriter, conf.getFloat( + DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO, + DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO_DEFAULT)); + int initialWindowsSize = + conf.getInt(DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE, + DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE_DEFAULT); + localFlowController.initialWindowSize(initialWindowsSize); + conn.local().flowController(localFlowController); + + DefaultHttp2ConnectionEncoder encoder = + new DefaultHttp2ConnectionEncoder(conn, frameWriter); + DefaultHttp2ConnectionDecoder decoder = + new DefaultHttp2ConnectionDecoder(conn, encoder, frameReader, listener); + return handlerFactory.create(decoder, encoder); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java index 1ee733d..964fd0b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java @@ -20,21 +20,18 @@ package org.apache.hadoop.hdfs.web.http2; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2FrameReader; -import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionHandler; -import io.netty.handler.codec.http2.Http2FrameListener; +import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameLogger; -import io.netty.handler.codec.http2.Http2FrameReader; -import io.netty.handler.codec.http2.Http2FrameWriter; -import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundFrameLogger; import io.netty.handler.logging.LogLevel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; /** * An {@link Http2ConnectionHandler} used at server side. @@ -48,39 +45,36 @@ public class ServerHttp2ConnectionHandler extends Http2ConnectionHandler { private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger( LogLevel.INFO, ServerHttp2ConnectionHandler.class); - private ServerHttp2ConnectionHandler(Http2Connection connection, - Http2FrameReader frameReader, Http2FrameWriter frameWriter, - Http2FrameListener listener) { - super(connection, frameReader, frameWriter, listener); + private ServerHttp2ConnectionHandler(Http2ConnectionDecoder decoder, + Http2ConnectionEncoder encoder) { + super(decoder, encoder); } + private static final Http2Util.Http2ConnectionHandlerFactory<ServerHttp2ConnectionHandler> FACTORY = + new Http2Util.Http2ConnectionHandlerFactory<ServerHttp2ConnectionHandler>() { + + @Override + public ServerHttp2ConnectionHandler create( + Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) { + return new ServerHttp2ConnectionHandler(decoder, encoder); + } + }; + /** * Create and initialize an {@link ServerHttp2ConnectionHandler}. * @param channel * @param initializer - * @param verbose whether to log inbound and outbound HTTP/2 messages + * @param conf * @return the initialized {@link ServerHttp2ConnectionHandler} + * @throws Http2Exception */ public static ServerHttp2ConnectionHandler create(Channel channel, - ChannelInitializer<Http2StreamChannel> initializer) { + ChannelInitializer<Http2StreamChannel> initializer, Configuration conf) + throws Http2Exception { Http2Connection conn = new DefaultHttp2Connection(true); ServerHttp2EventListener listener = new ServerHttp2EventListener(channel, conn, initializer); - conn.addListener(listener); - Http2FrameReader frameReader; - Http2FrameWriter frameWriter; - if (LOG.isDebugEnabled()) { - frameReader = - new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), - FRAME_LOGGER); - frameWriter = - new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), - FRAME_LOGGER); - } else { - frameReader = new DefaultHttp2FrameReader(); - frameWriter = new DefaultHttp2FrameWriter(); - } - return new ServerHttp2ConnectionHandler(conn, frameReader, frameWriter, - listener); + return Http2Util.create(conf, conn, listener, FACTORY, + LOG.isDebugEnabled() ? FRAME_LOGGER : null); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java index 72e3879..f58f6a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java @@ -17,17 +17,9 @@ */ package org.apache.hadoop.hdfs.web.http2; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2Connection.PropertyKey; -import io.netty.handler.codec.http2.Http2Error; -import io.netty.handler.codec.http2.Http2EventAdapter; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -35,35 +27,21 @@ import org.apache.hadoop.classification.InterfaceAudience; /** * An HTTP/2 FrameListener and EventListener to manage - * {@link Http2StreamChannel}s. - * <p> - * We do not handle onRstStreamRead here, a stream that being reset will also - * call onStreamClosed. The upper layer should not rely on a reset event. + * {@link Http2StreamChannel}s for server. */ @InterfaceAudience.Private -public class ServerHttp2EventListener extends Http2EventAdapter { - - private final Channel parentChannel; +public class ServerHttp2EventListener extends AbstractHttp2EventListener { private final ChannelInitializer<Http2StreamChannel> subChannelInitializer; - private final Http2Connection conn; - - private final PropertyKey subChannelPropKey; - public ServerHttp2EventListener(Channel parentChannel, Http2Connection conn, ChannelInitializer<Http2StreamChannel> subChannelInitializer) { - this.parentChannel = parentChannel; - this.conn = conn; + super(parentChannel, conn); this.subChannelInitializer = subChannelInitializer; - this.subChannelPropKey = conn.newKey(); } @Override - public void onStreamActive(final Http2Stream stream) { - Http2StreamChannel subChannel = - new Http2StreamChannel(parentChannel, stream); - stream.setProperty(subChannelPropKey, subChannel); + protected void initChannelOnStreamActive(final Http2StreamChannel subChannel) { subChannel.pipeline().addFirst(subChannelInitializer); parentChannel.eventLoop().register(subChannel) .addListener(new FutureListener<Void>() { @@ -71,65 +49,10 @@ public class ServerHttp2EventListener extends Http2EventAdapter { @Override public void operationComplete(Future<Void> future) throws Exception { if (!future.isSuccess()) { - stream.removeProperty(subChannelPropKey); + subChannel.stream().removeProperty(subChannelPropKey); } } }); - - } - - @Override - public void onStreamClosed(Http2Stream stream) { - Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey); - if (subChannel != null) { - subChannel.close(); - } - } - - private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception { - Http2StreamChannel subChannel = - conn.stream(streamId).getProperty(subChannelPropKey); - if (subChannel == null) { - throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR, - "No sub channel found"); - } - return subChannel; - } - - private void writeInbound(int streamId, Object msg, boolean endOfStream) - throws Http2Exception { - Http2StreamChannel subChannel = getSubChannel(streamId); - subChannel.writeInbound(msg); - if (endOfStream) { - subChannel.writeInbound(LastHttp2Message.get()); - } - if (subChannel.config().isAutoRead()) { - subChannel.read(); - } - - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, - Http2Headers headers, int padding, boolean endOfStream) - throws Http2Exception { - writeInbound(streamId, headers, endOfStream); - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, - Http2Headers headers, int streamDependency, short weight, - boolean exclusive, int padding, boolean endOfStream) - throws Http2Exception { - onHeadersRead(ctx, streamId, headers, padding, endOfStream); - } - - @Override - public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, - int padding, boolean endOfStream) throws Http2Exception { - int pendingBytes = data.readableBytes() + padding; - writeInbound(streamId, data.retain(), endOfStream); - return pendingBytes; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java new file mode 100644 index 0000000..cda447b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.concurrent.Promise; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Used by {@link Http2StreamBootstrap} to establish an + * {@link Http2StreamChannel}. + */ +@InterfaceAudience.Private +class StartHttp2StreamRequest { + + final Http2Headers headers; + + final ByteBuf data; + + final boolean endStream; + + final Promise<Http2StreamChannel> promise; + + StartHttp2StreamRequest(Http2Headers headers, ByteBuf data, + boolean endStream, Promise<Http2StreamChannel> promise) { + this.headers = headers; + this.data = data; + this.endStream = endStream; + this.promise = promise; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java deleted file mode 100644 index 1e1acdd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode.web.dtp; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http2.HttpUtil; -import io.netty.util.concurrent.Promise; - -import java.util.concurrent.ConcurrentMap; - -import net.sf.ehcache.store.chm.ConcurrentHashMap; - -public class Http2ResponseHandler extends - SimpleChannelInboundHandler<FullHttpResponse> { - - private ConcurrentMap<Integer, Promise<FullHttpResponse>> streamId2Promise = - new ConcurrentHashMap<>(); - - @Override - protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) - throws Exception { - Integer streamId = - msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text()); - if (streamId == null) { - System.err.println("HttpResponseHandler unexpected message received: " - + msg); - return; - } - if (streamId.intValue() == 1) { - // this is the upgrade response message, just ignore it. - return; - } - Promise<FullHttpResponse> promise = streamId2Promise.get(streamId); - if (promise == null) { - System.err.println("Message received for unknown stream id " + streamId); - } else { - // Do stuff with the message (for now just print it) - promise.setSuccess(msg.retain()); - - } - } - - public void put(Integer streamId, Promise<FullHttpResponse> promise) { - streamId2Promise.put(streamId, promise); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java index eaa63a4..4d4ac37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java @@ -19,35 +19,16 @@ package org.apache.hadoop.hdfs.server.datanode.web.dtp; import static org.junit.Assert.assertEquals; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2FrameReader; -import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; -import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2ConnectionHandler; -import io.netty.handler.codec.http2.Http2FrameLogger; -import io.netty.handler.codec.http2.Http2FrameReader; -import io.netty.handler.codec.http2.Http2FrameWriter; -import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundFrameLogger; -import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler; -import io.netty.handler.codec.http2.HttpUtil; -import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter; -import io.netty.handler.logging.LogLevel; +import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.timeout.TimeoutException; -import io.netty.util.concurrent.Promise; +import io.netty.util.AsciiString; import java.io.IOException; import java.net.URISyntaxException; @@ -57,14 +38,17 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; +import org.apache.hadoop.hdfs.web.http2.ClientHttp2ConnectionHandler; +import org.apache.hadoop.hdfs.web.http2.Http2DataReceiver; +import org.apache.hadoop.hdfs.web.http2.Http2StreamBootstrap; +import org.apache.hadoop.hdfs.web.http2.Http2StreamChannel; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -public class TestDtpHttp2 { +import com.google.common.io.ByteStreams; - private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger( - LogLevel.INFO, TestDtpHttp2.class); +public class TestDtpHttp2 { private static final Configuration CONF = WebHdfsTestUtil.createConf(); @@ -74,16 +58,15 @@ public class TestDtpHttp2 { private static Channel CHANNEL; - private static Http2ResponseHandler RESPONSE_HANDLER; + private static Http2StreamChannel STREAM; @BeforeClass public static void setUp() throws IOException, URISyntaxException, - TimeoutException { + TimeoutException, InterruptedException, ExecutionException { CLUSTER = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build(); CLUSTER.waitActive(); - RESPONSE_HANDLER = new Http2ResponseHandler(); - Bootstrap bootstrap = + CHANNEL = new Bootstrap() .group(WORKER_GROUP) .channel(NioSocketChannel.class) @@ -93,22 +76,31 @@ public class TestDtpHttp2 { @Override protected void initChannel(Channel ch) throws Exception { - Http2Connection connection = new DefaultHttp2Connection(false); - Http2ConnectionHandler connectionHandler = - new HttpToHttp2ConnectionHandler(connection, frameReader(), - frameWriter(), new DelegatingDecompressorFrameListener( - connection, new InboundHttp2ToHttpAdapter.Builder( - connection).maxContentLength(Integer.MAX_VALUE) - .propagateSettings(true).build())); - ch.pipeline().addLast(connectionHandler, RESPONSE_HANDLER); + ch.pipeline().addLast( + ClientHttp2ConnectionHandler.create(ch, CONF)); } - }); - CHANNEL = bootstrap.connect().syncUninterruptibly().channel(); + }).connect().syncUninterruptibly().channel(); + STREAM = + new Http2StreamBootstrap() + .channel(CHANNEL) + .headers( + new DefaultHttp2Headers().method( + new AsciiString(HttpMethod.GET.name())).path( + new AsciiString("/"))).endStream(true) + .handler(new ChannelInitializer<Channel>() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new Http2DataReceiver()); + } + }).connect().syncUninterruptibly().get(); } @AfterClass public static void tearDown() throws IOException { + if (STREAM != null) { + STREAM.close(); + } if (CHANNEL != null) { CHANNEL.close().syncUninterruptibly(); } @@ -118,28 +110,14 @@ public class TestDtpHttp2 { } } - private static Http2FrameReader frameReader() { - return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), - FRAME_LOGGER); - } - - private static Http2FrameWriter frameWriter() { - return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), - FRAME_LOGGER); - } - @Test - public void test() throws InterruptedException, ExecutionException { - int streamId = 3; - FullHttpRequest request = - new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); - request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(), - streamId); - Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise(); - RESPONSE_HANDLER.put(streamId, promise); - CHANNEL.writeAndFlush(request); - assertEquals(HttpResponseStatus.OK, promise.get().status()); - ByteBuf content = promise.get().content(); - assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8)); + public void test() throws InterruptedException, ExecutionException, + IOException { + Http2DataReceiver receiver = STREAM.pipeline().get(Http2DataReceiver.class); + assertEquals(HttpResponseStatus.OK.codeAsText(), receiver.waitForResponse() + .status()); + assertEquals("HTTP/2 DTP", + new String(ByteStreams.toByteArray(receiver.content()), + StandardCharsets.UTF_8)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java new file mode 100644 index 0000000..d7c6d16 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.util.ByteString; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; + +public abstract class AbstractTestHttp2Client { + + protected EventLoopGroup workerGroup = new NioEventLoopGroup(); + + protected Server server; + + protected final class EchoHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, + HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + byte[] msg = IOUtils.toByteArray(request.getInputStream()); + response.getOutputStream().write(msg); + response.getOutputStream().flush(); + } + + } + + protected Channel channel; + + protected void start() throws Exception { + server = new Server(); + ServerConnector connector = + new ServerConnector(server, new HTTP2CServerConnectionFactory( + new HttpConfiguration())); + connector.setPort(0); + server.addConnector(connector); + setHandler(server); + server.start(); + channel = + new Bootstrap() + .group(workerGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer<Channel>() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast( + ClientHttp2ConnectionHandler.create(ch, new Configuration())); + } + + }) + .connect( + new InetSocketAddress("127.0.0.1", connector.getLocalPort())) + .sync().channel(); + } + + protected void stop() throws Exception { + if (channel != null) { + channel.close(); + } + if (server != null) { + server.stop(); + } + workerGroup.shutdownGracefully(); + } + + protected Http2StreamChannel connect(boolean endStream) + throws InterruptedException, ExecutionException { + return new Http2StreamBootstrap() + .channel(channel) + .handler(new ChannelInitializer<Http2StreamChannel>() { + + @Override + protected void initChannel(Http2StreamChannel ch) throws Exception { + ch.pipeline().addLast(new Http2DataReceiver()); + } + + }) + .headers( + new DefaultHttp2Headers() + .method( + new ByteString(HttpMethod.GET.name(), StandardCharsets.UTF_8)) + .path(new ByteString("/", StandardCharsets.UTF_8)) + .scheme(new ByteString("http", StandardCharsets.UTF_8)) + .authority( + new ByteString("127.0.0.1:" + + ((InetSocketAddress) channel.remoteAddress()).getPort(), + StandardCharsets.UTF_8))).endStream(endStream).connect() + .sync().get(); + } + + protected abstract void setHandler(Server server); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java new file mode 100644 index 0000000..ab3f6d4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import static org.junit.Assert.assertEquals; +import io.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; + +import org.eclipse.jetty.server.Server; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.ByteStreams; + +public class TestHttp2Client extends AbstractTestHttp2Client { + + @Before + public void setUp() throws Exception { + start(); + } + + @After + public void tearDown() throws Exception { + stop(); + } + + @Test + public void test() throws InterruptedException, ExecutionException, + IOException { + Http2StreamChannel stream = connect(false); + Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class); + stream.write(stream.alloc().buffer() + .writeBytes("Hello World".getBytes(StandardCharsets.UTF_8))); + stream.writeAndFlush(LastHttp2Message.get()); + assertEquals(receiver.waitForResponse().status(), + HttpResponseStatus.OK.codeAsText()); + assertEquals("Hello World", + new String(ByteStreams.toByteArray(receiver.content()), + StandardCharsets.UTF_8)); + } + + @Override + protected void setHandler(Server server) { + server.setHandler(new EchoHandler()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c0e0313/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java new file mode 100644 index 0000000..1c483c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import io.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.server.Server; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class TestHttp2ClientMultiThread extends AbstractTestHttp2Client { + + private int requestCount = 10000; + + private int concurrency = 10; + + private ExecutorService executor = Executors.newFixedThreadPool(concurrency, + new ThreadFactoryBuilder().setNameFormat("Echo-Client-%d").setDaemon(true) + .build()); + + @Before + public void setUp() throws Exception { + start(); + } + + @After + public void tearDown() throws Exception { + stop(); + } + + private void testEcho() throws InterruptedException, ExecutionException, + IOException { + Http2StreamChannel stream = connect(false); + Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class); + byte[] b = new byte[ThreadLocalRandom.current().nextInt(10, 100)]; + ThreadLocalRandom.current().nextBytes(b); + stream.write(stream.alloc().buffer(b.length).writeBytes(b)); + stream.writeAndFlush(LastHttp2Message.get()); + assertEquals(receiver.waitForResponse().status(), + HttpResponseStatus.OK.codeAsText()); + assertArrayEquals(b, ByteStreams.toByteArray(receiver.content())); + } + + @Test + public void test() throws InterruptedException { + final AtomicBoolean succ = new AtomicBoolean(true); + for (int i = 0; i < requestCount; i++) { + executor.execute(new Runnable() { + + @Override + public void run() { + try { + testEcho(); + } catch (Throwable t) { + t.printStackTrace(); + succ.set(false); + } + } + }); + } + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + assertTrue(succ.get()); + } + + @Override + protected void setHandler(Server server) { + server.setHandler(new EchoHandler()); + } +} \ No newline at end of file