HDFS-8515. Implement HTTP/2 stream channels. Contributed by Duo Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c24a64fd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c24a64fd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c24a64fd Branch: refs/heads/HDFS-7966 Commit: c24a64fd91c6c9435c86c6cc95066b40c6f94cd0 Parents: 58590fe Author: Haohui Mai <whe...@apache.org> Authored: Wed Jun 24 18:07:17 2015 -0700 Committer: zhangduo <zhang...@wandoujia.com> Committed: Sat Oct 17 11:15:03 2015 +0800 ---------------------------------------------------------------------- .gitignore | 2 + .../hadoop-hdfs/CHANGES-HDFS-7966.txt | 3 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 5 + .../web/PortUnificationServerHandler.java | 27 +- .../datanode/web/dtp/DtpChannelHandler.java | 47 ++++ .../datanode/web/dtp/DtpHttp2FrameListener.java | 52 ---- .../datanode/web/dtp/DtpHttp2Handler.java | 34 --- .../hdfs/web/http2/Http2StreamChannel.java | 268 +++++++++++++++++++ .../hadoop/hdfs/web/http2/LastHttp2Message.java | 44 +++ .../web/http2/ServerHttp2ConnectionHandler.java | 86 ++++++ .../web/http2/ServerHttp2EventListener.java | 135 ++++++++++ .../datanode/web/dtp/Http2ResponseHandler.java | 14 +- .../server/datanode/web/dtp/TestDtpHttp2.java | 6 +- .../hdfs/web/http2/AbstractTestHttp2Server.java | 67 +++++ .../hadoop/hdfs/web/http2/StreamListener.java | 116 ++++++++ .../hadoop/hdfs/web/http2/TestHttp2Server.java | 140 ++++++++++ .../web/http2/TestHttp2ServerMultiThread.java | 207 ++++++++++++++ hadoop-project/pom.xml | 7 + 18 files changed, 1154 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index cde198e..0d664ba 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ yarnregistry.pdf 
hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml patchprocess/ +hadoop-hdfs-project/hadoop-hdfs/src/test/resources/common-version-info.properties +hadoop-hdfs-project/hadoop-hdfs/src/test/resources/webapps http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt new file mode 100644 index 0000000..4ec2793 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt @@ -0,0 +1,3 @@ +HDFS-7966 AND RELATED CHANGES + + HDFS-8515. Implement HTTP/2 stream channels. (Duo Zhang via wheat9) http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 0798248..2cb1716 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -194,6 +194,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>htrace-core4</artifactId> </dependency> <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-client</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-kms</artifactId> <classifier>classes</classifier> http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java index 7ebc070..e5c5256 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java @@ -17,21 +17,24 @@ */ package org.apache.hadoop.hdfs.server.datanode.web; -import java.net.InetSocketAddress; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpHttp2Handler; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpChannelHandler; +import org.apache.hadoop.hdfs.web.http2.Http2StreamChannel; +import org.apache.hadoop.hdfs.web.http2.ServerHttp2ConnectionHandler; + /** * A port unification handler to support HTTP/1.1 and HTTP/2 on the same port. 
*/ @@ -64,7 +67,15 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder { } private void configureHttp2(ChannelHandlerContext ctx) { - ctx.pipeline().addLast(new DtpHttp2Handler()); + ctx.pipeline().addLast( + ServerHttp2ConnectionHandler.create(ctx.channel(), + new ChannelInitializer<Http2StreamChannel>() { + + @Override + protected void initChannel(Http2StreamChannel ch) throws Exception { + ch.pipeline().addLast(new DtpChannelHandler()); + } + })); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java new file mode 100644 index 0000000..23847c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode.web.dtp; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; + +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.web.http2.LastHttp2Message; + +/** + * A dummy handler that just writes back a string message. + */ +@InterfaceAudience.Private +public class DtpChannelHandler extends + SimpleChannelInboundHandler<Http2Headers> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2Headers msg) + throws Exception { + ctx.write(new DefaultHttp2Headers().status(HttpResponseStatus.OK + .codeAsText())); + ctx.write(ctx.alloc().buffer() + .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8))); + ctx.writeAndFlush(LastHttp2Message.get()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java deleted file mode 100644 index 41e7cf4..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode.web.dtp; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.Http2ConnectionEncoder; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2FrameAdapter; -import io.netty.handler.codec.http2.Http2Headers; - -import java.nio.charset.StandardCharsets; - -class DtpHttp2FrameListener extends Http2FrameAdapter { - - private Http2ConnectionEncoder encoder; - - public void encoder(Http2ConnectionEncoder encoder) { - this.encoder = encoder; - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, - Http2Headers headers, int streamDependency, short weight, - boolean exclusive, int padding, boolean endStream) throws Http2Exception { - encoder.writeHeaders(ctx, streamId, - new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()), 0, - false, ctx.newPromise()); - encoder.writeData( - ctx, - streamId, - ctx.alloc().buffer() - .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)), 0, true, - ctx.newPromise()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java deleted file mode 100644 index 5b6f279..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode.web.dtp; - -import org.apache.hadoop.classification.InterfaceAudience; - -import io.netty.handler.codec.http2.Http2ConnectionHandler; - -/** - * The HTTP/2 handler. 
- */ -@InterfaceAudience.Private -public class DtpHttp2Handler extends Http2ConnectionHandler { - - public DtpHttp2Handler() { - super(true, new DtpHttp2FrameListener()); - ((DtpHttp2FrameListener) decoder().listener()).encoder(encoder()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java new file mode 100644 index 0000000..658ffe4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.UnsupportedMessageTypeException; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.internal.InternalThreadLocalMap; + +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.collect.ImmutableSet; + +/** + * A channel used for modeling an HTTP/2 stream. + * <p> + * We share the same event loop with the parent channel, so doBeginRead, doWrite + * and doClose will run in the same event loop thread. So no event loop + * switching is needed, and it is safe to call encoder.writeXXX directly in + * doWrite. + * <p> + * But the public methods(isOpen, isActive...) can be called outside the event + * loop, so the state field must be volatile. 
+ */ +@InterfaceAudience.Private +public class Http2StreamChannel extends AbstractChannel { + + private static final ChannelMetadata METADATA = new ChannelMetadata(false); + + private static final int MAX_READER_STACK_DEPTH = 8; + + private final ChannelHandlerContext http2ConnHandlerCtx; + private final Http2Stream stream; + private final Http2ConnectionEncoder encoder; + private final DefaultChannelConfig config; + private final Queue<Object> inboundMessageQueue = new ArrayDeque<>(); + + private enum State { + OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED + } + + private volatile State state = State.OPEN; + + public Http2StreamChannel(Channel parent, Http2Stream stream) { + super(parent); + this.http2ConnHandlerCtx = + parent.pipeline().context(Http2ConnectionHandler.class); + Http2ConnectionHandler connHandler = + (Http2ConnectionHandler) http2ConnHandlerCtx.handler(); + this.stream = stream; + this.encoder = connHandler.encoder(); + this.config = new DefaultChannelConfig(this); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + public boolean isOpen() { + return state != State.CLOSED; + } + + @Override + public boolean isActive() { + // we create this channel after HTTP/2 stream active, so we do not have a + // separated 'active' state. 
+ return isOpen(); + } + + @Override + public ChannelMetadata metadata() { + return METADATA; + } + + private final class Http2Unsafe extends AbstractUnsafe { + + @Override + public void connect(SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) { + throw new UnsupportedOperationException(); + } + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new Http2Unsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return true; + } + + @Override + protected SocketAddress localAddress0() { + return parent().localAddress(); + } + + @Override + protected SocketAddress remoteAddress0() { + return parent().remoteAddress(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() throws Exception { + if (stream.state() != Http2Stream.State.CLOSED) { + encoder.writeRstStream(http2ConnHandlerCtx, stream.id(), + Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise()); + } + state = State.CLOSED; + } + + private final Runnable readTask = new Runnable() { + + @Override + public void run() { + ChannelPipeline pipeline = pipeline(); + int maxMessagesPerRead = config().getMaxMessagesPerRead(); + for (int i = 0; i < maxMessagesPerRead; i++) { + Object m = inboundMessageQueue.poll(); + if (m == null) { + break; + } + if (m == LastHttp2Message.get()) { + state = + state == State.HALF_CLOSED_LOCAL ? 
State.PRE_CLOSED + : State.HALF_CLOSED_REMOTE; + } + pipeline.fireChannelRead(m); + } + pipeline.fireChannelReadComplete(); + } + }; + + @Override + protected void doBeginRead() throws Exception { + State currentState = this.state; + if (currentState == State.CLOSED) { + throw new ClosedChannelException(); + } + if (inboundMessageQueue.isEmpty()) { + return; + } + final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); + final Integer stackDepth = threadLocals.localChannelReaderStackDepth(); + if (stackDepth < MAX_READER_STACK_DEPTH) { + threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1); + try { + readTask.run(); + } finally { + threadLocals.setLocalChannelReaderStackDepth(stackDepth); + } + } else { + eventLoop().execute(readTask); + } + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + State currentState = this.state; + if (currentState == State.CLOSED) { + throw new ClosedChannelException(); + } + boolean flush = false; + for (;;) { + Object msg = in.current(); + if (msg == null) { + break; + } + if (msg == LastHttp2Message.get()) { + this.state = + currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED + : State.HALF_CLOSED_LOCAL; + encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx + .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise()); + } else if (msg instanceof Http2Headers) { + encoder.writeHeaders(http2ConnHandlerCtx, stream.id(), + (Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise()); + } else if (msg instanceof ByteBuf) { + ByteBuf data = (ByteBuf) msg; + encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0, + false, http2ConnHandlerCtx.newPromise()); + } else { + throw new UnsupportedMessageTypeException(msg, Http2Headers.class, + ByteBuf.class); + } + in.remove(); + flush = true; + } + if (flush) { + http2ConnHandlerCtx.channel().flush(); + } + } + + /** + * Append a message to the inbound queue of this channel. 
You need to call + * {@link #read()} if you want to pass the message to handlers. + */ + void writeInbound(Object msg) { + inboundMessageQueue.add(msg); + } + + private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES = + ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED); + + /** + * @return true if remote side finishes sending data to us. + */ + public boolean remoteSideClosed() { + return REMOTE_SIDE_CLOSED_STATES.contains(state); + } + + private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES = + ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED); + + /** + * @return true if we finish sending data to remote side. + */ + public boolean localSideClosed() { + return LOCAL_SIDE_CLOSED_STATES.contains(state); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java new file mode 100644 index 0000000..b72b09a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.handler.codec.http.LastHttpContent; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Used to tell an inbound handler that the remote side of an HTTP/2 stream is + * closed, or used by an outbound handler to tell the HTTP/2 stream to close + * local side. + * @see LastHttpContent#EMPTY_LAST_CONTENT + */ +@InterfaceAudience.Private +public final class LastHttp2Message { + + private static final LastHttp2Message INSTANCE = new LastHttp2Message(); + + private LastHttp2Message() { + } + + /** + * Get the singleton <tt>LastHttp2Message</tt> instance. + */ + public static LastHttp2Message get() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java new file mode 100644 index 0000000..1ee733d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2FrameReader; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2FrameListener; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2FrameReader; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2InboundFrameLogger; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; +import io.netty.handler.logging.LogLevel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * An {@link Http2ConnectionHandler} used at server side. 
+ */ +@InterfaceAudience.Private +public class ServerHttp2ConnectionHandler extends Http2ConnectionHandler { + + private static final Log LOG = LogFactory + .getLog(ServerHttp2ConnectionHandler.class); + + private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger( + LogLevel.INFO, ServerHttp2ConnectionHandler.class); + + private ServerHttp2ConnectionHandler(Http2Connection connection, + Http2FrameReader frameReader, Http2FrameWriter frameWriter, + Http2FrameListener listener) { + super(connection, frameReader, frameWriter, listener); + } + + /** + * Create and initialize an {@link ServerHttp2ConnectionHandler}. Inbound and + * outbound HTTP/2 frames are logged only when debug logging is enabled. + * @param channel the parent channel that the stream channels belong to + * @param initializer the initializer added to each new {@link Http2StreamChannel} + * @return the initialized {@link ServerHttp2ConnectionHandler} + */ + public static ServerHttp2ConnectionHandler create(Channel channel, + ChannelInitializer<Http2StreamChannel> initializer) { + Http2Connection conn = new DefaultHttp2Connection(true); + ServerHttp2EventListener listener = + new ServerHttp2EventListener(channel, conn, initializer); + conn.addListener(listener); + Http2FrameReader frameReader; + Http2FrameWriter frameWriter; + if (LOG.isDebugEnabled()) { + frameReader = + new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), + FRAME_LOGGER); + frameWriter = + new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), + FRAME_LOGGER); + } else { + frameReader = new DefaultHttp2FrameReader(); + frameWriter = new DefaultHttp2FrameWriter(); + } + return new ServerHttp2ConnectionHandler(conn, frameReader, frameWriter, + listener); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java new file mode 100644 index 0000000..72e3879 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2Connection.PropertyKey; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2EventAdapter; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * An HTTP/2 FrameListener and EventListener to manage + * {@link Http2StreamChannel}s. 
+ * <p> + * We do not handle onRstStreamRead here; a stream that is being reset will also + * call onStreamClosed. The upper layer should not rely on a reset event. + */ +@InterfaceAudience.Private +public class ServerHttp2EventListener extends Http2EventAdapter { + + private final Channel parentChannel; + + private final ChannelInitializer<Http2StreamChannel> subChannelInitializer; + + private final Http2Connection conn; + + private final PropertyKey subChannelPropKey; + + public ServerHttp2EventListener(Channel parentChannel, Http2Connection conn, + ChannelInitializer<Http2StreamChannel> subChannelInitializer) { + this.parentChannel = parentChannel; + this.conn = conn; + this.subChannelInitializer = subChannelInitializer; + this.subChannelPropKey = conn.newKey(); + } + + @Override + public void onStreamActive(final Http2Stream stream) { + Http2StreamChannel subChannel = + new Http2StreamChannel(parentChannel, stream); + stream.setProperty(subChannelPropKey, subChannel); + subChannel.pipeline().addFirst(subChannelInitializer); + parentChannel.eventLoop().register(subChannel) + .addListener(new FutureListener<Void>() { + + @Override + public void operationComplete(Future<Void> future) throws Exception { + if (!future.isSuccess()) { + stream.removeProperty(subChannelPropKey); + } + } + + }); + + } + + @Override + public void onStreamClosed(Http2Stream stream) { + Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey); + if (subChannel != null) { + subChannel.close(); + } + } + + private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception { + Http2StreamChannel subChannel = + conn.stream(streamId).getProperty(subChannelPropKey); + if (subChannel == null) { + throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR, + "No sub channel found"); + } + return subChannel; + } + + private void writeInbound(int streamId, Object msg, boolean endOfStream) + throws Http2Exception { + Http2StreamChannel subChannel =
getSubChannel(streamId); + subChannel.writeInbound(msg); + if (endOfStream) { + subChannel.writeInbound(LastHttp2Message.get()); + } + if (subChannel.config().isAutoRead()) { + subChannel.read(); + } + + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endOfStream) + throws Http2Exception { + writeInbound(streamId, headers, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, + boolean exclusive, int padding, boolean endOfStream) + throws Http2Exception { + onHeadersRead(ctx, streamId, headers, padding, endOfStream); + } + + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endOfStream) throws Http2Exception { + int pendingBytes = data.readableBytes() + padding; + writeInbound(streamId, data.retain(), endOfStream); + return pendingBytes; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java index eb8b918..1e1acdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java @@ -23,14 +23,15 @@ import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http2.HttpUtil; import io.netty.util.concurrent.Promise; -import java.util.HashMap; -import java.util.Map; 
+import java.util.concurrent.ConcurrentMap; + +import java.util.concurrent.ConcurrentHashMap; public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> { - private Map<Integer, Promise<FullHttpResponse>> streamId2Promise = - new HashMap<>(); + private ConcurrentMap<Integer, Promise<FullHttpResponse>> streamId2Promise = + new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) @@ -46,10 +47,7 @@ public class Http2ResponseHandler extends // this is the upgrade response message, just ignore it. return; } - Promise<FullHttpResponse> promise; - synchronized (this) { - promise = streamId2Promise.get(streamId); - } + Promise<FullHttpResponse> promise = streamId2Promise.get(streamId); if (promise == null) { System.err.println("Message received for unknown stream id " + streamId); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java index 4e91004..eaa63a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java @@ -136,10 +136,8 @@ public class TestDtpHttp2 { request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(), streamId); Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise(); - synchronized (RESPONSE_HANDLER) { - CHANNEL.writeAndFlush(request); - RESPONSE_HANDLER.put(streamId, promise); - } + RESPONSE_HANDLER.put(streamId, promise); +
CHANNEL.writeAndFlush(request); assertEquals(HttpResponseStatus.OK, promise.get().status()); ByteBuf content = promise.get().content(); assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java new file mode 100644 index 0000000..5f298f5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.web.http2; + +import java.net.InetSocketAddress; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; + +public abstract class AbstractTestHttp2Server { + + protected EventLoopGroup bossGroup = new NioEventLoopGroup(1); + + protected EventLoopGroup workerGroup = new NioEventLoopGroup(); + + protected Channel server; + + protected HTTP2Client client = new HTTP2Client(); + + protected Session session; + + protected abstract Channel initServer(); + + protected final void start() throws Exception { + server = initServer(); + client.start(); + int port = ((InetSocketAddress) server.localAddress()).getPort(); + FuturePromise<Session> sessionPromise = new FuturePromise<>(); + client.connect(new InetSocketAddress("127.0.0.1", port), + new Session.Listener.Adapter(), sessionPromise); + session = sessionPromise.get(); + } + + protected final void stop() throws Exception { + if (session != null) { + session.close(ErrorCode.NO_ERROR.code, "", new Callback.Adapter()); + } + if (server != null) { + server.close(); + } + client.stop(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java new file mode 100644 index 0000000..7194490 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import java.io.IOException; +import java.util.Arrays; + +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http.MetaData.Response; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.Callback; + +public class StreamListener extends Stream.Listener.Adapter { + + private boolean finish = false; + + private byte[] buf = new byte[0]; + + private int status = -1; + + private boolean reset; + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) { + synchronized (this) { + if (reset) { + callback.failed(new IllegalStateException("Stream already closed")); + return; // callback already failed; do not buffer or succeed it + } + if (status == -1) { + callback + .failed(new IllegalStateException("Haven't received header yet")); + return; // callback already failed; do not buffer or succeed it + } + int bufLen = buf.length; + int newBufLen = bufLen + frame.getData().remaining(); + buf = Arrays.copyOf(buf, newBufLen); +
frame.getData().get(buf, bufLen, frame.getData().remaining()); + if (frame.isEndStream()) { + finish = true; + } + notifyAll(); + callback.succeeded(); + } + } + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) { + synchronized (this) { + if (reset) { + throw new IllegalStateException("Stream already closed"); + } + if (status != -1) { + throw new IllegalStateException("Header already received"); + } + MetaData meta = frame.getMetaData(); + if (!meta.isResponse()) { + throw new IllegalStateException("Received non-response header"); + } + status = ((Response) meta).getStatus(); + if (frame.isEndStream()) { + finish = true; + notifyAll(); + } + } + } + + @Override + public void onReset(Stream stream, ResetFrame frame) { + synchronized (this) { + reset = true; + finish = true; + notifyAll(); + } + } + + public int getStatus() throws InterruptedException, IOException { + synchronized (this) { + while (!finish) { + wait(); + } + if (reset) { + throw new IOException("Stream reset"); + } + return status; + } + } + + public byte[] getData() throws InterruptedException, IOException { + synchronized (this) { + while (!finish) { + wait(); + } + if (reset) { + throw new IOException("Stream reset"); + } + return buf; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java new file mode 100644 index 0000000..6a8495b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import static org.junit.Assert.assertEquals; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.ReferenceCountUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PriorityFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.Callback; 
+import org.eclipse.jetty.util.FuturePromise; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestHttp2Server extends AbstractTestHttp2Server { + + private final AtomicInteger handlerClosedCount = new AtomicInteger(0); + + private final class HelloWorldHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (msg instanceof Http2Headers) { + ctx.writeAndFlush(new DefaultHttp2Headers() + .status(HttpResponseStatus.OK.codeAsText())); + } else { + ctx.writeAndFlush(ReferenceCountUtil.retain(msg)); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + handlerClosedCount.incrementAndGet(); + } + + } + + @Override + protected Channel initServer() { + return new ServerBootstrap().group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<Channel>() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast( + ServerHttp2ConnectionHandler.create(ch, + new ChannelInitializer<Http2StreamChannel>() { + + @Override + protected void initChannel(Http2StreamChannel ch) + throws Exception { + ch.pipeline().addLast(new HelloWorldHandler()); + } + })); + } + + }).bind(0).syncUninterruptibly().channel(); + } + + @Before + public void setUp() throws Exception { + start(); + } + + @After + public void tearDown() throws Exception { + stop(); + } + + @Test + public void test() throws InterruptedException, ExecutionException, + IOException { + HttpFields fields = new HttpFields(); + fields.put(HttpHeader.C_METHOD, HttpMethod.GET.asString()); + fields.put(HttpHeader.C_PATH, "/"); + FuturePromise<Stream> streamPromise = new FuturePromise<>(); + StreamListener listener = new StreamListener(); + session.newStream(new HeadersFrame(1, new MetaData( + org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame( + 
1, 0, 1, false), false), streamPromise, listener); + Stream stream = streamPromise.get(); + stream.data( + new DataFrame(stream.getId(), ByteBuffer.wrap("Hello World" + .getBytes(StandardCharsets.UTF_8)), true), new Callback.Adapter()); + assertEquals("Hello World", new String(listener.getData(), + StandardCharsets.UTF_8)); + + streamPromise = new FuturePromise<>(); + listener = new StreamListener(); + session.newStream(new HeadersFrame(1, new MetaData( + org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame( + 1, 0, 1, false), false), streamPromise, listener); + stream = streamPromise.get(); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code), + new Callback.Adapter()); + Thread.sleep(1000); + assertEquals(2, handlerClosedCount.get()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java new file mode 100644 index 0000000..e583ca3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.http2; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.ResourceLeakDetector.Level; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import 
org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PriorityFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class TestHttp2ServerMultiThread extends AbstractTestHttp2Server { + + private final class DispatchHandler extends + SimpleChannelInboundHandler<Http2Headers> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2Headers msg) + throws Exception { + ctx.writeAndFlush(new DefaultHttp2Headers().status(HttpResponseStatus.OK + .codeAsText())); + ctx.pipeline().remove(this) + .addLast(new EchoHandler(), new EndStreamHandler()); + } + } + + private final class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) + throws Exception { + ByteBuf out = msg.readBytes(msg.readableBytes()); + ctx.writeAndFlush(out); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + handlerClosedCount.incrementAndGet(); + } + + } + + private final class EndStreamHandler extends + SimpleChannelInboundHandler<LastHttp2Message> { + + @Override + protected void + channelRead0(ChannelHandlerContext ctx, LastHttp2Message msg) + throws Exception { + ctx.writeAndFlush(msg); + } + + } + + private final AtomicInteger handlerClosedCount = new AtomicInteger(0); + + private int concurrency = 10; + + private ExecutorService executor = Executors.newFixedThreadPool(concurrency, + new ThreadFactoryBuilder().setNameFormat("Echo-Client-%d").setDaemon(true) + .build()); + + private int requestCount = 10000; + + @Override + protected Channel initServer() { + return new ServerBootstrap().group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new 
ChannelInitializer<Channel>() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast( + ServerHttp2ConnectionHandler.create(ch, + new ChannelInitializer<Http2StreamChannel>() { + + @Override + protected void initChannel(Http2StreamChannel ch) + throws Exception { + ch.pipeline().addLast(new DispatchHandler()); + } + })); + } + + }).bind(0).syncUninterruptibly().channel(); + } + + @Before + public void setUp() throws Exception { + ResourceLeakDetector.setLevel(Level.ADVANCED); + start(); + } + + @After + public void tearDown() throws Exception { + executor.shutdownNow(); + stop(); + } + + private void testEcho() throws InterruptedException, ExecutionException, + IOException { + HttpFields fields = new HttpFields(); + fields.put(HttpHeader.C_METHOD, HttpMethod.GET.asString()); + fields.put(HttpHeader.C_PATH, "/"); + FuturePromise<Stream> streamPromise = new FuturePromise<>(); + StreamListener listener = new StreamListener(); + session.newStream(new HeadersFrame(1, new MetaData( + org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame( + 1, 0, 1, false), false), streamPromise, listener); + Stream stream = streamPromise.get(); + if (ThreadLocalRandom.current().nextInt(5) < 1) { // 20% + stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code), + new Callback.Adapter()); + } else { + int numFrames = ThreadLocalRandom.current().nextInt(1, 3); + ByteArrayOutputStream msg = new ByteArrayOutputStream(); + for (int i = 0; i < numFrames; i++) { + byte[] frame = new byte[ThreadLocalRandom.current().nextInt(10, 100)]; + ThreadLocalRandom.current().nextBytes(frame); + stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(frame), + i == numFrames - 1), new Callback.Adapter()); + msg.write(frame); + } + assertEquals(HttpStatus.OK_200, listener.getStatus()); + assertArrayEquals(msg.toByteArray(), listener.getData()); + } + } + + @Test + public void test() throws InterruptedException { + final AtomicBoolean 
succ = new AtomicBoolean(true); + for (int i = 0; i < requestCount; i++) { + executor.execute(new Runnable() { + + @Override + public void run() { + try { + testEcho(); + } catch (Throwable t) { + t.printStackTrace(); + succ.set(false); + } + } + }); + } + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES)); + assertTrue(succ.get()); + Thread.sleep(1000); + assertEquals(requestCount, handlerClosedCount.get()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c24a64fd/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 9724b18..e70acca 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -589,6 +589,13 @@ </dependency> <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-client</artifactId> + <version>9.3.0.M2</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>com.twitter</groupId> <artifactId>hpack</artifactId> <version>0.11.0</version>