Repository: flink
Updated Branches:
  refs/heads/master 3cfd1053c -> aa207ef31


[FLINK-1579] [refactor] Add WebFrontendBootstrap for code reuse

Refactors the web frontend's Netty code into a bootstrap helper class
that can be re-used by the history server.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e85f01d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e85f01d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e85f01d

Branch: refs/heads/master
Commit: 8e85f01de4e01752716d6498c7921e0f24ae8482
Parents: 3cfd105
Author: zentol <[email protected]>
Authored: Mon Mar 20 18:41:26 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Mar 22 15:44:59 2017 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  93 +-----------
 .../webmonitor/utils/WebFrontendBootstrap.java  | 146 +++++++++++++++++++
 2 files changed, 152 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e85f01d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index d88fdcf..39bca71 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -20,18 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedWriteHandler;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
@@ -84,6 +73,7 @@ import 
org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 
 import org.slf4j.Logger;
@@ -95,10 +85,8 @@ import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
@@ -137,13 +125,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private final SSLContext serverSSLContext;
 
-       private final ServerBootstrap bootstrap;
-
        private final Promise<String> jobManagerAddressPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
 
        private final FiniteDuration timeout;
-
-       private Channel serverChannel;
+       private final WebFrontendBootstrap netty;
 
        private final File webRootDir;
 
@@ -380,52 +365,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        LOG.warn("Error while adding shutdown hook", t);
                }
 
-               final Configuration sslConfig = config;
-               ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
-
-                       @Override
-                       protected void initChannel(SocketChannel ch) {
-                               Handler handler = new Handler(router);
-
-                               // SSL should be the first handler in the 
pipeline
-                               if (serverSSLContext != null) {
-                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
-                                       
SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
-                                       sslEngine.setUseClientMode(false);
-                                       ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
-                               }
-
-                               ch.pipeline()
-                                               .addLast(new HttpServerCodec())
-                                               .addLast(new 
ChunkedWriteHandler())
-                                               .addLast(new 
HttpRequestHandler(uploadDir))
-                                               .addLast(handler.name(), 
handler)
-                                               .addLast(new 
PipelineErrorHandler(LOG));
-                       }
-               };
-
-               NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
-               NioEventLoopGroup workerGroup = new NioEventLoopGroup();
-
-               this.bootstrap = new ServerBootstrap();
-               this.bootstrap
-                               .group(bossGroup, workerGroup)
-                               .channel(NioServerSocketChannel.class)
-                               .childHandler(initializer);
-
-               ChannelFuture ch;
-               if (configuredAddress == null) {
-                       ch = this.bootstrap.bind(configuredPort);
-               } else {
-                       ch = this.bootstrap.bind(configuredAddress, 
configuredPort);
-               }
-               this.serverChannel = ch.sync().channel();
-
-               InetSocketAddress bindAddress = (InetSocketAddress) 
serverChannel.localAddress();
-               String address = bindAddress.getAddress().getHostAddress();
-               int port = bindAddress.getPort();
-
-               LOG.info("Web frontend listening at " + address + ':' + port);
+               this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, 
serverSSLContext, configuredAddress, configuredPort, config);
        }
 
        /**
@@ -482,7 +422,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        // this here repeatedly, because cache clean up only 
happens on
                        // interactions with the cache. We need it to make sure 
that we
                        // don't leak memory after completed jobs or long ago 
accessed stats.
-                       bootstrap.childGroup().scheduleWithFixedDelay(new 
Runnable() {
+                       
netty.getBootstrap().childGroup().scheduleWithFixedDelay(new Runnable() {
                                @Override
                                public void run() {
                                        try {
@@ -500,18 +440,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                synchronized (startupShutdownLock) {
                        leaderRetrievalService.stop();
 
-                       if (this.serverChannel != null) {
-                               
this.serverChannel.close().awaitUninterruptibly();
-                               this.serverChannel = null;
-                       }
-                       if (bootstrap != null) {
-                               if (bootstrap.group() != null) {
-                                       bootstrap.group().shutdownGracefully();
-                               }
-                               if (bootstrap.childGroup() != null) {
-                                       
bootstrap.childGroup().shutdownGracefully();
-                               }
-                       }
+                       netty.shutdown();
 
                        stackTraceSamples.shutDown();
 
@@ -525,17 +454,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        @Override
        public int getServerPort() {
-               Channel server = this.serverChannel;
-               if (server != null) {
-                       try {
-                               return ((InetSocketAddress) 
server.localAddress()).getPort();
-                       }
-                       catch (Exception e) {
-                               LOG.error("Cannot access local server port", e);
-                       }
-               }
-
-               return -1;
+               return netty.getServerPort();
        }
 
        private void cleanup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e85f01d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
new file mode 100644
index 0000000..19ec08a
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
+import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.File;
+import java.net.InetSocketAddress;
+
+/**
+ * This classes encapsulates the boot-strapping of netty for the web-frontend.
+ */
+public class WebFrontendBootstrap {
+       private final Router router;
+       private final Logger log;
+       private final File uploadDir;
+       private final SSLContext serverSSLContext;
+       private final ServerBootstrap bootstrap;
+       private final Channel serverChannel;
+
+       public WebFrontendBootstrap(
+                       Router router,
+                       Logger log,
+                       File directory,
+                       SSLContext sslContext,
+                       String configuredAddress,
+                       int configuredPort,
+                       final Configuration config) throws InterruptedException 
{
+               this.router = Preconditions.checkNotNull(router);
+               this.log = Preconditions.checkNotNull(log);
+               this.uploadDir = Preconditions.checkNotNull(directory);
+               this.serverSSLContext = sslContext;
+
+               ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
+
+                       @Override
+                       protected void initChannel(SocketChannel ch) {
+                               Handler handler = new 
Handler(WebFrontendBootstrap.this.router);
+
+                               // SSL should be the first handler in the 
pipeline
+                               if (serverSSLContext != null) {
+                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       
SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+                                       sslEngine.setUseClientMode(false);
+                                       ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                               }
+
+                               ch.pipeline()
+                                       .addLast(new HttpServerCodec())
+                                       .addLast(new ChunkedWriteHandler())
+                                       .addLast(new 
HttpRequestHandler(uploadDir))
+                                       .addLast(handler.name(), handler)
+                                       .addLast(new 
PipelineErrorHandler(WebFrontendBootstrap.this.log));
+                       }
+               };
+
+               NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
+               NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+               this.bootstrap = new ServerBootstrap();
+               this.bootstrap
+                       .group(bossGroup, workerGroup)
+                       .channel(NioServerSocketChannel.class)
+                       .childHandler(initializer);
+
+               ChannelFuture ch;
+               if (configuredAddress == null) {
+                       ch = this.bootstrap.bind(configuredPort);
+               } else {
+                       ch = this.bootstrap.bind(configuredAddress, 
configuredPort);
+               }
+               this.serverChannel = ch.sync().channel();
+
+               InetSocketAddress bindAddress = (InetSocketAddress) 
serverChannel.localAddress();
+               String address = bindAddress.getAddress().getHostAddress();
+               int port = bindAddress.getPort();
+
+               this.log.info("Web frontend listening at {}" + ':' + "{}", 
address, port);
+       }
+
+       public ServerBootstrap getBootstrap() {
+               return bootstrap;
+       }
+       
+       public int getServerPort() {
+               Channel server = this.serverChannel;
+               if (server != null) {
+                       try {
+                               return ((InetSocketAddress) 
server.localAddress()).getPort();
+                       }
+                       catch (Exception e) {
+                               log.error("Cannot access local server port", e);
+                       }
+               }
+
+               return -1;
+       }
+       
+       public void shutdown() {
+               if (this.serverChannel != null) {
+                       this.serverChannel.close().awaitUninterruptibly();
+               }
+               if (bootstrap != null) {
+                       if (bootstrap.group() != null) {
+                               bootstrap.group().shutdownGracefully();
+                       }
+                       if (bootstrap.childGroup() != null) {
+                               bootstrap.childGroup().shutdownGracefully();
+                       }
+               }
+       }
+}

Reply via email to