Repository: flink Updated Branches: refs/heads/master 3cfd1053c -> aa207ef31
[FLINK-1579] [refactor] Add WebFrontendBootstrap for code reuse Refactors the web frontend's Netty code into a bootstrap helper class that can be re-used by the history server. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e85f01d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e85f01d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e85f01d Branch: refs/heads/master Commit: 8e85f01de4e01752716d6498c7921e0f24ae8482 Parents: 3cfd105 Author: zentol <[email protected]> Authored: Mon Mar 20 18:41:26 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Mar 22 15:44:59 2017 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 93 +----------- .../webmonitor/utils/WebFrontendBootstrap.java | 146 +++++++++++++++++++ 2 files changed, 152 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8e85f01d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index d88fdcf..39bca71 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -20,18 +20,7 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.nio.NioEventLoopGroup; -import 
io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.router.Handler; import io.netty.handler.codec.http.router.Router; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedWriteHandler; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; @@ -84,6 +73,7 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; import org.slf4j.Logger; @@ -95,10 +85,8 @@ import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -137,13 +125,10 @@ public class WebRuntimeMonitor implements WebMonitor { private final SSLContext serverSSLContext; - private final ServerBootstrap bootstrap; - private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); private final FiniteDuration timeout; - - private Channel serverChannel; + private final WebFrontendBootstrap netty; private final File webRootDir; @@ -380,52 +365,7 @@ public class WebRuntimeMonitor implements WebMonitor { LOG.warn("Error while adding shutdown hook", t); } - final Configuration sslConfig = config; - ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { - - @Override - protected void initChannel(SocketChannel 
ch) { - Handler handler = new Handler(router); - - // SSL should be the first handler in the pipeline - if (serverSSLContext != null) { - SSLEngine sslEngine = serverSSLContext.createSSLEngine(); - SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); - sslEngine.setUseClientMode(false); - ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); - } - - ch.pipeline() - .addLast(new HttpServerCodec()) - .addLast(new ChunkedWriteHandler()) - .addLast(new HttpRequestHandler(uploadDir)) - .addLast(handler.name(), handler) - .addLast(new PipelineErrorHandler(LOG)); - } - }; - - NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); - NioEventLoopGroup workerGroup = new NioEventLoopGroup(); - - this.bootstrap = new ServerBootstrap(); - this.bootstrap - .group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(initializer); - - ChannelFuture ch; - if (configuredAddress == null) { - ch = this.bootstrap.bind(configuredPort); - } else { - ch = this.bootstrap.bind(configuredAddress, configuredPort); - } - this.serverChannel = ch.sync().channel(); - - InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); - String address = bindAddress.getAddress().getHostAddress(); - int port = bindAddress.getPort(); - - LOG.info("Web frontend listening at " + address + ':' + port); + this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config); } /** @@ -482,7 +422,7 @@ public class WebRuntimeMonitor implements WebMonitor { // this here repeatedly, because cache clean up only happens on // interactions with the cache. We need it to make sure that we // don't leak memory after completed jobs or long ago accessed stats. 
- bootstrap.childGroup().scheduleWithFixedDelay(new Runnable() { + netty.getBootstrap().childGroup().scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { @@ -500,18 +440,7 @@ public class WebRuntimeMonitor implements WebMonitor { synchronized (startupShutdownLock) { leaderRetrievalService.stop(); - if (this.serverChannel != null) { - this.serverChannel.close().awaitUninterruptibly(); - this.serverChannel = null; - } - if (bootstrap != null) { - if (bootstrap.group() != null) { - bootstrap.group().shutdownGracefully(); - } - if (bootstrap.childGroup() != null) { - bootstrap.childGroup().shutdownGracefully(); - } - } + netty.shutdown(); stackTraceSamples.shutDown(); @@ -525,17 +454,7 @@ public class WebRuntimeMonitor implements WebMonitor { @Override public int getServerPort() { - Channel server = this.serverChannel; - if (server != null) { - try { - return ((InetSocketAddress) server.localAddress()).getPort(); - } - catch (Exception e) { - LOG.error("Cannot access local server port", e); - } - } - - return -1; + return netty.getServerPort(); } private void cleanup() { http://git-wip-us.apache.org/repos/asf/flink/blob/8e85f01d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java new file mode 100644 index 0000000..19ec08a --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Router; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.webmonitor.HttpRequestHandler; +import org.apache.flink.runtime.webmonitor.PipelineErrorHandler; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.File; +import java.net.InetSocketAddress; + +/** + * This class encapsulates the boot-strapping of netty for the web-frontend. 
+ */ +public class WebFrontendBootstrap { + private final Router router; + private final Logger log; + private final File uploadDir; + private final SSLContext serverSSLContext; + private final ServerBootstrap bootstrap; + private final Channel serverChannel; + + public WebFrontendBootstrap( + Router router, + Logger log, + File directory, + SSLContext sslContext, + String configuredAddress, + int configuredPort, + final Configuration config) throws InterruptedException { + this.router = Preconditions.checkNotNull(router); + this.log = Preconditions.checkNotNull(log); + this.uploadDir = Preconditions.checkNotNull(directory); + this.serverSSLContext = sslContext; + + ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { + + @Override + protected void initChannel(SocketChannel ch) { + Handler handler = new Handler(WebFrontendBootstrap.this.router); + + // SSL should be the first handler in the pipeline + if (serverSSLContext != null) { + SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + SSLUtils.setSSLVerAndCipherSuites(sslEngine, config); + sslEngine.setUseClientMode(false); + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(new ChunkedWriteHandler()) + .addLast(new HttpRequestHandler(uploadDir)) + .addLast(handler.name(), handler) + .addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log)); + } + }; + + NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + + this.bootstrap = new ServerBootstrap(); + this.bootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(initializer); + + ChannelFuture ch; + if (configuredAddress == null) { + ch = this.bootstrap.bind(configuredPort); + } else { + ch = this.bootstrap.bind(configuredAddress, configuredPort); + } + this.serverChannel = ch.sync().channel(); + + InetSocketAddress bindAddress = 
(InetSocketAddress) serverChannel.localAddress(); + String address = bindAddress.getAddress().getHostAddress(); + int port = bindAddress.getPort(); + + this.log.info("Web frontend listening at {}" + ':' + "{}", address, port); + } + + public ServerBootstrap getBootstrap() { + return bootstrap; + } + + public int getServerPort() { + Channel server = this.serverChannel; + if (server != null) { + try { + return ((InetSocketAddress) server.localAddress()).getPort(); + } + catch (Exception e) { + log.error("Cannot access local server port", e); + } + } + + return -1; + } + + public void shutdown() { + if (this.serverChannel != null) { + this.serverChannel.close().awaitUninterruptibly(); + } + if (bootstrap != null) { + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(); + } + if (bootstrap.childGroup() != null) { + bootstrap.childGroup().shutdownGracefully(); + } + } + } +}
