[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424547#comment-15424547
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r75126311
--- Diff:
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the
Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MesosArtifactServer.class);
+
+ private final Router router;
+
+ private ServerBootstrap bootstrap;
+
+ private Channel serverChannel;
+
+ private URL baseURL;
+
+ public MesosArtifactServer(String sessionID, String serverHostname, int
configuredPort) throws Exception {
+ if (configuredPort < 0 || configuredPort > 0xFFFF) {
+ throw new IllegalArgumentException("File server port is
invalid: " + configuredPort);
+ }
+
+ router = new Router();
+
+ ChannelInitializer<SocketChannel> initializer = new
ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ Handler handler = new Handler(router);
+
+ ch.pipeline()
+ .addLast(new HttpServerCodec())
+ .addLast(handler.name(), handler)
+ .addLast(new UnknownFileHandler());
+ }
+ };
+
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+ this.bootstrap = new ServerBootstrap();
+ this.bootstrap
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(initializer);
+
+ Channel ch = this.bootstrap.bind(serverHostname,
configuredPort).sync().channel();
+ this.serverChannel = ch;
+
+ InetSocketAddress bindAddress = (InetSocketAddress)
ch.localAddress();
+ String address = bindAddress.getAddress().getHostAddress();
+ int port = bindAddress.getPort();
+
+ baseURL = new URL("http", serverHostname, port, "/" + sessionID
+ "/");
+
+ LOG.info("Mesos artifact server listening at " + address + ':'
+ port);
+ }
+
+ /**
+ * Get the server port on which the artifact server is listening.
+ */
+ public synchronized int getServerPort() {
+ Channel server = this.serverChannel;
+ if (server != null) {
+ try {
+ return ((InetSocketAddress)
server.localAddress()).getPort();
+ } catch (Exception e) {
+ LOG.error("Cannot access local server port", e);
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Adds a file to the artifact server.
+ * @param localFile the local file to serve.
+ * @param remoteFile the remote path with which to locate the file.
+ * @return the fully-qualified remote path to the file.
+ * @throws MalformedURLException if the remote path is invalid.
+ */
+ public synchronized URL addFile(File localFile, String remoteFile)
throws MalformedURLException {
+ URL fileURL = new URL(baseURL, remoteFile);
+ router.ANY(fileURL.getPath(), new
VirtualFileServerHandler(localFile));
+ return fileURL;
+ }
+
+ /**
+ * Stops the artifact server.
+ * @throws Exception
+ */
+ public synchronized void stop() throws Exception {
+ if (this.serverChannel != null) {
+ this.serverChannel.close().awaitUninterruptibly();
+ this.serverChannel = null;
+ }
+ if (bootstrap != null) {
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully();
+ }
+ bootstrap = null;
+ }
+ }
+
+ /**
+ * Handle HEAD and GET requests for a specific file.
+ */
+ @ChannelHandler.Sharable
+ public static class VirtualFileServerHandler extends
SimpleChannelInboundHandler<Routed> {
+
+ private File file;
--- End diff --
final
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)