[GitHub] flink issue #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/6031
  
> I planned to upgrade netty and drop netty-router in one step of upgrading 
flink-shaded-netty. Do you think it should be split somehow?
No, if the ticket for upgrading covers that, we are all good.

👍  to merge. Thanks for taking care of this. Looking forward to finally 
having a recent Netty version.


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190274800
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
For more information check {@link Router}.
+ *
+ * Router that doesn't contain information about HTTP request methods 
and route
+ * matching orders.
+ */
+final class MethodlessRouter<T> {
+   private static final Logger log = 
LoggerFactory.getLogger(MethodlessRouter.class);
+
+   // A path pattern can only point to one target
+   private final Map<PathPattern, T> routes = new LinkedHashMap<>();
--- End diff --

Fine with me to not invest more time into this and keep it as is 👍 


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190274982
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class replaces the standard error response to be identical with 
those sent by the {@link AbstractRestHandler}.
+ */
+public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> {
+   public static final String ROUTER_HANDLER_NAME = 
RouterHandler.class.getName() + "_ROUTER_HANDLER";
+   public static final String ROUTED_HANDLER_NAME = 
RouterHandler.class.getName() + "_ROUTED_HANDLER";
+
+   private final Map<String, String> responseHeaders;
+   private final Router router;
+
+   public RouterHandler(Router router, final Map<String, String> 
responseHeaders) {
+   this.router = requireNonNull(router);
+   this.responseHeaders = requireNonNull(responseHeaders);
+   }
+
+   public String getName() {
+   return ROUTER_HANDLER_NAME;
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext 
channelHandlerContext, HttpRequest httpRequest) throws Exception {
+   if (HttpHeaders.is100ContinueExpected(httpRequest)) {
+   channelHandlerContext.writeAndFlush(new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
+   return;
+   }
+
+   // Route
+   HttpMethod method = httpRequest.getMethod();
+   QueryStringDecoder qsd = new 
QueryStringDecoder(httpRequest.getUri());
+   RouteResult routeResult = router.route(method, qsd.path(), 
qsd.parameters());
+
+   if (routeResult == null) {
+   respondNotFound(channelHandlerContext, httpRequest);
+   return;
+   }
+
+   routed(channelHandlerContext, routeResult, httpRequest);
+   }
+
+   private void routed(
+   ChannelHandlerContext channelHandlerContext,
+   RouteResult routeResult,
+   HttpRequest httpRequest) throws Exception {
+   ChannelInboundHandler handler = (ChannelInboundHandler) 
routeResult.target();
+
+   // The handler may have been added (keep alive)
+   ChannelPipeline pipeline = channelHandlerContext.pipeline();
+   ChannelHandler addedHandler = pipeline.get(ROUTED_HANDLER_NAME);

[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190214988
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class replaces the standard error response to be identical with 
those sent by the {@link AbstractRestHandler}.
+ */
+public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> {
+   public static final String ROUTER_HANDLER_NAME = 
RouterHandler.class.getName() + "_ROUTER_HANDLER";
+   public static final String ROUTED_HANDLER_NAME = 
RouterHandler.class.getName() + "_ROUTED_HANDLER";
+
+   private final Map<String, String> responseHeaders;
+   private final Router router;
+
+   public RouterHandler(Router router, final Map<String, String> 
responseHeaders) {
+   this.router = requireNonNull(router);
+   this.responseHeaders = requireNonNull(responseHeaders);
+   }
+
+   public String getName() {
+   return ROUTER_HANDLER_NAME;
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext 
channelHandlerContext, HttpRequest httpRequest) throws Exception {
+   if (HttpHeaders.is100ContinueExpected(httpRequest)) {
+   channelHandlerContext.writeAndFlush(new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
+   return;
+   }
+
+   // Route
+   HttpMethod method = httpRequest.getMethod();
+   QueryStringDecoder qsd = new 
QueryStringDecoder(httpRequest.getUri());
+   RouteResult routeResult = router.route(method, qsd.path(), 
qsd.parameters());
+
+   if (routeResult == null) {
+   respondNotFound(channelHandlerContext, httpRequest);
+   return;
+   }
+
+   routed(channelHandlerContext, routeResult, httpRequest);
+   }
+
+   private void routed(
+   ChannelHandlerContext channelHandlerContext,
+   RouteResult routeResult,
+   HttpRequest httpRequest) throws Exception {
+   ChannelInboundHandler handler = (ChannelInboundHandler) 
routeResult.target();
+
+   // The handler may have been added (keep alive)
+   ChannelPipeline pipeline = channelHandlerContext.pipeline();
+   ChannelHandler addedHandler = pipeline.get(ROUTED_HANDLER_NAME);

[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190248429
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
For more information check {@link Router}.
+ *
+ * Result of calling {@link Router#route(HttpMethod, String)}.
+ */
+public class RouteResult<T> {
--- End diff --

Maybe add reference to 
https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/RouteResult.java


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190244366
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
For more information check {@link Router}.
+ *
+ * Router that doesn't contain information about HTTP request methods 
and route
+ * matching orders.
+ */
+final class MethodlessRouter<T> {
+   private static final Logger log = 
LoggerFactory.getLogger(MethodlessRouter.class);
+
+   // A path pattern can only point to one target
+   private final Map<PathPattern, T> routes = new LinkedHashMap<>();
--- End diff --

Please correct me if I'm wrong, but I have a question regarding memory 
visibility: the thread that updates this map and the Netty event loop threads 
are different, so there could be a memory visibility issue if routes are added 
after the router has been passed to the `RouterHandler`. I don't think we 
currently do this, but the API allows it. I'm wondering whether it makes sense 
to make the routes immutable, maybe by creating them with a builder:
```java
Routes routes = new RoutesBuilder()
  .addGet(...)
  ...
  .build();
Router router = new Router(routes);
```
Or use a thread-safe map here that also preserves ordering (maybe wrap 
using `synchronizedMap`).
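For illustration, a minimal sketch of that second option (assumption: we keep the `LinkedHashMap` as the backing map so insertion order is preserved):
```java
// Sketch only: Collections.synchronizedMap guards individual calls and keeps
// the backing LinkedHashMap's iteration order, but iterating for route
// matching would still need to synchronize on the wrapper.
private final Map<PathPattern, T> routes =
	Collections.synchronizedMap(new LinkedHashMap<>());
```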

---
Since we currently rely on correct registration order (e.g. `/jobs/overview` 
before `/jobs/:id` for correct matching), the immutable approach would allow 
us to include a utility in `Routes` that sorts patterns as done in 
`RestServerEndpoint:143`:
```java
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
```


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190191066
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class replaces the standard error response to be identical with 
those sent by the {@link AbstractRestHandler}.
+ */
+public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> {
--- End diff --

I've verified that this class merges the behaviour of `Handler` and 
`AbstractHandler`. 👍 

Since we copied the code, we might leave it as is, but I noticed the 
following minor things (there are similar warnings in the other copied classes):
- `L46`, `L47`: fields can be private
- `L62`: `throws Exception` can be removed
- `L98`: I get an unchecked call warning for `RouteResult`. We could use 
`RouteResult<?>` to get rid of it, I think.
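To illustrate the last point (a sketch under the assumption that the raw type is what triggers the warning):
```java
// Raw type: calls on the result are flagged as "unchecked":
RouteResult routeResult = router.route(method, qsd.path(), qsd.parameters());
// Parameterized with a wildcard instead, the warning goes away without
// committing to a concrete target type:
RouteResult<?> typedResult = router.route(method, qsd.path(), qsd.parameters());
```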


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190246117
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 ---
@@ -89,19 +91,20 @@ public RuntimeMonitorHandler(
}
 
@Override
-   protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, JobManagerGateway jobManagerGateway) {
+   protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest 
routedRequest, JobManagerGateway jobManagerGateway) {
CompletableFuture responseFuture;
+   RouteResult result = routedRequest.getRouteResult();
--- End diff --

I think if you do `RouteResult result = routedRequest.getRouteResult();` 
you don't need the cast to `Set` in lines 101 and 106.


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190248637
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
Compared to original version this one
--- End diff --

- Maybe add a link to 
https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/Router.java


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190213387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
For more information check {@link Router}.
+ *
+ * Router that doesn't contain information about HTTP request methods 
and route
+ * matching orders.
+ */
+final class MethodlessRouter<T> {
+   private static final Logger log = 
LoggerFactory.getLogger(MethodlessRouter.class);
+
+   // A path pattern can only point to one target
+   private final Map<PathPattern, T> routes = new LinkedHashMap<>();
--- End diff --

Using the `LinkedHashMap` is what preserves the order of adding handlers, 
right? Or is there another thing that I've missed?
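Just to spell out what I mean, a standalone JDK example (not Flink code; uses plain java.util.Map, LinkedHashMap, HashMap):
```java
Map<String, Integer> linked = new LinkedHashMap<>();
linked.put("jobs/overview", 1);
linked.put("jobs/:jobid", 2);
System.out.println(linked.keySet()); // [jobs/overview, jobs/:jobid] -- insertion order

Map<String, Integer> hashed = new HashMap<>(linked);
System.out.println(hashed.keySet()); // no ordering guarantee
```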


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190248244
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
For more information check {@link Router}.
+ *
+ * The pattern can contain constants or placeholders, example:
+ * {@code constant1/:placeholder1/constant2/:*}.
+ *
+ * {@code :*} is a special placeholder to catch the rest of the path
+ * (may include slashes). If exists, it must appear at the end of the path.
+ *
+ * The pattern must not contain query, example:
+ * {@code constant1/constant2?foo=bar}.
+ *
+ * The pattern will be broken to tokens, example:
+ * {@code ["constant1", ":variable", "constant2", ":*"]}
+ */
+final class PathPattern {
--- End diff --

Maybe add reference to 
https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/PathPattern.java
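To make the documented pattern syntax concrete, a hypothetical walk-through (illustration only, not the actual `PathPattern` API):
```java
String pattern = "jobs/:jobid/vertices/:*";
String[] tokens = pattern.split("/");
// tokens = ["jobs", ":jobid", "vertices", ":*"]

// Matching the request path "jobs/abc/vertices/x/y" would bind:
//   jobid -> "abc"
//   *     -> "x/y"   (":*" catches the rest of the path, slashes included)
```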


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190186599
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class replaces the standard error response to be identical with 
those sent by the {@link AbstractRestHandler}.
--- End diff --

- I would add a comment that this was copied and simplified from 
https://github.com/sinetja/netty-router/blob/1.10/src/main/java/io/netty/handler/codec/http/router/Handler.java
 and 
https://github.com/sinetja/netty-router/blob/1.10/src/main/java/io/netty/handler/codec/http/router/AbstractHandler.java
 for future reference.
- I would also copy the high-level comment from the original Handler class: 
`Inbound handler that converts HttpRequest to RoutedRequest and passes 
RoutedRequest to the matched handler`.
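Something along these lines (just a sketch of the suggested class comment, not the final wording):
```java
/**
 * Inbound handler that converts a {@link HttpRequest} to a {@link RoutedRequest}
 * and passes the {@link RoutedRequest} to the matched handler.
 *
 * <p>Copied and simplified from the tv.cntt:netty-router library's Handler and
 * AbstractHandler classes (version 1.10), see the links above.
 */
```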


---


[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router

2018-05-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/6031#discussion_r190197601
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. 
For more information check {@link Router}.
--- End diff --

- Maybe add a link to 
https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/MethodlessRouter.java
 and 
https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/OrderlessRouter.java
 as a reference


---


[GitHub] flink pull request #:

2018-03-27 Thread uce
Github user uce commented on the pull request:


https://github.com/apache/flink/commit/3a61ea47922280e15f462ca3cdc0c367047bde24#commitcomment-28293221
  
@twalthr I think this broke the build. At least locally, I get a RAT 
license failure. 

```
1 Unknown Licenses

*

Files with unapproved licenses:

  docs/page/js/jquery.min.js

*
```


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-09 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
Didn't merge yet, because there is an issue with the buildbot environment.


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-06 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
I've built this locally and everything looks good to me (linebreaks and 
code highlighting).

I will merge this to `master`, but keep this PR open for a while. If 
everything works in the `buildbot` environment I will also merge it to 1.4. 
Then we can close this PR. :-)


---


[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...

2018-02-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4809#discussion_r166008336
  
--- Diff: docs/ops/state/savepoints.md ---
@@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job 
with ID `:jobid` and cancel
 
 If you don't specify a target directory, you need to have [configured a 
default directory](#configuration). Otherwise, cancelling the job with a 
savepoint will fail.
 
+
--- End diff --

👍 


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-02 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
https://issues.apache.org/jira/browse/INFRA-15959

`ruby2.3.1` has been installed on the buildbot. I will check whether the 
build works as expected and report back here (probably only by next week).



---


[GitHub] flink pull request #5395: [FLINK-8308] Remove explicit yajl-ruby dependency,...

2018-02-01 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/5395#discussion_r165295535
  
--- Diff: docs/_config.yml ---
@@ -77,12 +77,7 @@ defaults:
   layout: plain
   nav-pos: 9 # Move to end if no pos specified
 
-markdown: KramdownPygments
--- End diff --

Some questions regarding the removed lines:
- What is used for markdown and code highlighting now?
- Do we still support GitHub-flavored Markdown?
- What happens to new lines?

I didn't have time to build the docs again to check yet.


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-01-31 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  

https://ci.apache.org/builders/flink-docs-master/builds/977/steps/Flink%20docs/logs/stdio
 says
```
Ruby version:
ruby 2.0.0p384 (2014-01-12) [x86_64-linux-gnu]
```

I can ask INFRA whether it is possible to run a more recent Ruby version.



---


[GitHub] flink issue #5331: [FLINK-8473][webUI] Improve error behavior of JarListHand...

2018-01-23 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5331
  
I just tried it: re-uploading after deleting the directory **does not 
work**. Good catch, Stephan. :-)

@zentol: I found `HttpRequestHandler` which handles the uploads. The 
handler assumes that the directory exists and the logic for creating the temp 
directory is in `WebRuntimeMonitor`. We should consolidate this in a single 
place (if no one else needs this directory).



---


[GitHub] flink issue #5331: [FLINK-8473][webUI] Improve error behavior of JarListHand...

2018-01-22 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5331
  
👍  to merge.


---


[GitHub] flink pull request #5331: [FLINK-8473][webUI] Improve error behavior of JarL...

2018-01-22 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/5331#discussion_r162935682
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
 ---
@@ -77,6 +82,11 @@ public boolean accept(File dir, String name) {
}
});
 
+   if (list == null) {
+   LOG.warn("Jar storage directory 
{} has been deleted.", jarDir.getAbsolutePath());
--- End diff --

Somehow this comment got lost before: should we make the error message more 
explicit and say that it was `deleted externally` (i.e. not by Flink)?


---


[GitHub] flink pull request #5331: [FLINK-8473][webUI] Improve error behavior of JarL...

2018-01-22 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/5331#discussion_r162932050
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
 ---
@@ -145,6 +155,7 @@ public boolean accept(File dir, String name) {
return writer.toString();
}
catch (Exception e) {
+   LOG.warn("Failed to fetch jar list.", 
e);
--- End diff --

Aren't failed completions logged anyways?


---


[GitHub] flink issue #5264: [FLINK-8352][web-dashboard] Flink UI Reports No Error on ...

2018-01-09 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5264
  
Build failure is unrelated (YARN test). Merging this. Thanks!



---


[GitHub] flink issue #5264: [FLINK-8352][web-dashboard] Flink UI Reports No Error on ...

2018-01-08 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5264
  
Looks good to me. I've tried this out locally for both a working and a 
non-working (see screenshot) JAR.

![screen shot 2018-01-08 at 18 05 
58](https://user-images.githubusercontent.com/1756620/34682298-cad92672-f49e-11e7-9d55-486a53d5d3ad.png)

I will merge this as soon as Travis gives a green light. 


---


[GitHub] flink issue #5263: [FLINK-7991][docs] Fix baseUrl for master branch

2018-01-08 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5263
  
Looks good to me. Good catch! +1 to merge 👍  


---


[GitHub] flink pull request #:

2017-12-10 Thread uce
Github user uce commented on the pull request:


https://github.com/apache/flink/commit/8086e3bee8be4614359041c14786140edff19666#commitcomment-26179876
  
I know that the project historically did not consider the REST API as a 
public API, but I would vote to note down all of these breaking REST API 
changes for the upcoming 1.5 release notes in order to have a good migration 
path for users. I just ran into this when pointing a tool that was using the 
`jobsoverview` endpoint of 1.4 to the latest master and had to look into what 
happened when I got a 404.

It might also be worth adding redirects in 1.5 and only removing them in the 
release after that (cc @zentol).


---


[GitHub] flink issue #5102: [FLINK-7762, FLINK-8167] Clean up and harden WikipediaEdi...

2017-11-29 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5102
  
I would be in favour of removing this or moving it to Bahir, but it is 
currently used in a [doc 
example](https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/run_example_quickstart.html).
 Besides that, I doubt that this is of much value to users.

If it's OK with you, let's merge this for now and think separately about 
whether the doc example really needs it.


---


[GitHub] flink pull request #5102: [FLINK-7762, FLINK-8167] Clean up and harden Wikip...

2017-11-29 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/5102

[FLINK-7762, FLINK-8167] Clean up and harden WikipediaEditsSource

## What is the purpose of the change

This pull request addresses two related issues with the 
WikipediaEditsSource. It makes the WikipediaEditsSourceTest a proper test 
instead of unnecessarily starting a FlinkMiniCluster, and it addresses a 
potential test instability.

In general, the WikipediaEditsSource is not in good shape and could benefit 
from further refactoring. One potential area for improvement is integration 
with the asynchronous channel listener that reports events like errors or being 
kicked out of a channel, etc.

I did not do this due to time constraints and the fact that this is not a 
production source. More generally, it is questionable whether we should keep 
the test as is or remove it, since it depends on connectivity to an IRC channel.

## Brief change log
- Harden WikipediaEditsSource with eager sanity checks
- Make WikipediaEditsSourceTest proper test

## Verifying this change

This change is a rework/code cleanup without any new test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes, but only to 
`flink-test-utils-junit`
- The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 7762-8167-wikiedits

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5102.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5102


commit b2ab66f05ce545214a8132dc2d46b3143939b015
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-11-29T15:28:18Z

[FLINK-8167] [connector-wikiedits] Harden WikipediaEditsSource

- Minor eager sanity checks
- Use UUID suffix for nickname. As reported in FLINK-8167, the current
  nickname suffix can result in nickname clashes which lead to test
  failures.

commit 06ec1542963bbe2afaf1ad1fd55a54d13f855304
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-11-29T15:36:29Z

[FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test

The WikipediaEditsSourceTest unnecessarily implements an integration
test that starts a FlinkMiniCluster and executes a small Flink program.

This simply creates a source and executes its run() method in a separate thread until
a single WikipediaEditEvent is received.




---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-11-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r151946320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.ArrayDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Channel handler to read the messages of buffer response or error 
response from the
+ * producer, to write and flush the unannounced credits for the producer.
+ */
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+   /** Channels, which already requested partitions from the producers. */
+   private final ConcurrentMap<InputChannelID, RemoteInputChannel> 
inputChannels = new ConcurrentHashMap<>();
+
+   /** Channels, which will notify the producers about unannounced credit. 
*/
+   private final ArrayDeque<RemoteInputChannel> inputChannelsWithCredit = new ArrayDeque<>();
+
+   private final AtomicReference<Throwable> channelError = new AtomicReference<>();
+
+   private final ChannelFutureListener writeListener = new 
WriteAndFlushNextMessageIfPossibleListener();
+
+   /**
+* Set of cancelled partition requests. A request is cancelled iff an 
input channel is cleared
+* while data is still coming in for this channel.
+*/
+   private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = 
new ConcurrentHashMap<>();
+
+   private volatile ChannelHandlerContext ctx;
--- End diff --

👍 


---


[GitHub] flink issue #3442: [FLINK-5778] [savepoints] Add savepoint serializer with r...

2017-10-24 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3442
  
I had a quick chat with Stephan about this. @StefanRRichter has an idea how 
to properly implement this. Closing this PR and unassigning the issue.


---


[GitHub] flink pull request #3442: [FLINK-5778] [savepoints] Add savepoint serializer...

2017-10-24 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/3442


---


[GitHub] flink issue #4888: [backport] [FLINK-7067] Resume checkpointing after failed...

2017-10-24 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4888
  
Merged in e8e2913.


---


[GitHub] flink pull request #4888: [backport] [FLINK-7067] Resume checkpointing after...

2017-10-24 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/4888


---


[GitHub] flink issue #4888: [backport] [FLINK-7067] Resume checkpointing after failed...

2017-10-24 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4888
  
All failed tests are due to a similar issue that does not seem related to 
this change. Merging...


---


[GitHub] flink issue #4888: [backport] [FLINK-7067] Resume checkpointing after failed...

2017-10-23 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4888
  
@aljoscha Getting the following message in two of the builds in 
https://travis-ci.org/apache/flink/builds/291523047:
```

==
Compilation/test failure detected, skipping shaded dependency check.

==
```

Ever seen this?


---


[GitHub] flink pull request #4888: [backport] [FLINK-7067] Resume checkpointing after...

2017-10-23 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4888

[backport] [FLINK-7067] Resume checkpointing after failed 
cancel-job-with-savepoint

This is a backport of #4254. I will merge this as soon as Travis gives the 
green light.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 7067-backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4888


commit 9226c3a15f8037851110fbdecf775cad99da771f
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-07-04T14:39:02Z

[hotfix] [tests] Reduce visibility of helper class methods

There is no need to make the helper methods public. No other class
should even use this inner test helper invokable.

commit c571929ce476f17d02ee22df0b5170b0eb322c2d
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-07-04T15:01:32Z

[FLINK-7067] [jobmanager] Resume periodic checkpoints after failed 
cancel-job-with-savepoint

Problem: If a cancel-job-with-savepoint request fails, this has an
unintended side effect on the respective job if it has periodic
checkpoints enabled. The periodic checkpoint scheduler is stopped
before triggering the savepoint, but not restarted if a savepoint
fails and the job is not cancelled.

This commit makes sure that the periodic checkpoint scheduler is
restarted iff periodic checkpoints were enabled before.

This closes #4254.

commit 074630a2fbd6dbdc7ff775ee9fb5d46c088dbc6d
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-10-23T12:42:46Z

[FLINK-7067] [jobmanager] Backport to 1.3




---


[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

2017-10-23 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4254
  
Travis gave the green light, merging this now.


---


[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

2017-10-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4254
  
Cool! I'll rebase this and merge after Travis gives the green light.


---


[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

2017-10-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4254
  
@tillrohrmann Thanks for looking over this. The `TestingCluster` is 
definitely preferable. I don't recall how I ended up with the custom setup 
instead of the `TestingCluster`.

I changed the test to wait for another checkpoint after the failed 
savepoint. I also considered this for the initial PR, but went with mocking in 
order to test the case that periodic checkpoints were not activated before the 
cancellation [1]. I think the current variant is a good compromise between 
completeness and simplicity though.
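For reference, the waiting approach boils down to something like this (names are made up for illustration, not the actual test code):
```java
// Count completed checkpoints, trigger the failing cancel-with-savepoint,
// then block until one more periodic checkpoint completes -- which proves
// that the periodic scheduler was restarted.
long before = statsTracker.getNumberOfCompletedCheckpoints();
triggerFailingCancelJobWithSavepoint();
while (statsTracker.getNumberOfCompletedCheckpoints() <= before) {
	Thread.sleep(50L);
}
```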

[1] As seen in the diff of `JobManager.scala`, we only activate the 
periodic scheduler after a failed cancellation iff it was activated before 
cancellation. This case can't be tested robustly with the current approach. We 
could wait for some time and if no checkpoint arrives in that time consider 
checkpoints as not accidentally activated, but that's not robust. I would 
therefore ignore this case if you don't have another idea.



---


[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

2017-10-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4254#discussion_r145923787
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws 
Exception {
}
 
/**
+* Tests that a failed cancel-job-with-savepoint request does not 
accidentally disable
+* periodic checkpoints.
+*/
+   @Test
+   public void testCancelJobWithSavepointFailurePeriodicCheckpoints() 
throws Exception {
+   testCancelJobWithSavepointFailure(true);
+   }
+
+   /**
+* Tests that a failed cancel-job-with-savepoint request does not 
accidentally enable
+* periodic checkpoints.
+*/
+   @Test
+   public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() 
throws Exception {
+   testCancelJobWithSavepointFailure(false);
+   }
+
+   /**
+* Tests that a failed savepoint does not cancel the job and that there 
are no
+* unintended side effects.
+*
+* @param enablePeriodicCheckpoints Flag to indicate whether to enable 
periodic checkpoints. We
+* need to test both here in order to verify that we don't accidentally 
disable or enable
+* checkpoints after a failed cancel-job-with-savepoint request.
+*/
+   private void testCancelJobWithSavepointFailure(
+   boolean enablePeriodicCheckpoints) throws Exception {
+
+   long checkpointInterval = enablePeriodicCheckpoints ? 360 : 
Long.MAX_VALUE;
+
+   // Savepoint target
+   File savepointTarget = tmpFolder.newFolder();
+   savepointTarget.deleteOnExit();
+
+   // Timeout for Akka messages
+   FiniteDuration askTimeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+   // A source that declines savepoints, simulating the behaviour
+   // of a failed savepoint.
+   JobVertex sourceVertex = new JobVertex("Source");
+   
sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+   sourceVertex.setParallelism(1);
+   JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+   final ActorSystem actorSystem = 
AkkaUtils.createLocalActorSystem(new Configuration());
+
+   try {
+   Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
+   new Configuration(),
+   actorSystem,
+   TestingUtils.defaultExecutor(),
+   TestingUtils.defaultExecutor(),
+   highAvailabilityServices,
+   Option.apply("jm"),
+   Option.apply("arch"),
+   TestingJobManager.class,
+   TestingMemoryArchivist.class);
+
+   UUID leaderId = 
LeaderRetrievalUtils.retrieveLeaderSessionId(
+   
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+   TestingUtils.TESTING_TIMEOUT());
+
+   ActorGateway jobManager = new 
AkkaActorGateway(master._1(), leaderId);
+
+   ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+   new Configuration(),
+   ResourceID.generate(),
+   actorSystem,
+   highAvailabilityServices,
+   "localhost",
+   Option.apply("tm"),
+   true,
+   TestingTaskManager.class);
+
+   ActorGateway taskManager = new 
AkkaActorGateway(taskManagerRef, leaderId);
--- End diff --

Definitely +1


---


[GitHub] flink issue #4693: [FLINK-7645][docs] Modify system-metrics part show in the...

2017-09-25 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4693
  
Hey @yew1eb, I like this 👍  I think it really makes sense to make this 
browsable from the table of contents. @zentol What do you think? I would be in 
favour of merging this if you don't have any objections.


---


[GitHub] flink pull request #4645: [FLINK-7582] [REST] Netty thread immediately parse...

2017-09-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4645#discussion_r137269070
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
}
 
private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, Class<P> responseClass) {
-   return CompletableFuture.supplyAsync(() -> 
bootstrap.connect(targetAddress, targetPort), executor)
-   .thenApply((channel) -> {
-   try {
-   return channel.sync();
-   } catch (InterruptedException e) {
-   throw new FlinkRuntimeException(e);
-   }
-   })
-   .thenApply((ChannelFuture::channel))
-   .thenCompose(channel -> {
-   ClientHandler handler = 
channel.pipeline().get(ClientHandler.class);
-   CompletableFuture<JsonResponse> future = handler.getJsonFuture();
-   channel.writeAndFlush(httpRequest);
-   return future;
-   }).thenComposeAsync(
-   (JsonResponse rawResponse) -> 
parseResponse(rawResponse, responseClass),
-   executor
-   );
+   ChannelFuture connect = bootstrap.connect(targetAddress, 
targetPort);
+   Channel channel;
--- End diff --

I understand, but it can be confusing that a method which returns a future 
actually performs a (potentially long) blocking operation.

---


[GitHub] flink issue #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..." instea...

2017-09-06 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4647
  
Thanks @zentol for the review.

@jameslafa 
- Regarding the first point with checkstyle: It's confusing that our 
checkstyle settings don't catch this, but Chesnay is right here. Seems nitpicky 
but we try to avoid unnecessary formatting changes.
- Regarding the changes to the *.js files: Since you didn't change any of 
the CoffeeScript sources, there should be no need to commit those files, and I 
would suggest removing those changes. The changes are probably due to different 
tool versions on your machine and on the machine of the previous contributor who 
touched the files. I think this only reinforces the argument we had about 
committing the *.lock file too. Could you create a new JIRA for this?

@zentol 
- I didn't understand your follow up comments regarding the 
`metricsFetched` flag. Could you please elaborate on what you mean? Is the flag 
ok to keep after #4472 is merged?




---


[GitHub] flink pull request #4645: [FLINK-7582] [REST] Netty thread immediately parse...

2017-09-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4645#discussion_r137038459
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
}
 
private <P> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
-   return CompletableFuture.supplyAsync(() -> 
bootstrap.connect(targetAddress, targetPort), executor)
-   .thenApply((channel) -> {
-   try {
-   return channel.sync();
-   } catch (InterruptedException e) {
-   throw new FlinkRuntimeException(e);
-   }
-   })
-   .thenApply((ChannelFuture::channel))
-   .thenCompose(channel -> {
-   ClientHandler handler = 
channel.pipeline().get(ClientHandler.class);
-   CompletableFuture<JsonResponse> future = handler.getJsonFuture();
-   channel.writeAndFlush(httpRequest);
-   return future;
-   }).thenComposeAsync(
-   (JsonResponse rawResponse) -> 
parseResponse(rawResponse, responseClass),
-   executor
-   );
+   ChannelFuture connect = bootstrap.connect(targetAddress, 
targetPort);
+   Channel channel;
--- End diff --

Couldn't you keep the async connect and only change the last `composeAsync` 
to `compose`? 


---


[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...

2017-08-17 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4553
  
@twalthr Thanks for doing the work of merging this to the other branches as 
well. I've triggered a build for all branches. While 1.2 worked and the warning 
is now available online 
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/), older branches 
don't build properly:

https://ci.apache.org/builders/flink-docs-release-1.0/builds/545
https://ci.apache.org/builders/flink-docs-release-1.1/builds/391


Do you have a clue what we can do there? Also pinging @rmetzger who has 
some experience with the Apache infrastructure...


---


[GitHub] flink pull request #4553: [FLINK-7642] [docs] Add very obvious warning about...

2017-08-16 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4553

[FLINK-7642] [docs] Add very obvious warning about outdated docs

## What is the purpose of the change

This pull request makes the warning about outdated docs more obvious.

![screen shot 2017-08-16 at 18 34 
03](https://user-images.githubusercontent.com/1756620/29374684-93ac1abe-82b2-11e7-9b9b-5008ac33a8e5.png)

Please compare the screenshot to our current state: 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/

If you like this, I would back/forward port this to all versions >= 1.0 and 
update the releasing Wiki page to add a note about updating the configuration 
of the docs.


## Brief change log
- Change the color of the outdated warning footer to red
- Rename the config key from `is_latest` to `show_outdated_warning`
- Add an outdated warning to every page before the actual content

## Verifying this change
- We don't have any tests for the docs
- You can manually check out my branch and build the docs 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 7462-outdated_docs_warning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4553


commit e140634687a56fb5fdd0e32a1dce3d5ac41fd123
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-08-16T16:21:49Z

[FLINK-7462] [docs] Make outdated docs footer background light red

commit 15fe71e0402d2e2d931485497338973e12cce9db
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-08-16T16:34:51Z

[FLINK-7462] [docs] Rename is_latest flag to show_outdated_warning

commit 9bdeeae0cb88b59f0dfaee547fbf9b54d125645e
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-08-16T16:35:03Z

[FLINK-7462] [docs] Add outdated warning to content




---


[GitHub] flink issue #4360: [FLINK-7220] [checkpoints] Update RocksDB dependency to 5...

2017-07-28 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4360
  
Thanks for the pointer Greg. I didn't look at the JIRA issue. I like the 
idea of automating this but I would go with a pre-push hook instead of a 
pre-commit hook. It could be annoying during local dev if we force every commit 
to have the format already. I'm not sure whether INFRA allows this or not 
though.

@StefanRRichter Sure, can happen. 😊 


---


[GitHub] flink issue #4360: [FLINK-7220] [checkpoints] Update RocksDB dependency to 5...

2017-07-28 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4360
  
@StefanRRichter As mentioned by Greg, we should either squash follow up 
commits like `Stephan's comments` into their parent or tag them similar to the 
main commit with a more descriptive message.


---


[GitHub] flink issue #4391: [FLINK-7258] [network] Fix watermark configuration order

2017-07-25 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4391
  
Thanks for the review @zhijiangW and @zentol. Merging this for `1.3` and 
`master`.


---


[GitHub] flink pull request #4391: [FLINK-7258] [network] Fix watermark configuration...

2017-07-24 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4391

[FLINK-7258] [network] Fix watermark configuration order

## Purpose
This PR changes the order in which low and high watermarks are configured 
for Netty server child connections (high first). That way we avoid running into 
an `IllegalArgumentException` when the low watermark is larger than the high 
watermark (relevant if the configured memory segment size is larger than the 
default).

This situation surfaced only as a logged warning and the low watermark 
configuration was ignored.
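
To illustrate the fixed order, a minimal sketch (the option values are illustrative, not necessarily the exact ones used by `NettyServer`). Setting the high watermark first keeps `low <= high` at every intermediate step, so Netty neither warns nor silently ignores the low watermark:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

final class WatermarkOrderSketch {

    static void configureWatermarks(ServerBootstrap bootstrap, int memorySegmentSize) {
        // High watermark first: a low watermark above Netty's default high
        // watermark (64 KiB) would otherwise be rejected with a warning.
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * memorySegmentSize);
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, memorySegmentSize + 1);
    }
}
```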

## Changelog
- Configure high watermark before low watermark in `NettyServer`
- Configure high watermark before low watermark in `KvStateServer`

## Verifying this change
- The change is pretty trivial with an extended 
`NettyServerLowAndHighWatermarkTest` that now checks the expected watermarks.
- I didn't add a test for `KvStateServer`, because the watermarks can't be 
configured there manually.
- To verify, you can run `NettyServerLowAndHighWatermarkTest` with logging 
before and after this change and verify that no warning is logged anymore.

## Does this pull request potentially affect one of the following parts?
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 7258-watermark_config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4391


commit 73998ba1328d4bf61ee979ed327b0a684ed03aa7
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-07-24T16:47:23Z

[FLINK-7258] [network] Fix watermark configuration order

When configuring larger memory segment sizes, configuring the
low watermark before the high watermark may lead to an
IllegalArgumentException, because the low watermark will
temporarily be higher than the high watermark. It's necessary
to configure the high watermark before the low watermark.

For the queryable state server in KvStateServer I didn't
add an extra test as the watermarks cannot be configured there.




---


[GitHub] flink pull request #4112: [FLINK-6901] Flip checkstyle configuration files

2017-07-10 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4112#discussion_r126464199
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
 ---
@@ -61,7 +60,9 @@ public NettyBufferPool(int numberOfArenas) {
checkArgument(numberOfArenas >= 1, "Number of arenas");
this.numberOfArenas = numberOfArenas;
 
-   if (!PlatformDependent.hasUnsafe()) {
+   try {
+   Class.forName("sun.misc.Unsafe");
+   } catch (ClassNotFoundException e) {
--- End diff --

It's ok with me to remove this check.


---


[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

2017-07-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4254#discussion_r125606557
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws 
Exception {
}
 
/**
+* Tests that a failed cancel-job-with-savepoint request does not 
accidentally disable
+* periodic checkpoints.
+*/
+   @Test
+   public void testCancelJobWithSavepointFailurePeriodicCheckpoints() 
throws Exception {
+   testCancelJobWithSavepointFailure(true);
+   }
+
+   /**
+* Tests that a failed cancel-job-with-savepoint request does not 
accidentally enable
+* periodic checkpoints.
+*/
+   @Test
+   public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() 
throws Exception {
+   testCancelJobWithSavepointFailure(false);
+   }
+
+   /**
+* Tests that a failed savepoint does not cancel the job and that there 
are no
+* unintended side effects.
+*
+* @param enablePeriodicCheckpoints Flag to indicate whether to enable 
periodic checkpoints. We
+* need to test both here in order to verify that we don't accidentally 
disable or enable
+* checkpoints after a failed cancel-job-with-savepoint request.
+*/
+   private void testCancelJobWithSavepointFailure(
+   boolean enablePeriodicCheckpoints) throws Exception {
+
+   long checkpointInterval = enablePeriodicCheckpoints ? 360 : 
Long.MAX_VALUE;
+
+   // Savepoint target
+   File savepointTarget = tmpFolder.newFolder();
+   savepointTarget.deleteOnExit();
+
+   // Timeout for Akka messages
+   FiniteDuration askTimeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+   // A source that declines savepoints, simulating the behaviour
+   // of a failed savepoint.
+   JobVertex sourceVertex = new JobVertex("Source");
+   
sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+   sourceVertex.setParallelism(1);
+   JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+   final ActorSystem actorSystem = 
AkkaUtils.createLocalActorSystem(new Configuration());
+
+   try {
+   Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
+   new Configuration(),
+   actorSystem,
+   TestingUtils.defaultExecutor(),
+   TestingUtils.defaultExecutor(),
+   highAvailabilityServices,
+   Option.apply("jm"),
+   Option.apply("arch"),
+   TestingJobManager.class,
+   TestingMemoryArchivist.class);
+
+   UUID leaderId = 
LeaderRetrievalUtils.retrieveLeaderSessionId(
+   
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+   TestingUtils.TESTING_TIMEOUT());
+
+   ActorGateway jobManager = new 
AkkaActorGateway(master._1(), leaderId);
+
+   ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+   new Configuration(),
+   ResourceID.generate(),
+   actorSystem,
+   highAvailabilityServices,
+   "localhost",
+   Option.apply("tm"),
+   true,
+   TestingTaskManager.class);
+
+   ActorGateway taskManager = new 
AkkaActorGateway(taskManagerRef, leaderId);
+
+   // Wait until connected
+   Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+   Await.ready(taskManager.ask(msg, askTimeout), 
askTimeout);
+
+   JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
+   Collections.singletonList(sourceVertex.getID()),
+   Collections.singletonList(sourceVertex.getID()),
+   Collections.singletonList(sourceVertex.getID()),
+   checkpointInterval,
+   360,
+   0,
+   Integer.MAX_VALUE,
 

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

2017-07-04 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4254

[FLINK-7067] [jobmanager] Fix side effects after failed 
cancel-job-with-savepoint

If a cancel-job-with-savepoint request fails, this has an unintended side 
effect on the respective job if it has periodic checkpoints enabled. The 
periodic checkpoint scheduler is stopped before triggering the savepoint, but 
not restarted if a savepoint fails and the job is not cancelled.

This fix makes sure that the periodic checkpoint scheduler is restarted iff 
periodic checkpoints were enabled before.
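
A minimal sketch of the intended control flow, with the coordinator reduced to a hypothetical interface (the actual JobManager code differs):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the checkpoint coordinator, illustration only.
interface Coordinator {
    boolean isPeriodicCheckpointingEnabled();
    void stopCheckpointScheduler();
    void startCheckpointScheduler();
    CompletableFuture<String> triggerSavepoint(String targetDirectory);
}

final class CancelWithSavepointSketch {

    static CompletableFuture<String> cancelWithSavepoint(Coordinator coord, Runnable cancelJob, String dir) {
        final boolean periodic = coord.isPeriodicCheckpointingEnabled();
        coord.stopCheckpointScheduler(); // suspend periodic checkpoints while the savepoint runs
        return coord.triggerSavepoint(dir)
            .thenApply(path -> {
                cancelJob.run(); // savepoint succeeded: cancel the job
                return path;
            })
            .whenComplete((path, failure) -> {
                if (failure != null && periodic) {
                    // Savepoint failed, the job keeps running: restart the
                    // scheduler iff periodic checkpoints were enabled before.
                    coord.startCheckpointScheduler();
                }
            });
    }
}
```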

I have the test in a separate commit, because it uses Reflection to update 
a private field with a spied upon instance of the CheckpointCoordinator in 
order to test the expected behaviour. This is super fragile and ugly, but the 
alternatives require a large refactoring (use factories that can be set during 
tests) or don't test this corner case behaviour. The separate commit makes it 
easier to remove/revert it at a future point in time.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 7067-restart_checkpoint_scheduler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4254


commit 7294de0ef77a346b7b38d4b3fcdc421f7fd6855b
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-07-04T14:39:02Z

[tests] Reduce visibility of helper class methods

There is no need to make the helper methods public. No other class
should even use this inner test helper invokable.

commit ce924bc146d3cf97e0c5ddcc1ba16610b2fc8d49
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-07-04T14:53:54Z

[FLINK-7067] [jobmanager] Add test for cancel-job-with-savepoint side 
effects

I have this test in a separate commit, because it uses Reflection
to update private field with a spied upon instance of the
CheckpointCoordinator in order to test the expected behaviour. This
makes it easier to remove/revert at a future point in time.

This is super fragile and ugly, but the alternatives require a
large refactoring (use factories that can be set during tests)
or don't test this corner case behaviour.

commit 94aa444cbd7099d7830e06efe3525a717becb740
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-07-04T15:01:32Z

[FLINK-7067] [jobmanager] Fix side effects after failed 
cancel-job-with-savepoint

Problem: If a cancel-job-with-savepoint request fails, this has an
unintended side effect on the respective job if it has periodic
checkpoints enabled. The periodic checkpoint scheduler is stopped
before triggering the savepoint, but not restarted if a savepoint
fails and the job is not cancelled.

This commit makes sure that the periodic checkpoint scheduler is
restarted iff periodic checkpoints were enabled before.




---


[GitHub] flink issue #4213: [FLINK-7032] Overwrite inherited properties from parent p...

2017-07-04 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4213
  
Just ran into this as well. This was so annoying. Thank you very much for 
the fix... I spent an hour assuming it was a problem on my side. Merging now...


---


[GitHub] flink pull request #4163: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...

2017-06-22 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/4163


---


[GitHub] flink pull request #4164: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...

2017-06-22 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/4164


---


[GitHub] flink pull request #4164: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...

2017-06-22 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4164

[FLINK-6952] [FLINK-6985] [docs] Add Javadocs links and remove bugfix 
version (1.2)

Backport of #4162 for the `release-1.2` branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 6952-javadocs-release-1.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4164.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4164






---


[GitHub] flink pull request #4163: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...

2017-06-22 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4163

[FLINK-6952] [FLINK-6985] [docs] Add Javadocs links and remove bugfix 
version (1.3)

Backport of #4162 for the `release-1.3` branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 6952-javadocs-release-1.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4163.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4163






---


[GitHub] flink pull request #4162: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...

2017-06-22 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/4162

[FLINK-6952] [FLINK-6985] [docs] Add Javadocs links and remove bugfix 
version

Adds a link to the Javadocs in the left side navigation:

![screen shot 2017-06-20 at 11 46 
38](https://user-images.githubusercontent.com/1756620/27427607-e200a3f4-573f-11e7-84ed-51b610c79fe1.png)

Makes sure that the title and other places that reference the Flink release version only point to the minor release version (e.g. `1.3`). For snapshot releases, we display the full version.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 6952-javadocs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4162.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4162


commit 15752faf7059fd580d5da9da024a82e07209b879
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-06-20T10:19:07Z

[FLINK-6952] [docs] Add link to Javadocs

commit 151c13c9c33282493cb0d6ec6266f316e615ac26
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-06-22T09:37:54Z

[FLINK-6985] [docs] Remove bugfix version from title

Removes the bugfix version from title and other places where it
can potentially be confusing.

For snapshot releases, version and version_short should be the
same, e.g. 1.4-SNAPSHOT. For stable releases, version should be
the full version string (e.g. 1.2.1) whereas version_short
should skip the bugfix version (e.g. 1.2).




---


[GitHub] flink issue #3495: [FLINK-5781][config] Generation HTML from ConfigOption

2017-05-03 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3495
  
I think it's up to you guys whether you want the short description in there 
or not. I see that it only makes sense if you actually invest the time to have 
a good short description. You would probably need a table like this then:

```
+-----+---------+---------------------------------+
| key | default | short description               |
+-----+---------+---------------------------------+
| long description with details (expandable)      |
+-------------------------------------------------+
```



---


[GitHub] flink issue #3785: [FLINK-6337][network] Remove the buffer provider from Par...

2017-05-02 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3785
  
Thanks! Looks good to merge now 👍 I've rebased it and pushed it to 
Travis here: https://travis-ci.org/uce/flink/builds/227891373

As soon as it succeeds, I'm going to merge this PR.



---


[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...

2017-04-28 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3785#discussion_r113904383
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -68,7 +68,11 @@ public static void shutdown() {
 
@Override
ResultSubpartition createSubpartition() {
-   return new SpillableSubpartition(0, 
mock(ResultPartition.class), ioManager);
+   ResultPartition parent = mock(ResultPartition.class);
--- End diff --

Could you make this a static helper method?
```java
private static ResultPartition createMockPartition() {
   ...
}
```


---


[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...

2017-04-28 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3785#discussion_r113905412
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {

try {
runKMeans(cluster.getLeaderRPCPort());
-   fail("This program execution should have 
failed.");
--- End diff --

I think that the idea of the test is to check that another program can be 
executed after one failed. The commit message introducing the test says "This 
test validates that task slots in co-location constraints are properly freed in 
the presence of failures." By removing this line, we are not testing what we 
actually wanted to test anymore. We should keep the line but instead decrease 
the number of configured buffers, for example 640 works instead of 840. That 
way we keep the behaviour: successful job, failed job, successful job.
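
A sketch of the suggested adjustment (the key comes from the 1.x `ConfigConstants`; the concrete value would need verifying against the test):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

final class BufferBudgetSketch {

    static Configuration clusterConfig() {
        Configuration config = new Configuration();
        // 640 instead of 840: small enough that the middle program still
        // fails with insufficient network buffers, preserving the
        // successful -> failed -> successful sequence of the test.
        config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 640);
        return config;
    }
}
```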


---


[GitHub] flink issue #3796: [FLINK-6293] [tests] Harden JobManagerITCase

2017-04-28 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3796
  
Good fix. +1 to merge.


---


[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...

2017-04-27 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3785#discussion_r113690389
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {

try {
runKMeans(cluster.getLeaderRPCPort());
-   fail("This program execution should have 
failed.");
--- End diff --

Why did you remove this line? I think it's important to keep this line here.


---


[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...

2017-04-27 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3785#discussion_r113690239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ---
@@ -351,15 +351,15 @@ public void destroyBufferPool() {
/**
 * Returns the requested subpartition.
 */
-   public ResultSubpartitionView createSubpartitionView(int index, 
BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) 
throws IOException {
+   public ResultSubpartitionView createSubpartitionView(int index, 
BufferAvailabilityListener availabilityListener) throws IOException {
int refCnt = pendingReferences.get();
 
checkState(refCnt != -1, "Partition released.");
checkState(refCnt > 0, "Partition not pinned.");
 
checkElementIndex(index, subpartitions.length, "Subpartition 
not found.");
 
-   ResultSubpartitionView readView = 
subpartitions[index].createReadView(bufferProvider, availabilityListener);
+   ResultSubpartitionView readView = 
subpartitions[index].createReadView(bufferPool, availabilityListener);
--- End diff --

I think we can completely remove the buffer provider from the 
`createReadView` method:

- In `SpillableSubpartition#createReadView` we can use the segment size of the buffer pool of the spillable subpartition itself (`parent.getBufferProvider().getMemorySegmentSize()`), as sketched after this list.
- In `PipelinedSubpartition#createReadView` we don't use the argument 
anyways.
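
A minimal sketch of that shape, with the involved types reduced to hypothetical stand-ins (the real classes carry much more state):

```java
// Hypothetical stand-ins for the real interfaces, illustration only.
interface BufferProvider {
    int getMemorySegmentSize();
}

interface ResultPartitionLike {
    BufferProvider getBufferProvider();
}

final class SpillableSubpartitionSketch {

    private final ResultPartitionLike parent;

    SpillableSubpartitionSketch(ResultPartitionLike parent) {
        this.parent = parent;
    }

    // No BufferProvider parameter: the subpartition derives the segment
    // size from its parent partition's own buffer pool.
    int segmentSizeForReadView() {
        return parent.getBufferProvider().getMemorySegmentSize();
    }
}
```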


---


[GitHub] flink pull request #3606: [FLINK-6182] Fix possible NPE in SourceStreamTask

2017-03-24 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3606

[FLINK-6182] Fix possible NPE in SourceStreamTask

A user ran into this and reported confusing NPEs in the logs. This could 
only happen if a source task is cancelled before it was invoked.
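
A minimal sketch of the race (field and method names are hypothetical): `cancel()` can arrive before `invoke()` has assigned the source, so the cancel path must tolerate a `null` source.

```java
final class SourceTaskCancelSketch {

    interface Source {
        void cancel();
    }

    // Written by invoke(), read by cancel() from a different thread.
    private volatile Source source;

    void invoke(Source s) {
        this.source = s;
        // ... run the source ...
    }

    void cancel() {
        Source s = source;
        if (s != null) { // cancelled before invoke(): nothing to cancel yet
            s.cancel();
        }
    }
}
```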

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 6182-npe_source

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3606.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3606


commit b61ee091e843571e546146095842a0bf049a8910
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-24T11:24:19Z

[FLINK-6182] Fix possible NPE in SourceStreamTask




---


[GitHub] flink issue #3597: [FLINK-6170] [metrics] Don't rely on stats snapshot for c...

2017-03-22 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3597
  
Thanks, will merge after Travis gives a green light.


---


[GitHub] flink pull request #3597: [FLINK-6170] [metrics] Don't rely on stats snapsho...

2017-03-22 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3597

[FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics

Some checkpoint metrics use the latest stats snapshot to get the returned 
metric value. In practice, this means that these metrics are only updated when 
users are browsing the web UI (only then is a new snapshot created).

Instead of relying on the latest snapshot, the checkpoint metrics for the 
latest completed checkpoint should be directly updated when a checkpoint 
completes.
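
A sketch of the push-based update argued for here (hypothetical names; the gauge registration is indicated in a comment):

```java
import java.util.concurrent.atomic.AtomicLong;

final class CheckpointMetricsSketch {

    // Exposed through a gauge, e.g. metrics.gauge("lastCheckpointSize", lastCheckpointSize::get).
    private final AtomicLong lastCheckpointSize = new AtomicLong(-1L);

    // Called directly from the checkpoint completion path instead of being
    // derived from a stats snapshot that is only rebuilt when the UI polls.
    void onCheckpointCompleted(long stateSizeBytes) {
        lastCheckpointSize.set(stateSizeBytes);
    }
}
```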



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 6170-checkpoint_metrics_update

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3597.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3597






---


[GitHub] flink issue #3583: [FLINK-6043] [web] Display exception timestamp

2017-03-22 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3583
  
Thanks for addressing the comments. I'm unsure about using `ErrorInfo` instead of adding the suggested method. Since @tillrohrmann and @StephanEwen often work on the `ExecutionGraph`, I would like their input on this.



---


[GitHub] flink issue #3577: [FLINK-6133] fix build status in README.md

2017-03-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3577
  
There is a discussion on the dev mailing list whether we want to have this 
or not. I would wait until that is resolved before continuing here.


---


[GitHub] flink issue #3572: [FLINK-6127] [checkstyle] Add MissingDeprecation check

2017-03-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3572
  
Thanks for catching the unfortunate typo. I've fixed all of those. I left 
the confusing deprecation comment as is in the table API.

I think this is good to merge then.


---


[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106949177
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the job 
archives that are located at
+ * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The 
directories are polled in regular intervals, defined
+ * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ * 
+ * The archives are downloaded and expanded into a file structure analog 
to the REST API defined in the WebRuntimeMonitor.
+ */
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;

[GitHub] flink pull request #3572: [FLINK-6127] [checkstyle] Add MissingDeprecation c...

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3572#discussion_r106942215
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
 ---
@@ -114,15 +114,21 @@ public FlinkAggregateExpandDistinctAggregatesRule(
this.useGroupingSets = useGroupingSets;
}
 
-   @Deprecated // to be removed before 2.0
+   /**
+* @deprecated to be removed before 2.0
--- End diff --

I don't know, this is independent IMO, as it's just copying the previous comment.


---


[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106948899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.history;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility class for writing an archive file to a {@link FileSystem} and 
reading it back.
+ */
+public class FsJobArchivist {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FsJobArchivist.class);
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private static final String ARCHIVE = "archive";
+   private static final String PATH = "path";
+   private static final String JSON = "json";
+
+   private FsJobArchivist() {
+   }
+
+   /**
+* Writes the given {@link AccessExecutionGraph} to the {@link 
FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}.
+*
+* @param rootPath directory to which the archive should be written to
+* @param graph  graph to archive
+* @return path to where the archive was written, or null if no archive 
was created
+* @throws IOException
+*/
+   public static Path archiveJob(Path rootPath, AccessExecutionGraph 
graph) throws IOException {
+   try {
+   FileSystem fs = rootPath.getFileSystem();
+   Path path = new Path(rootPath, 
graph.getJobID().toString());
+   OutputStream out = fs.create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
+
+   try (JsonGenerator gen = 
jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+   gen.writeStartObject();
+   gen.writeArrayFieldStart(ARCHIVE);
+   for (JsonArchivist archiver : 
WebMonitorUtils.getJsonArchivists()) {
+   for (ArchivedJson archive : 
archiver.archiveJsonWithPath(graph)) {
+   gen.writeStartObject();
+   gen.writeStringField(PATH, 
archive.getPath());
+   gen.writeStringField(JSON, 
archive.getJson());
+   gen.writeEndObject();
+   }
+   }
+   gen.writeEndArray();
+   gen.writeEndObject();
+   } catch (Exception e) {
+   fs.delete(path, false);
+   throw e;
+   }
+   LOG.info("Job {} has been archived at {}.", 
graph.getJobID(), path);
+  

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106948356
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the job 
archives that are located at
+ * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The 
directories are polled in regular intervals, defined
+ * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ * 
+ * The archives are downloaded and expanded into a file structure analog 
to the REST API defined in the WebRuntimeMonitor.
+ */
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106902557
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to the HistoryServer.
+ */
+@PublicEvolving
+public class HistoryServerOptions {
+
+   /**
+* The interval at which the HistoryServer polls {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives.
+*/
+   public static final ConfigOption<Long> HISTORY_SERVER_REFRESH_INTERVAL =
+   key("historyserver.archive.fs.refresh-interval")
+   .defaultValue(1L);
+
+   /**
+* Comma-separated list of directories which the HistoryServer polls 
for new archives.
+*/
+   public static final ConfigOption<String> HISTORY_SERVER_DIRS =
+   key("historyserver.archive.fs.dirs")
--- End diff --

One last thought here: What do you think about having the same suffix 
`dirs` or `dir` for both `jobmanager.archive.fs` and `historyserver.archive.fs` 
for the sake of consistency? I know that the HS accepts multiple dirs and the 
JM only one, but it might help prevent typos etc. when configuring the history 
server.
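
For illustration, the two options with consistent suffixes, written with Flink's `ConfigOptions` builder (the key names are the suggestion being discussed, not necessarily what the PR contains):

```java
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

final class ArchiveOptionsSketch {

    // Single archive directory written by the JobManager.
    static final ConfigOption<String> JOB_MANAGER_ARCHIVE_DIR =
        key("jobmanager.archive.fs.dir").noDefaultValue();

    // Comma-separated list of directories polled by the HistoryServer.
    static final ConfigOption<String> HISTORY_SERVER_ARCHIVE_DIRS =
        key("historyserver.archive.fs.dirs").noDefaultValue();
}
```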


---


[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106900975
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the job 
archives that are located at
+ * {@link HistoryServerOptions#HISTORY_SERVER_DIRS}. The directories are 
polled in regular intervals, defined
+ * by {@link HistoryServerOptions#HISTORY_SERVER_REFRESH_INTERVAL}.
+ * 
+ * The archives are first copied into a temporary directory in {@link 
HistoryServerOptions#HISTORY_SERVER_WEB_DIR} and
+ * then expanded. The resulting file structure is analog to the REST API 
defined in the WebRuntimeMonitor.
+ */
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106900609
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The HistoryServer provides a WebInterface and REST API to retrieve 
information about finished jobs for which
+ * the JobManager may have already shut down.
+ * 
+ * The HistoryServer regularly checks a set of directories for job 
archives created by the {@link FsJobArchivist} and
+ * caches these in a local directory. See {@link 
HistoryServerArchiveFetcher}.
+ * 
+ * All configuration options are defined in {@link HistoryServerOptions}.
+ * 
+ * The WebInterface only displays the "Completed Jobs" page.
+ * 
+ * The REST API is limited to
+ * 
+ * /config
+ * /joboverview
+ * /jobs/:jobid/*
+ * 
+ * and relies on static files that are served by the {@link 
HistoryServerStaticFileServerHandler}.
+ */
+public class HistoryServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServer.class);
+
+   private final String webAddress;
+   private final int webPort;
+   private final long webRefreshIntervalMillis;
+   private final File webDir;
+
+   private final HistoryServerArchiveFetcher archiveFetcher;
+
+   private final SSLContext serverSSLContext;
+   private WebFrontendBootstrap netty;
+
+   private final Object startupShutdownLock = new Object();
+   private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+   private final Thread shutdownHook;
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+   String configDir = pt.getRequired("configDir");
+
+   LOG.info("Loading configuration from {}", configDir);
+   final Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration(configDir);
+
+   // run the history server
+   SecurityUtils.install(new 
SecurityUtils.SecurityConfiguration(flinkConfig));
+
+   try {
+   SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+   @Override
+   public Integer call() throws Exception {
+   HistoryServer hs = new 
HistoryServer(flinkConfig);
+   hs.run()

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106900349
  
--- Diff: docs/monitoring/rest_api.md ---
@@ -22,36 +22,69 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink has a monitoring API that can be used to query status and statistics 
of running jobs, as well as recent completed jobs.
+Flink has a monitoring API that can be used to query the status and 
statistics of running jobs, as well as recently completed jobs.
 This monitoring API is used by Flink's own dashboard, but is designed to 
be used also by custom monitoring tools.
 
 The monitoring API is a REST-ful API that accepts HTTP GET requests and 
responds with JSON data.
 
 * This will be replaced by the TOC
 {:toc}
 
-
 ## Overview
 
-The monitoring API is backed by a web server that runs as part of the 
*JobManager*. By default, this server listens at post `8081`, which can be 
configured in `flink-conf.yaml` via `jobmanager.web.port`. Note that the 
monitoring API web server and the web dashboard web server are currently the 
same and thus run together at the same port. They respond to different HTTP 
URLs, though.
+
+
+
+The JobManager monitoring API allows you to query the status and 
statistics of running jobs, as well as recently completed jobs.
+
+By default, this server binds to `localhost` and listens at port `8081`, 
which can be configured in `flink-conf.yaml` via `jobmanager.web.address` and 
`jobmanager.web.port`.
 
 In the case of multiple JobManagers (for high availability), each 
JobManager will run its own instance of the monitoring API, which offers 
information about completed and running jobs while that JobManager was elected 
the cluster leader.
+
+
+
--- End diff --

I think it's OK to keep it here as more detailed documentation, but I would 
vote for adding a dedicated page that lists the relevant configuration options 
in one place.




[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106900686
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The HistoryServer provides a WebInterface and REST API to retrieve 
information about finished jobs for which
+ * the JobManager may have already shut down.
+ * 
+ * The HistoryServer regularly checks a set of directories for job 
archives created by the {@link FsJobArchivist} and
+ * caches these in a local directory. See {@link 
HistoryServerArchiveFetcher}.
+ * 
+ * All configuration options are defined in {@link HistoryServerOptions}.
+ * 
+ * The WebInterface only displays the "Completed Jobs" page.
+ * 
+ * The REST API is limited to
+ * 
+ * /config
+ * /joboverview
+ * /jobs/:jobid/*
+ * 
+ * and relies on static files that are served by the {@link 
HistoryServerStaticFileServerHandler}.
+ */
+public class HistoryServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServer.class);
+
+   private final String webAddress;
+   private final int webPort;
+   private final long webRefreshIntervalMillis;
+   private final File webDir;
+
+   private final HistoryServerArchiveFetcher archiveFetcher;
+
+   private final SSLContext serverSSLContext;
+   private WebFrontendBootstrap netty;
+
+   private final Object startupShutdownLock = new Object();
+   private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+   private final Thread shutdownHook;
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+   String configDir = pt.getRequired("configDir");
+
+   LOG.info("Loading configuration from {}", configDir);
+   final Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration(configDir);
+
+   // run the history server
+   SecurityUtils.install(new 
SecurityUtils.SecurityConfiguration(flinkConfig));
+
+   try {
+   SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+   @Override
+   public Integer call() throws Exception {
+   HistoryServer hs = new 
HistoryServer(flinkConfig);
+   hs.run()

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106901748
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.history;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.OperatingSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility class for writing an archive file to a {@link FileSystem} and 
reading it back.
+ */
+public class FsJobArchivist {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FsJobArchivist.class);
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private static final String ARCHIVE = "archive";
+   private static final String PATH = "path";
+   private static final String JSON = "json";
+
+   private FsJobArchivist() {
+   }
+
+   /**
+* Writes the given {@link AccessExecutionGraph} to the {@link 
FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}.
+*
+* @param rootPath directory to which the archive should be written to
+* @param graph  graph to archive
+* @return path to where the archive was written, or null if no archive 
was created
+* @throws IOException
+*/
+   public static Path archiveJob(Path rootPath, AccessExecutionGraph 
graph) throws IOException {
+   try {
+   FileSystem fs = rootPath.getFileSystem();
+   Path path = new Path(rootPath, 
graph.getJobID().toString());
+   OutputStream out = fs.create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
+
+   try (JsonGenerator gen = 
jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+   gen.writeStartObject();
+   gen.writeArrayFieldStart(ARCHIVE);
+   for (JsonArchivist archiver : 
WebMonitorUtils.getJsonArchivists()) {
+   for (ArchivedJson archive : 
archiver.archiveJsonWithPath(graph)) {
+   gen.writeStartObject();
+   gen.writeStringField(PATH, 
archive.getPath());
+   gen.writeStringField(JSON, 
archive.getJson());
+   gen.writeEndObject();
+   }
+   }
+   gen.writeEndArray();
+   gen.writeEndObject();
+   } catch (Exception e) {
+   fs.delete(path, false);
+   throw e;
+   }
+   LOG.info("Job {} has been archived at {}.", 
graph.getJobID(), path);
+   return path;
+   } catch (IOException e) {
 

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r106903661
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the job 
archives that are located at
+ * {@link HistoryServerOptions#HISTORY_SERVER_DIRS}. The directories are 
polled at regular intervals, defined
+ * by {@link HistoryServerOptions#HISTORY_SERVER_REFRESH_INTERVAL}.
+ * 
+ * The archives are first copied into a temporary directory in {@link 
HistoryServerOptions#HISTORY_SERVER_WEB_DIR} and
+ * then expanded. The resulting file structure is analogous to the REST API 
defined in the WebRuntimeMonitor.
+ */
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 

[GitHub] flink pull request #3572: [FLINK-6127] [checkstyle] Add MissingDeprecation c...

2017-03-20 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3572

[FLINK-6127] [checkstyle] Add MissingDeprecation check

Adds the MissingDeprecation check to our set of checkstyle rules.

Requires every `@Deprecated` annotation to have a `@deprecated` JavaDoc, 
forcing us to have both or none. This also applies to internal classes.

Most of the changes were cosmetic (e.g. we had some comments but were not 
using the `@deprecated` JavaDoc).
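
As a hedged example, a class that would satisfy the new check carries both the 
annotation and the JavaDoc tag (the class name here is made up for illustration):

    /**
     * Old entry point, kept for compatibility.
     *
     * @deprecated Superseded by the new entry point; scheduled for removal.
     */
    @Deprecated
    public class OldEntryPoint {
    }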

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink checkstyle_deprecation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3572


commit 06d70219566a43049f07ff3159f556fe70dbcaec
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T10:45:52Z

[FLINK-6127] [checkstyle] Add MissingDeprecation check

commit a1f8b273dc8a662f38ebd92cf092f38f53008d35
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T10:57:24Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-core

commit 31377693c0e2ed95f0a6907059c65e21a6d78edf
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T11:03:42Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-java

commit de238a72a6920d6a34e6d36b5fdb62b23ab0c4ab
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T11:20:38Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-runtime

commit f909ab65f141ad4dcceb48fe616294adda43de53
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T12:46:27Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-streaming-java

commit b111cf665abcbb174d43a694f90570dc90c3da62
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T12:48:27Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in 
flink-statebackend-rocksdb

commit 5532c8bbb02b716a5fd6b6e1d58f712c16ad5b55
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T12:50:52Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-table

commit 653a6f2c7848a0311c3ffbdfc3acc193101001c8
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T12:55:51Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in 
flink-connector-kafka-0.8

commit 83e797d86f1bb8c07cf07b16517eb60bedf355d7
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-03-20T12:57:55Z

[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-yarn






[GitHub] flink issue #3552: [FLINK-5978] Fix JM WebFrontend address ConfigOption is d...

2017-03-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3552
  
Looking into #2680, which added this change, it was meant to be followed up 
on by moving all web configuration to a new class. Could we do that here, 
rather than for only a single option?
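
A minimal sketch of such a grouping class, assuming a name like `WebOptions` 
(the keys and defaults shown are illustrative, not this PR's code):

    package org.apache.flink.configuration;

    import static org.apache.flink.configuration.ConfigOptions.key;

    /** Sketch: all JobManager web frontend settings gathered in one class. */
    public class WebOptions {

        /** Address the web server binds to. */
        public static final ConfigOption<String> ADDRESS =
                key("jobmanager.web.address").noDefaultValue();

        /** Port the web server listens on. */
        public static final ConfigOption<Integer> PORT =
                key("jobmanager.web.port").defaultValue(8081);
    }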





[GitHub] flink issue #3543: [FLINK-5985] [Backport for 1.2] Report no task states for...

2017-03-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3543
  
Could you merge this @StefanRRichter? I think it is one of the last 
blockers for 1.2.1 and it's a pretty critical issue...




[GitHub] flink issue #3561: [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnv...

2017-03-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3561
  
@aljoscha Could you have a brief look at this?




[GitHub] flink issue #3494: [FLINK-5635] [docker] Improve Docker tooling

2017-03-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3494
  
Thanks for this :-) It was interesting to follow your discussions @iemejia, 
@jgrier and @patricklucas. I've tried this out and it works as expected. I'm 
going to merge this later today if there are no objections. My understanding is 
that after merging this PR we can continue with #3500. If I misunderstood 
anything, please ping me here.





[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105929511
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+   private final Map<String, Path> cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105929296
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+   private final Map<String, Path> cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105928423
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to the HistoryServer.
+ */
+@PublicEvolving
+public class HistoryServerOptions {
+
+   /**
+* The interval at which the HistoryServer polls {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives.
+*/
+   public static final ConfigOption<Long> HISTORY_SERVER_REFRESH_INTERVAL =
+   key("historyserver.refresh-interval")
+   .defaultValue(3000L);
--- End diff --

I was under the impression that the `historyserver.web.refresh-interval` 
does not affect the actual web refresh interval specified in index.coffee, 
right?

What I thought originally was that an fs refresh interval smaller than the 
web refresh interval would not make sense if we only update the frontend every 
10s. But I forgot that people also browse the pages manually ;-) 
I'm still in favour of increasing it, as you say.
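
To make the distinction concrete, a sketch of how the fs refresh interval drives 
the fetcher, independent of any web refresh interval (mirroring the quoted code; 
the wiring shown is an assumption, not this PR's code):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HistoryServerOptions;

    public class FetcherWiring {
        // Polls the archive dirs every refreshIntervalMillis; the web refresh
        // interval only controls how often the frontend re-renders.
        static void schedule(Configuration config, Runnable fetcherTask) {
            long refreshIntervalMillis =
                config.getLong(HistoryServerOptions.HISTORY_SERVER_REFRESH_INTERVAL);
            ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
            executor.scheduleWithFixedDelay(
                fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }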




[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105906846
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
 ---
@@ -32,6 +32,13 @@
public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
key("job-manager.max-attempts-history-size").defaultValue(16);
 
+   /**
+* The location where the {@link JobManager} stores the archives for 
finished jobs.
+*/
+   public static final ConfigOption<String> ARCHIVE_DIR =
+   key("jobmanager.archive.dir")
--- End diff --

Should we rename this to be in line with my other proposal to allow future 
changes? E.g. `jobmanager.archive.fs.dir`?
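
One way to do such a rename without breaking existing setups is a deprecated 
fallback key; a sketch (not this PR's code) of what that could look like:

    import static org.apache.flink.configuration.ConfigOptions.key;

    import org.apache.flink.configuration.ConfigOption;

    public class ArchiveOptions {
        /**
         * New key, with the old key kept as a deprecated fallback so that
         * existing configurations keep working after the rename.
         */
        public static final ConfigOption<String> ARCHIVE_DIR =
            key("jobmanager.archive.fs.dir")
                .noDefaultValue()
                .withDeprecatedKeys("jobmanager.archive.dir");
    }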




[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105918471
  
--- Diff: flink-runtime-web/web-dashboard/app/index_hs.jade ---
@@ -0,0 +1,60 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+doctype html
+html(lang='en')
+  head
+meta(charset='utf-8')
+meta(http-equiv='X-UA-Compatible', content='IE=edge')
+meta(name='viewport', content='width=device-width, initial-scale=1')
+
+title Apache Flink Web Dashboard
+
+link(rel="apple-touch-icon", sizes="180x180", 
href="images/apple-touch-icon.png")
+link(rel="icon", type="image/png", href="images/favicon-32x32.png", 
sizes="32x32")
+link(rel="icon", type="image/png", href="images/favicon-16x16.png", 
sizes="16x16")
+link(rel="manifest", href="images/manifest.json")
+link(rel="mask-icon", href="images/safari-pinned-tab.svg", 
color="#aa1919")
+link(rel="shortcut icon", href="images/favicon.ico")
+meta(name="msapplication-config", content="images/browserconfig.xml")
+meta(name="theme-color", content="#ff")
+
+link(rel='stylesheet', href='css/vendor.css', type='text/css')
+link(rel='stylesheet', href='css/index.css', type='text/css')
+
+script(src="js/vendor.js")
+script(src="js/hs/index.js")
+
+  body(ng-app="flinkApp" ng-strict-di)
+#sidebar(ng-class="{ 'sidebar-visible': sidebarVisible }")
+  nav.navbar.navbar-inverse.navbar-static-top
+.navbar-header
+  a.navbar-brand(ui-sref="completed-jobs")
+img.logo(alt="Apache Flink Dashboard" 
src="images/flink-logo.png")
+  a.navbar-brand.navbar-brand-text(ui-sref="completed-jobs")
+| Apache Flink Dashboard
--- End diff --

Also rename to `Apache Flink History Server`?




[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105898166
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+   private final Map<String, Path> cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105894483
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+   private final Map<String, Path> cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105876698
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HistoryServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServer.class);
+
+   private final String webAddress;
+   private final int webPort;
+   private final long webRefreshIntervalMillis;
+   private final File webDir;
+
+   private final HistoryServerArchiveFetcher archiveFetcher;
+
+   private final SSLContext serverSSLContext;
+   private WebFrontendBootstrap netty;
+
+   private final Object startupShutdownLock = new Object();
+   private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+   private final Thread shutdownHook;
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+   String configDir = pt.getRequired("configDir");
+
+   LOG.info("Loading configuration from {}", configDir);
+   final Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration(configDir);
+
+   // run the history server
+   SecurityUtils.install(new 
SecurityUtils.SecurityConfiguration(flinkConfig));
+
+   try {
+   SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+   @Override
+   public Integer call() throws Exception {
+   HistoryServer hs = new 
HistoryServer(flinkConfig);
+   hs.run();
+   return 0;
+   }
+   });
+   System.exit(0);
+   } catch (UndeclaredThrowableException ute) {
+   Throwable cause = ute.getUndeclaredThrowable();
+   LOG.error("Failed to run HistoryServer.", cause);
+   cause.printStackTrace();
+   System.exit(1);
+   } catch (Exception e) {
+   LOG.error("Failed to run HistoryServer.", e);
+   e.printStackTrace();
+   System.exit(1);
+   }
+   }
+
+   public HistoryServer(Configuration config) throws IOException, 
FlinkException {
+   if 
(config.getBoolean(HistoryServerOptions.HISTO

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105894735
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+   new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+   LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
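+   // Run once immediately, then re-run with a fixed delay between the
+   // end of one fetch and the start of the next.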
+   executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
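+   // Attempt an orderly shutdown first; force it if the running fetch
+   // does not finish within one second or the waiting thread is interrupted.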
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List<HistoryServer.RefreshLocation> refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+   private final Map<String, Path> cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   
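The directories polled by this task come from the history server's
configuration. For orientation, a hedged flink-conf.yaml sketch using the
option names as they later appeared in Flink's documentation (treat them as
illustrative for the state of this PR):

# Where the JobManager uploads archives of finished jobs.
jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# Comma-separated list of directories the HistoryServer polls for new archives.
historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Poll interval in milliseconds.
historyserver.archive.fs.refresh-interval: 10000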

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105877694
  
--- Diff: flink-runtime-web/web-dashboard/app/index_hs.jade ---
@@ -0,0 +1,60 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+doctype html
+html(lang='en')
+  head
+    meta(charset='utf-8')
+    meta(http-equiv='X-UA-Compatible', content='IE=edge')
+    meta(name='viewport', content='width=device-width, initial-scale=1')
+
+    title Apache Flink Web Dashboard
--- End diff --

Should we adjust the title to `Apache Flink History Server`?
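For illustration, that would be a one-line change in index_hs.jade (a sketch
of the suggested edit, not the merged code):

    title Apache Flink History Server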


---

