[GitHub] flink issue #6031: [FLINK-9386] Embed netty router
Github user uce commented on the issue: https://github.com/apache/flink/pull/6031

> I planned to upgrade netty and drop netty-router in one step of upgrading flink-shaded-netty. Do you think it should be split somehow?

No, if the ticket for upgrading covers that, we are all good. 👍 to merge. Thanks for taking care of this. Looking forward to finally having a recent Netty version. ---
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190274800 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}. + * + * Router that doesn't contain information about HTTP request methods and route + * matching orders. + */ +final class MethodlessRouter<T> { + private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class); + + // A path pattern can only point to one target + private final Map<PathPattern, T> routes = new LinkedHashMap<>(); --- End diff -- Fine with me to not invest more time into this and keep it as is 👍 ---
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190274982 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.util.HandlerUtils; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This class replaces the standard error response to be identical with those sent by the {@link AbstractRestHandler}. 
+ */ +public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> { + public static final String ROUTER_HANDLER_NAME = RouterHandler.class.getName() + "_ROUTER_HANDLER"; + public static final String ROUTED_HANDLER_NAME = RouterHandler.class.getName() + "_ROUTED_HANDLER"; + + private final Map<String, String> responseHeaders; + private final Router router; + + public RouterHandler(Router router, final Map<String, String> responseHeaders) { + this.router = requireNonNull(router); + this.responseHeaders = requireNonNull(responseHeaders); + } + + public String getName() { + return ROUTER_HANDLER_NAME; + } + + @Override + protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception { + if (HttpHeaders.is100ContinueExpected(httpRequest)) { + channelHandlerContext.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); + return; + } + + // Route + HttpMethod method = httpRequest.getMethod(); + QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.getUri()); + RouteResult routeResult = router.route(method, qsd.path(), qsd.parameters()); + + if (routeResult == null) { + respondNotFound(channelHandlerContext, httpRequest); + return; + } + + routed(channelHandlerContext, routeResult, httpRequest); + } + + private void routed( + ChannelHandlerContext channelHandlerContext, + RouteResult routeResult, + HttpRequest httpRequest) throws Exception { + ChannelInboundHandler handler = (ChannelInboundHandler) routeResult.target(); + + // The handler may have been added (keep alive) + ChannelPipeline pipeline = channelHandlerContext.pipeline(); + ChannelHandler addedHandler = pipeline.get(ROUTED_HANDLE
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190214988 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.util.HandlerUtils; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This class replaces the standard error response to be identical with those sent by the {@link AbstractRestHandler}. 
+ */ +public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> { + public static final String ROUTER_HANDLER_NAME = RouterHandler.class.getName() + "_ROUTER_HANDLER"; + public static final String ROUTED_HANDLER_NAME = RouterHandler.class.getName() + "_ROUTED_HANDLER"; + + private final Map<String, String> responseHeaders; + private final Router router; + + public RouterHandler(Router router, final Map<String, String> responseHeaders) { + this.router = requireNonNull(router); + this.responseHeaders = requireNonNull(responseHeaders); + } + + public String getName() { + return ROUTER_HANDLER_NAME; + } + + @Override + protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception { + if (HttpHeaders.is100ContinueExpected(httpRequest)) { + channelHandlerContext.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); + return; + } + + // Route + HttpMethod method = httpRequest.getMethod(); + QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.getUri()); + RouteResult routeResult = router.route(method, qsd.path(), qsd.parameters()); + + if (routeResult == null) { + respondNotFound(channelHandlerContext, httpRequest); + return; + } + + routed(channelHandlerContext, routeResult, httpRequest); + } + + private void routed( + ChannelHandlerContext channelHandlerContext, + RouteResult routeResult, + HttpRequest httpRequest) throws Exception { + ChannelInboundHandler handler = (ChannelInboundHandler) routeResult.target(); + + // The handler may have been added (keep alive) + ChannelPipeline pipeline = channelHandlerContext.pipeline(); + ChannelHandler addedHandler = pipeline.get(ROUTED_HANDLE
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190248429 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}. + * + * Result of calling {@link Router#route(HttpMethod, String)}. + */ +public class RouteResult<T> { --- End diff -- Maybe add reference to https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/RouteResult.java ---
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190244366 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}. + * + * Router that doesn't contain information about HTTP request methods and route + * matching orders. 
+ */ +final class MethodlessRouter<T> { + private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class); + + // A path pattern can only point to one target + private final Map<PathPattern, T> routes = new LinkedHashMap<>(); --- End diff -- Please correct me if I'm wrong but I have a question regarding memory visibility: The thread that updates this map and the Netty event loop threads are different, so there might theoretically be a memory visibility issue if routes are added after the router has been passed to the `RouterHandler`. I don't think that we do this currently, but the API theoretically allows it. I'm wondering whether it makes sense to make the routes immutable, maybe something like creating the routes with a builder:

```java
Routes routes = new RoutesBuilder()
    .addGet(...)
    ...
    .build();
Router router = new Router(routes);
```

Or use a thread-safe map here that also preserves ordering (maybe wrap using `synchronizedMap`). --- Since we currently rely on correct registration order (e.g. `/jobs/overview` before `/jobs/:id` for correct matching), the immutable approach would allow us to include a utility in `Routes` that sorts patterns as done in `RestServerEndpoint:143`:

```java
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
```

---
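The builder idea from the comment above can be made concrete with plain JDK collections. The following is a hypothetical sketch, not Flink API: the `Routes`/`RoutesBuilder` names come from the review suggestion, registration order is kept by a `LinkedHashMap`, and the built map is frozen before publication so event-loop threads only ever see an effectively immutable structure.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the "immutable routes" suggestion: collect routes
// in insertion order, then freeze them before the router is published to
// other threads, sidestepping the memory-visibility concern.
public class RoutesBuilderDemo {

    public static final class Routes {
        private final Map<String, String> patternToHandler;

        private Routes(Map<String, String> patternToHandler) {
            // Unmodifiable view over a private copy: after construction the
            // contents never change, so safe publication is sufficient.
            this.patternToHandler = Collections.unmodifiableMap(patternToHandler);
        }

        public Map<String, String> asMap() {
            return patternToHandler;
        }
    }

    public static final class RoutesBuilder {
        // LinkedHashMap keeps registration order, which matters for matching
        // (e.g. "/jobs/overview" must be tried before "/jobs/:jobid").
        private final Map<String, String> routes = new LinkedHashMap<>();

        public RoutesBuilder addGet(String pattern, String handler) {
            routes.put(pattern, handler);
            return this;
        }

        public Routes build() {
            return new Routes(new LinkedHashMap<>(routes));
        }
    }

    public static void main(String[] args) {
        Routes routes = new RoutesBuilder()
            .addGet("/jobs/overview", "OverviewHandler")
            .addGet("/jobs/:jobid", "JobDetailsHandler")
            .build();
        System.out.println(routes.asMap().keySet());
    }
}
```

The alternative mentioned in the comment, `Collections.synchronizedMap(new LinkedHashMap<>())`, also preserves ordering while adding mutual exclusion, but immutability avoids locking on the hot read path.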
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190191066 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.util.HandlerUtils; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This class replaces the standard error response to be identical with those sent by the {@link AbstractRestHandler}. + */ +public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> { --- End diff -- I've verified that this class merges the behaviour of `Handler` and `AbstractHandler`. 👍 Since we copied the code, we might leave it as is, but I noticed the following minor things (there are similar warnings in the other copied classes):
- `L46`, `L47`: fields can be private
- `L62`: `throws Exception` can be removed
- `L98`: I get an unchecked call warning for `RouteResult`. We could use `` to get rid of it I think ---
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190246117 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java --- @@ -89,19 +91,20 @@ public RuntimeMonitorHandler( } @Override - protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) { + protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, JobManagerGateway jobManagerGateway) { CompletableFuture responseFuture; + RouteResult result = routedRequest.getRouteResult(); --- End diff -- I think if you do `RouteResult result = routedRequest.getRouteResult();` you don't need the cast to `Set` in lines 101 and 106. ---
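The underlying generics point can be illustrated with a minimal, hypothetical holder type (the `Result` class below is not the real `RouteResult` API): once the variable is declared with its type parameter, accessors are statically typed and no cast is needed at the call site.

```java
import java.util.Collections;
import java.util.Set;

// Minimal generic holder illustrating the review comment: with a raw
// "Result" reference the caller must cast the returned collection; with
// "Result<String>" the compiler already knows the element type.
public class TypedResultDemo {

    public static final class Result<T> {
        private final Set<T> values;

        public Result(Set<T> values) {
            this.values = values;
        }

        public Set<T> values() {
            return values;
        }
    }

    public static String firstValue(Result<String> result) {
        // No cast needed: values() is statically known to be Set<String>.
        return result.values().iterator().next();
    }

    public static void main(String[] args) {
        Result<String> result = new Result<>(Collections.singleton("jobid"));
        System.out.println(firstValue(result));
    }
}
```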
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190248637 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. Compared to original version this one --- End diff -- - Maybe add a link to https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/Router.java ---
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190213387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}. + * + * Router that doesn't contain information about HTTP request methods and route + * matching orders. + */ +final class MethodlessRouter<T> { + private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class); + + // A path pattern can only point to one target + private final Map<PathPattern, T> routes = new LinkedHashMap<>(); --- End diff -- Using the `LinkedHashMap` is what preserves the order of adding handlers, right? Or is there another thing that I've missed? ---
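For reference, the property asked about in the comment above can be shown with a stdlib-only snippet: `LinkedHashMap` iterates keys in insertion order, which is what lets a first-match router honour registration order (the route and handler names below are illustrative only).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Demonstrates why a router backed by LinkedHashMap preserves handler
// registration order: iteration order equals insertion order, so a
// first-match lookup tries "/jobs/overview" before "/jobs/:jobid".
public class InsertionOrderDemo {

    public static List<String> registrationOrder() {
        Map<String, String> routes = new LinkedHashMap<>();
        routes.put("/jobs/overview", "OverviewHandler");
        routes.put("/jobs/:jobid", "JobDetailsHandler");
        routes.put("/config", "ConfigHandler");
        return new ArrayList<>(routes.keySet());
    }

    public static void main(String[] args) {
        // Prints the keys in exactly the order they were put().
        System.out.println(registrationOrder());
    }
}
```

A plain `HashMap` gives no such guarantee, so swapping it in could silently change which pattern matches first.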
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190248244 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}. + * + * The pattern can contain constants or placeholders, example: + * {@code constant1/:placeholder1/constant2/:*}. + * + * {@code :*} is a special placeholder to catch the rest of the path + * (may include slashes). If exists, it must appear at the end of the path. + * + * The pattern must not contain query, example: + * {@code constant1/constant2?foo=bar}. + * + * The pattern will be broken to tokens, example: + * {@code ["constant1", ":variable", "constant2", ":*"]} + */ +final class PathPattern { --- End diff -- Maybe add reference to https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/PathPattern.java ---
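The matching rules described in the quoted Javadoc can be sketched in a few lines of plain Java. This is an illustrative reimplementation of the idea, not the actual `PathPattern` code; the `match` method and its null-on-mismatch convention are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the documented matching rules: ":name" captures
// one path segment, ":*" captures the rest of the path (slashes included)
// and must appear last; constant segments must match exactly.
public class PathPatternDemo {

    // Returns captured parameters, or null if the path does not match.
    public static Map<String, String> match(String pattern, String path) {
        String[] patternTokens = pattern.split("/");
        String[] pathTokens = path.split("/");
        Map<String, String> params = new HashMap<>();

        for (int i = 0; i < patternTokens.length; i++) {
            String token = patternTokens[i];
            if (token.equals(":*")) {
                // Catch-all: join the remaining segments, slashes included.
                params.put("*", String.join("/",
                    Arrays.copyOfRange(pathTokens, i, pathTokens.length)));
                return params;
            }
            if (i >= pathTokens.length) {
                return null; // path has fewer segments than the pattern
            }
            if (token.startsWith(":")) {
                params.put(token.substring(1), pathTokens[i]); // placeholder
            } else if (!token.equals(pathTokens[i])) {
                return null; // constant segment mismatch
            }
        }
        // Without ":*", segment counts must line up exactly.
        return patternTokens.length == pathTokens.length ? params : null;
    }

    public static void main(String[] args) {
        System.out.println(match("jobs/:jobid/vertices/:*", "jobs/42/vertices/a/b"));
    }
}
```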
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190186599 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.util.HandlerUtils; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This class replaces the standard error response to be identical with those sent by the {@link AbstractRestHandler}. --- End diff -- - I would add a comment that this was copied and simplified from https://github.com/sinetja/netty-router/blob/1.10/src/main/java/io/netty/handler/codec/http/router/Handler.java and https://github.com/sinetja/netty-router/blob/1.10/src/main/java/io/netty/handler/codec/http/router/AbstractHandler.java for future reference. That can be beneficial in the future. 
- I would also copy the high-level comment from that class: `Inbound handler that converts HttpRequest to RoutedRequest and passes RoutedRequest to the matched handler` as found in the original Handler class. ---
[GitHub] flink pull request #6031: [FLINK-9386] Embed netty router
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/6031#discussion_r190197601 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.router; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}. --- End diff -- - Maybe add a link to https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/MethodlessRouter.java and https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/OrderlessRouter.java as a reference ---
[GitHub] flink pull request #:
Github user uce commented on the pull request: https://github.com/apache/flink/commit/3a61ea47922280e15f462ca3cdc0c367047bde24#commitcomment-28293221 @twalthr I think this broke the build. At least locally, I get a RAT license failure.

```
1 Unknown Licenses

Files with unapproved licenses:
  docs/page/js/jquery.min.js
```

---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5395 Didn't merge yet, because there is an issue with the buildbot environment. ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5395 I've built this locally and everything looks good to me (linebreaks and code highlighting). I will merge this to `master`, but keep this PR open for a while. If everything works in the `buildbot` environment I will also merge it to 1.4. Then we can close this PR. :-) ---
[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4809#discussion_r166008336 --- Diff: docs/ops/state/savepoints.md --- @@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job with ID `:jobid` and cancel If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, cancelling the job with a savepoint will fail. + --- End diff -- 👍 ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5395 https://issues.apache.org/jira/browse/INFRA-15959 `ruby2.3.1` has been installed on the buildbot. I will check whether the build works as expected and report back here (probably only by next week). ---
[GitHub] flink pull request #5395: [FLINK-8308] Remove explicit yajl-ruby dependency,...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/5395#discussion_r165295535 --- Diff: docs/_config.yml --- @@ -77,12 +77,7 @@ defaults: layout: plain nav-pos: 9 # Move to end if no pos specified -markdown: KramdownPygments --- End diff -- Some questions regarding the removed lines: - What is used for markdown and code highlighting now? - Do we still support GitHub-flavored Markdown? - What happens to new lines? I didn't have time to build the docs again to check yet. ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5395 https://ci.apache.org/builders/flink-docs-master/builds/977/steps/Flink%20docs/logs/stdio says ``` Ruby version: ruby 2.0.0p384 (2014-01-12) [x86_64-linux-gnu] ``` I can ask INFRA whether it is possible to run a more recent Ruby version. ---
[GitHub] flink issue #5331: [FLINK-8473][webUI] Improve error behavior of JarListHand...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5331 I just tried it, re-uploading after deleting the directory **does not work**. Good catch Stephan. :-) @zentol: I found `HttpRequestHandler` which handles the uploads. The handler assumes that the directory exists and the logic for creating the temp directory is in `WebRuntimeMonitor`. We should consolidate this in a single place (if no one else needs this directory). ---
[GitHub] flink issue #5331: [FLINK-8473][webUI] Improve error behavior of JarListHand...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5331 👍 to merge. ---
[GitHub] flink pull request #5331: [FLINK-8473][webUI] Improve error behavior of JarL...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/5331#discussion_r162935682 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java --- @@ -77,6 +82,11 @@ public boolean accept(File dir, String name) { } }); + if (list == null) { + LOG.warn("Jar storage directory {} has been deleted.", jarDir.getAbsolutePath()); --- End diff -- Somehow this comment got lost before: Should we make the error message more explicit and say that it was `deleted externally` (e.g. not by Flink)? ---
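The hazard behind this null check is a standard `java.io.File` quirk: `list()` returns `null` (rather than an empty array) when the directory no longer exists. Below is a minimal, stdlib-only sketch of the pattern under discussion — the class name and warning wording are hypothetical, not the actual `JarListHandler` code:

```java
import java.io.File;

public class JarDirCheck {

    // Returns the jar file names in the directory, or an empty array with a
    // warning when the directory has been deleted externally. File.list()
    // signals a missing/unreadable directory by returning null.
    static String[] listJars(File jarDir) {
        String[] list = jarDir.list((dir, name) -> name.endsWith(".jar"));
        if (list == null) {
            // Hypothetical wording, mirroring the "deleted externally" suggestion.
            System.err.println("Jar storage directory " + jarDir.getAbsolutePath()
                + " was deleted externally (not by Flink).");
            return new String[0];
        }
        return list;
    }

    public static void main(String[] args) {
        File missing = new File("definitely-missing-dir-12345");
        String[] jars = listJars(missing);
        System.out.println(jars.length); // 0 for a missing directory
    }
}
```

Returning an empty array instead of propagating `null` keeps downstream callers (such as JSON list rendering) from hitting a `NullPointerException`.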
[GitHub] flink pull request #5331: [FLINK-8473][webUI] Improve error behavior of JarL...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/5331#discussion_r162932050 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java --- @@ -145,6 +155,7 @@ public boolean accept(File dir, String name) { return writer.toString(); } catch (Exception e) { + LOG.warn("Failed to fetch jar list.", e); --- End diff -- Aren't failed completions logged anyways? ---
[GitHub] flink issue #5264: [FLINK-8352][web-dashboard] Flink UI Reports No Error on ...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5264 Build failure is unrelated (YARN test). Merging this. Thanks! ---
[GitHub] flink issue #5264: [FLINK-8352][web-dashboard] Flink UI Reports No Error on ...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5264 Looks good to me. I've tried this out locally for both a working and not working (see screenshot) JAR. ![screen shot 2018-01-08 at 18 05 58](https://user-images.githubusercontent.com/1756620/34682298-cad92672-f49e-11e7-9d55-486a53d5d3ad.png) I will merge this as soon as Travis gives a green light. ---
[GitHub] flink issue #5263: [FLINK-7991][docs] Fix baseUrl for master branch
Github user uce commented on the issue: https://github.com/apache/flink/pull/5263 Looks good to me. Good catch! +1 to merge 👍 ---
[GitHub] flink pull request #:
Github user uce commented on the pull request: https://github.com/apache/flink/commit/8086e3bee8be4614359041c14786140edff19666#commitcomment-26179876 I know that the project historically did not consider the REST API as a public API, but I would vote to note down all of these breaking REST API changes for the upcoming 1.5 release notes in order to have a good migration path for users. I just ran into this when pointing a tool that was using the `jobsoverview` endpoint of 1.4 to the latest master and had to look into what happened when I got a 404. It might also be worth adding redirects in 1.5 and only removing them in the release after that (cc @zentol). ---
[GitHub] flink issue #5102: [FLINK-7762, FLINK-8167] Clean up and harden WikipediaEdi...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5102 I would be in favour of removing this or moving it to Bahir, but it is currently used in a [doc example](https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/run_example_quickstart.html). Besides that, I doubt that this is of much value to users. If you're OK with it, let's merge this for now and think about whether the example really needs it. ---
[GitHub] flink pull request #5102: [FLINK-7762, FLINK-8167] Clean up and harden Wikip...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/5102 [FLINK-7762, FLINK-8167] Clean up and harden WikipediaEditsSource ## What is the purpose of the change This pull request addresses two related issues with the WikipediaEditsSource. It makes the WikipediaEditsSourceTest a proper test instead of unnecessarily starting a FlinkMiniCluster and addresses a potential test instability. In general, the WikipediaEditsSource is not in good shape and could benefit from further refactoring. One potential area for improvement is integration with the asynchronous channel listener that reports events like errors or being kicked out of a channel, etc. I did not do this due to time constraints and the fact that this is not a production source. It is also questionable whether we should keep the test as is or remove it, since it depends on connectivity to an IRC channel. ## Brief change log - Harden WikipediaEditsSource with eager sanity checks - Make WikipediaEditsSourceTest a proper test ## Verifying this change This change is a rework/code cleanup without any new test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, but only to `flink-test-utils-junit` - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 7762-8167-wikiedits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5102.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5102 commit b2ab66f05ce545214a8132dc2d46b3143939b015 Author: Ufuk Celebi <u...@apache.org> Date: 2017-11-29T15:28:18Z [FLINK-8167] [connector-wikiedits] Harden WikipediaEditsSource - Minor eager sanity checks - Use UUID suffix for nickname. As reported in FLINK-8167, the current nickname suffix can result in nickname clashes which lead to test failures. commit 06ec1542963bbe2afaf1ad1fd55a54d13f855304 Author: Ufuk Celebi <u...@apache.org> Date: 2017-11-29T15:36:29Z [FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test The WikipediaEditsSourceTest unnecessarily implements an integration test that starts a FlinkMiniCluster and executes a small Flink program. This simply creates a source and executes run in a separate thread until a single WikipediaEditEvent is received. ---
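The nickname fix in the first commit boils down to appending a random suffix so that concurrent test runs joining the same IRC channel cannot clash. A stdlib-only sketch of that idea — the class and method names here are illustrative, not the actual connector code:

```java
import java.util.UUID;

public class NicknameGen {

    // A randomized nickname avoids the clashes reported in FLINK-8167 when
    // multiple test runs join the same IRC channel concurrently.
    static String uniqueNickname(String prefix) {
        // IRC nicknames are length-limited, so keep only a short UUID fragment.
        return prefix + "-" + UUID.randomUUID().toString().substring(0, 8);
    }

    public static void main(String[] args) {
        System.out.println(uniqueNickname("flink"));
        System.out.println(uniqueNickname("flink")); // different suffix each call
    }
}
```

The earlier deterministic suffix could repeat across parallel CI builds; a UUID fragment makes a collision vanishingly unlikely.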
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r151946320 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; +import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.runtime.io.network.netty.exception.TransportException; +import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.ArrayDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Channel handler to read the messages of buffer response or error response from the + * producer, to write and flush the unannounced credits for the producer. + */ +class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + + /** Channels, which already requested partitions from the producers. 
*/ + private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>(); + + /** Channels, which will notify the producers about unannounced credit. */ + private final ArrayDeque inputChannelsWithCredit = new ArrayDeque<>(); + + private final AtomicReference channelError = new AtomicReference<>(); + + private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); + + /** +* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared +* while data is still coming in for this channel. +*/ + private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = new ConcurrentHashMap<>(); + + private volatile ChannelHandlerContext ctx; --- End diff -- 👍 ---
[GitHub] flink issue #3442: [FLINK-5778] [savepoints] Add savepoint serializer with r...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3442 I had a quick chat with Stephan about this. @StefanRRichter has an idea how to properly implement this. Closing this PR and unassigning the issue. ---
[GitHub] flink pull request #3442: [FLINK-5778] [savepoints] Add savepoint serializer...
Github user uce closed the pull request at: https://github.com/apache/flink/pull/3442 ---
[GitHub] flink issue #4888: [backport] [FLINK-7067] Resume checkpointing after failed...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4888 Merged in e8e2913. ---
[GitHub] flink pull request #4888: [backport] [FLINK-7067] Resume checkpointing after...
Github user uce closed the pull request at: https://github.com/apache/flink/pull/4888 ---
[GitHub] flink issue #4888: [backport] [FLINK-7067] Resume checkpointing after failed...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4888 All failed tests are due to a similar issue that seems not to be related to the tests. Merging... ---
[GitHub] flink issue #4888: [backport] [FLINK-7067] Resume checkpointing after failed...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4888 @aljoscha Getting the following message in two of the builds in https://travis-ci.org/apache/flink/builds/291523047: ``` == Compilation/test failure detected, skipping shaded dependency check. == ``` Ever seen this? ---
[GitHub] flink pull request #4888: [backport] [FLINK-7067] Resume checkpointing after...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4888 [backport] [FLINK-7067] Resume checkpointing after failed cancel-job-with-savepoint This is a backport of #4254. I will merge this as soon as Travis gives the green light. You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 7067-backport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4888.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4888 commit 9226c3a15f8037851110fbdecf775cad99da771f Author: Ufuk Celebi <u...@apache.org> Date: 2017-07-04T14:39:02Z [hotfix] [tests] Reduce visibility of helper class methods There is no need to make the helper methods public. No other class should even use this inner test helper invokable. commit c571929ce476f17d02ee22df0b5170b0eb322c2d Author: Ufuk Celebi <u...@apache.org> Date: 2017-07-04T15:01:32Z [FLINK-7067] [jobmanager] Resume periodic checkpoints after failed cancel-job-with-savepoint Problem: If a cancel-job-with-savepoint request fails, this has an unintended side effect on the respective job if it has periodic checkpoints enabled. The periodic checkpoint scheduler is stopped before triggering the savepoint, but not restarted if a savepoint fails and the job is not cancelled. This commit makes sure that the periodic checkpoint scheduler is restarted iff periodic checkpoints were enabled before. This closes #4254. commit 074630a2fbd6dbdc7ff775ee9fb5d46c088dbc6d Author: Ufuk Celebi <u...@apache.org> Date: 2017-10-23T12:42:46Z [FLINK-7067] [jobmanager] Backport to 1.3 ---
[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4254 Travis gave the green light, merging this now. ---
[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4254 Cool! I'll rebase this and merge after Travis gives the green light. ---
[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4254 @tillrohrmann Thanks for looking over this. The `TestingCluster` is definitely preferable. I don't recall how I ended up with the custom setup instead of the `TestingCluster`. I changed the test to wait for another checkpoint after the failed savepoint. I also considered this for the initial PR, but went with mocking in order to test the case that periodic checkpoints were not activated before the cancellation [1]. I think the current variant is a good compromise between completeness and simplicity though. [1] As seen in the diff of `JobManager.scala`, we only activate the periodic scheduler after a failed cancellation iff it was activated before cancellation. This case can't be tested robustly with the current approach. We could wait for some time and if no checkpoint arrives in that time consider checkpoints as not accidentally activated, but that's not robust. I would therefore ignore this case if you don't have another idea. ---
[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4254#discussion_r145923787 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java --- @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception { } /** +* Tests that a failed cancel-job-with-savepoint request does not accidentally disable +* periodic checkpoints. +*/ + @Test + public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception { + testCancelJobWithSavepointFailure(true); + } + + /** +* Tests that a failed cancel-job-with-savepoint request does not accidentally enable +* periodic checkpoints. +*/ + @Test + public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception { + testCancelJobWithSavepointFailure(false); + } + + /** +* Tests that a failed savepoint does not cancel the job and that there are no +* unintended side effects. +* +* @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We +* need to test both here in order to verify that we don't accidentally disable or enable +* checkpoints after a failed cancel-job-with-savepoint request. +*/ + private void testCancelJobWithSavepointFailure( + boolean enablePeriodicCheckpoints) throws Exception { + + long checkpointInterval = enablePeriodicCheckpoints ? 360 : Long.MAX_VALUE; + + // Savepoint target + File savepointTarget = tmpFolder.newFolder(); + savepointTarget.deleteOnExit(); + + // Timeout for Akka messages + FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); + + // A source that declines savepoints, simulating the behaviour + // of a failed savepoint. 
+ JobVertex sourceVertex = new JobVertex("Source"); + sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class); + sourceVertex.setParallelism(1); + JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); + + final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + try { + Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( + new Configuration(), + actorSystem, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + highAvailabilityServices, + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); + + UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId( + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + TestingUtils.TESTING_TIMEOUT()); + + ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId); + + ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + new Configuration(), + ResourceID.generate(), + actorSystem, + highAvailabilityServices, + "localhost", + Option.apply("tm"), + true, + TestingTaskManager.class); + + ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId); --- End diff -- Definitely +1 ---
[GitHub] flink issue #4693: [FLINK-7645][docs] Modify system-metrics part show in the...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4693 Hey @yew1eb, I like this 👍 I think it really makes sense to make this browsable from the table of contents. @zentol What do you think? I would be in favour of merging this if you don't have any objections. ---
[GitHub] flink pull request #4645: [FLINK-7582] [REST] Netty thread immediately parse...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4645#discussion_r137269070 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -159,24 +155,17 @@ public void shutdown(Time timeout) { } private CompletableFuture submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class responseClass) { - return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor) - .thenApply((channel) -> { - try { - return channel.sync(); - } catch (InterruptedException e) { - throw new FlinkRuntimeException(e); - } - }) - .thenApply((ChannelFuture::channel)) - .thenCompose(channel -> { - ClientHandler handler = channel.pipeline().get(ClientHandler.class); - CompletableFuture future = handler.getJsonFuture(); - channel.writeAndFlush(httpRequest); - return future; - }).thenComposeAsync( - (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass), - executor - ); + ChannelFuture connect = bootstrap.connect(targetAddress, targetPort); + Channel channel; --- End diff -- I understand, but it can be confusing that a method which returns a future actually has a (potentially long) blocking operation. ---
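The shape being debated can be illustrated with plain `CompletableFuture`: a method that blocks before returning a future versus one that defers the blocking call to an executor. This is a stdlib-only sketch with placeholder names; the real `RestClient` uses Netty's `Bootstrap.connect`, not the simulated call below:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSubmit {

    // Stand-in for a potentially long blocking operation (e.g. a TCP connect).
    static String blockingConnect(String address) {
        return "connected:" + address;
    }

    // The confusing shape: the method returns a future, but it blocks the
    // calling thread before the future is even created.
    static CompletableFuture<String> submitBlocking(String address) {
        String channel = blockingConnect(address); // blocks the caller
        return CompletableFuture.completedFuture(channel);
    }

    // The alternative: the blocking call runs on the executor, so the
    // method itself returns immediately with a pending future.
    static CompletableFuture<String> submitAsync(String address, Executor executor) {
        return CompletableFuture.supplyAsync(() -> blockingConnect(address), executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(submitAsync("localhost:8081", pool).join());
        pool.shutdown();
    }
}
```

Both variants produce the same result; the difference is only which thread pays for the blocking connect, which is exactly the confusion raised in the review.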
[GitHub] flink issue #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..." instea...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4647 Thanks @zentol for the review. @jameslafa - Regarding the first point with checkstyle: It's confusing that our checkstyle settings don't catch this, but Chesnay is right here. Seems nitpicky but we try to avoid unnecessary formatting changes. - Regarding the changes to the *.js files: Since you didn't change any of the coffee scripts, there should be no need to commit those files and I would also suggest to remove those changes. The changes are probably due to different versions on your machine and the previous contributor who changed the files. I think this only reinforces the argument we had about committing the *.lock file too. Could you create a new JIRA for this? @zentol - I didn't understand your follow up comments regarding the `metricsFetched` flag. Could you please elaborate on what you mean? Is the flag ok to keep after #4472 is merged? ---
[GitHub] flink pull request #4645: [FLINK-7582] [REST] Netty thread immediately parse...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4645#discussion_r137038459 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -159,24 +155,17 @@ public void shutdown(Time timeout) { } private CompletableFuture submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class responseClass) { - return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor) - .thenApply((channel) -> { - try { - return channel.sync(); - } catch (InterruptedException e) { - throw new FlinkRuntimeException(e); - } - }) - .thenApply((ChannelFuture::channel)) - .thenCompose(channel -> { - ClientHandler handler = channel.pipeline().get(ClientHandler.class); - CompletableFuture future = handler.getJsonFuture(); - channel.writeAndFlush(httpRequest); - return future; - }).thenComposeAsync( - (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass), - executor - ); + ChannelFuture connect = bootstrap.connect(targetAddress, targetPort); + Channel channel; --- End diff -- Couldn't you keep the async connect and only change the last `composeAsync` to `compose`? ---
[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4553 @twalthr Thanks for doing the work of merging this to the other branches as well. I've triggered a build for all branches. While 1.2 worked and the warning is now available online (https://ci.apache.org/projects/flink/flink-docs-release-1.2/), older branches don't build properly: https://ci.apache.org/builders/flink-docs-release-1.0/builds/545 https://ci.apache.org/builders/flink-docs-release-1.1/builds/391 Do you have a clue what we can do there? Also pinging @rmetzger who has some experience with the Apache infrastructure... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4553: [FLINK-7642] [docs] Add very obvious warning about...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4553 [FLINK-7642] [docs] Add very obvious warning about outdated docs ## What is the purpose of the change This pull request makes the warning about outdated docs more obvious. ![screen shot 2017-08-16 at 18 34 03](https://user-images.githubusercontent.com/1756620/29374684-93ac1abe-82b2-11e7-9b9b-5008ac33a8e5.png) Please compare the screenshot to our current state: https://ci.apache.org/projects/flink/flink-docs-release-1.1/ If you like this, I would back/forward port this to all versions >= 1.0 and update the releasing Wiki page to add a note about updating the configuration of the docs. ## Brief change log - Change the color of the outdated warning footer to red - Rename the config key from `is_latest` to `show_outdated_warning` - Add an outdated warning to every page before the actual content ## Verifying this change - We don't have any tests for the docs - You can manually check out my branch and build the docs You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 7462-outdated_docs_warning Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4553.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4553 commit e140634687a56fb5fdd0e32a1dce3d5ac41fd123 Author: Ufuk Celebi <u...@apache.org> Date: 2017-08-16T16:21:49Z [FLINK-7462] [docs] Make outdated docs footer background light red commit 15fe71e0402d2e2d931485497338973e12cce9db Author: Ufuk Celebi <u...@apache.org> Date: 2017-08-16T16:34:51Z [FLINK-7462] [docs] Rename is_latest flag to show_outdated_warning commit 9bdeeae0cb88b59f0dfaee547fbf9b54d125645e Author: Ufuk Celebi <u...@apache.org> Date: 2017-08-16T16:35:03Z [FLINK-7462] [docs] Add outdated warning to content ---
[GitHub] flink issue #4360: [FLINK-7220] [checkpoints] Update RocksDB dependency to 5...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4360 Thanks for the pointer Greg. I didn't look at the JIRA issue. I like the idea of automating this but I would go with a pre-push hook instead of a pre-commit hook. It could be annoying during local dev if we force every commit to have the format already. I'm not sure whether INFRA allows this or not though. @StefanRRichter Sure, can happen. 👍 ---
[GitHub] flink issue #4360: [FLINK-7220] [checkpoints] Update RocksDB dependency to 5...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4360 @StefanRRichter As mentioned by Greg, we should either squash follow-up commits like `Stephan's comments` into their parent or tag them similarly to the main commit with a more descriptive message. ---
[GitHub] flink issue #4391: [FLINK-7258] [network] Fix watermark configuration order
Github user uce commented on the issue: https://github.com/apache/flink/pull/4391 Thanks for the review @zhijiangW and @zentol. Merging this for `1.3` and `master`. ---
[GitHub] flink pull request #4391: [FLINK-7258] [network] Fix watermark configuration...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4391 [FLINK-7258] [network] Fix watermark configuration order ## Purpose This PR changes the order in which low and high watermarks are configured for Netty server child connections (high first). That way we avoid running into an `IllegalArgumentException` when the low watermark is larger than the high watermark (relevant if the configured memory segment size is larger than the default). This situation surfaced only as a logged warning and the low watermark configuration was ignored. ## Changelog - Configure high watermark before low watermark in `NettyServer` - Configure high watermark before low watermark in `KvStateServer` ## Verifying this change - The change is pretty trivial with an extended `NettyServerLowAndHighWatermarkTest` that now checks the expected watermarks. - I didn't add a test for `KvStateServer`, because the watermarks can't be configured there manually. - To verify, you can run `NettyServerLowAndHighWatermarkTest` with logging before and after this change and verify that no warning is logged anymore. ## Does this pull request potentially affect one of the following parts? - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 7258-watermark_config Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4391.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4391 commit 73998ba1328d4bf61ee979ed327b0a684ed03aa7 Author: Ufuk Celebi <u...@apache.org> Date: 2017-07-24T16:47:23Z [FLINK-7258] [network] Fix watermark configuration order When configuring larger memory segment sizes, configuring the low watermark before the high watermark may lead to an IllegalArgumentException, because the low watermark will temporarily be higher than the high watermark. It's necessary to configure the high watermark before the low watermark. For the queryable state server in KvStateServer I didn't add an extra test as the watermarks cannot be configured there. ---
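The invariant behind this fix is that the low watermark must never exceed the high watermark at any intermediate step, so the order of the two setter calls matters. Below is a self-contained analogue of that invariant — not the actual Netty API, and the 32 KiB / 64 KiB defaults are assumptions matching Netty's documented channel-option defaults:

```java
public class WatermarkConfig {

    // Simplified analogue of Netty's write-buffer watermark options.
    private int low = 32 * 1024;   // assumed default low watermark (32 KiB)
    private int high = 64 * 1024;  // assumed default high watermark (64 KiB)

    void setLowWaterMark(int newLow) {
        if (newLow > high) {
            throw new IllegalArgumentException(
                "low watermark " + newLow + " > high watermark " + high);
        }
        low = newLow;
    }

    void setHighWaterMark(int newHigh) {
        if (newHigh < low) {
            throw new IllegalArgumentException(
                "high watermark " + newHigh + " < low watermark " + low);
        }
        high = newHigh;
    }

    // Configuring high before low keeps low <= high at every step, even when
    // both target values are above the old high watermark.
    static WatermarkConfig configure(int newLow, int newHigh) {
        WatermarkConfig config = new WatermarkConfig();
        config.setHighWaterMark(newHigh); // high first, as in the fix
        config.setLowWaterMark(newLow);
        return config;
    }

    public static void main(String[] args) {
        // With a 128 KiB memory segment, setting the low watermark first would
        // momentarily exceed the default high watermark and fail. High-first
        // succeeds:
        configure(128 * 1024, 256 * 1024);
        System.out.println("configured");
    }
}
```

Setting the low watermark first on a fresh config (e.g. `setLowWaterMark(128 * 1024)`) trips the check against the still-default 64 KiB high watermark, which is the failure mode the PR describes.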
[GitHub] flink pull request #4112: [FLINK-6901] Flip checkstyle configuration files
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4112#discussion_r126464199

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java ---

@@ -61,7 +60,9 @@ public NettyBufferPool(int numberOfArenas) {
 		checkArgument(numberOfArenas >= 1, "Number of arenas");
 		this.numberOfArenas = numberOfArenas;

-		if (!PlatformDependent.hasUnsafe()) {
+		try {
+			Class.forName("sun.misc.Unsafe");
+		} catch (ClassNotFoundException e) {

--- End diff --

It's fine with me to remove this check.

---
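As a self-contained sketch, the reflective probe from the diff can stand on its own without Netty's `PlatformDependent` helper. The class name `UnsafeProbe` is mine, not from the PR, and note that this only checks that the class is loadable, while `PlatformDependent.hasUnsafe()` performs a more thorough usability check:

```java
// Probes whether sun.misc.Unsafe is loadable on this JVM, mirroring the
// Class.forName check in the diff above. Loadability does not guarantee
// that Unsafe is actually usable (e.g. accessible via reflection).
public class UnsafeProbe {
    static boolean hasUnsafe() {
        try {
            Class.forName("sun.misc.Unsafe");
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("sun.misc.Unsafe available: " + hasUnsafe());
    }
}
```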
[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4254#discussion_r125606557

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---

@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
 	}

 	/**
+	 * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
+	 * periodic checkpoints.
+	 */
+	@Test
+	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
+		testCancelJobWithSavepointFailure(true);
+	}
+
+	/**
+	 * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
+	 * periodic checkpoints.
+	 */
+	@Test
+	public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
+		testCancelJobWithSavepointFailure(false);
+	}
+
+	/**
+	 * Tests that a failed savepoint does not cancel the job and that there are no
+	 * unintended side effects.
+	 *
+	 * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
+	 * need to test both here in order to verify that we don't accidentally disable or enable
+	 * checkpoints after a failed cancel-job-with-savepoint request.
+	 */
+	private void testCancelJobWithSavepointFailure(
+			boolean enablePeriodicCheckpoints) throws Exception {
+
+		long checkpointInterval = enablePeriodicCheckpoints ? 360 : Long.MAX_VALUE;
+
+		// Savepoint target
+		File savepointTarget = tmpFolder.newFolder();
+		savepointTarget.deleteOnExit();
+
+		// Timeout for Akka messages
+		FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+		// A source that declines savepoints, simulating the behaviour
+		// of a failed savepoint.
+		JobVertex sourceVertex = new JobVertex("Source");
+		sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+		sourceVertex.setParallelism(1);
+		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+		try {
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+				new Configuration(),
+				actorSystem,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
+
+			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				TestingUtils.TESTING_TIMEOUT());
+
+			ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+				new Configuration(),
+				ResourceID.generate(),
+				actorSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.apply("tm"),
+				true,
+				TestingTaskManager.class);
+
+			ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
+
+			JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				checkpointInterval,
+				360,
+				0,
+				Integer.MAX_VALUE,
[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4254

[FLINK-7067] [jobmanager] Fix side effects after failed cancel-job-with-savepoint

If a cancel-job-with-savepoint request fails, this has an unintended side effect on the respective job if it has periodic checkpoints enabled. The periodic checkpoint scheduler is stopped before triggering the savepoint, but not restarted if the savepoint fails and the job is not cancelled. This fix makes sure that the periodic checkpoint scheduler is restarted iff periodic checkpoints were enabled before.

I have the test in a separate commit, because it uses reflection to update a private field with a spied-upon instance of the CheckpointCoordinator in order to test the expected behaviour. This is super fragile and ugly, but the alternatives either require a large refactoring (use factories that can be set during tests) or don't test this corner-case behaviour. The separate commit makes it easier to remove/revert it at a future point in time.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 7067-restart_checkpoint_scheduler

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4254.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4254

commit 7294de0ef77a346b7b38d4b3fcdc421f7fd6855b
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-07-04T14:39:02Z

    [tests] Reduce visibility of helper class methods

    There is no need to make the helper methods public. No other class should even use this inner test helper invokable.

commit ce924bc146d3cf97e0c5ddcc1ba16610b2fc8d49
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-07-04T14:53:54Z

    [FLINK-7067] [jobmanager] Add test for cancel-job-with-savepoint side effects

    I have this test in a separate commit, because it uses reflection to update a private field with a spied-upon instance of the CheckpointCoordinator in order to test the expected behaviour. This makes it easier to remove/revert at a future point in time. It is super fragile and ugly, but the alternatives either require a large refactoring (use factories that can be set during tests) or don't test this corner-case behaviour.

commit 94aa444cbd7099d7830e06efe3525a717becb740
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-07-04T15:01:32Z

    [FLINK-7067] [jobmanager] Fix side effects after failed cancel-job-with-savepoint

    Problem: If a cancel-job-with-savepoint request fails, this has an unintended side effect on the respective job if it has periodic checkpoints enabled. The periodic checkpoint scheduler is stopped before triggering the savepoint, but not restarted if the savepoint fails and the job is not cancelled.

    This commit makes sure that the periodic checkpoint scheduler is restarted iff periodic checkpoints were enabled before.

---
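The "restart iff previously enabled" logic can be sketched with a toy scheduler. All names here are hypothetical stand-ins, not Flink's JobManager or CheckpointCoordinator code:

```java
// Toy model of the fix: remember whether periodic checkpointing was
// running before the savepoint, stop it for the duration of the
// attempt, and restart it only if the savepoint failed AND it was
// running before. Otherwise a failed cancel-with-savepoint would either
// leave checkpoints permanently disabled, or enable checkpoints on a
// job that never had them.
final class PeriodicScheduler {
    private boolean running;
    void start() { running = true; }
    void stop()  { running = false; }
    boolean isRunning() { return running; }
}

public class CancelWithSavepointDemo {
    static void cancelJobWithSavepoint(PeriodicScheduler scheduler, boolean savepointSucceeds) {
        boolean wasPeriodic = scheduler.isRunning();
        scheduler.stop(); // suspend periodic checkpoints while the savepoint runs

        if (savepointSucceeds) {
            // the job gets cancelled; the scheduler stays stopped
        } else if (wasPeriodic) {
            scheduler.start(); // the fix: restore the previous state
        }
    }

    public static void main(String[] args) {
        PeriodicScheduler s = new PeriodicScheduler();
        s.start();
        cancelJobWithSavepoint(s, false); // savepoint fails, job keeps running
        System.out.println("periodic checkpoints still running: " + s.isRunning());
    }
}
```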
[GitHub] flink issue #4213: [FLINK-7032] Overwrite inherited properties from parent p...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4213

Just ran into this as well. This was so annoying. Thank you very much for the fix... I spent an hour assuming it was a problem on my side. Merging now...

---
[GitHub] flink pull request #4163: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...
Github user uce closed the pull request at: https://github.com/apache/flink/pull/4163

---
[GitHub] flink pull request #4164: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...
Github user uce closed the pull request at: https://github.com/apache/flink/pull/4164

---
[GitHub] flink pull request #4164: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4164

[FLINK-6952] [FLINK-6985] [docs] Add Javadocs links and remove bugfix version (1.2)

Backport of #4162 for the `release-1.2` branch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 6952-javadocs-release-1.2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4164.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4164

---
[GitHub] flink pull request #4163: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4163

[FLINK-6952] [FLINK-6985] [docs] Add Javadocs links and remove bugfix version (1.3)

Backport of #4162 for the `release-1.3` branch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 6952-javadocs-release-1.3

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4163.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4163

---
[GitHub] flink pull request #4162: [FLINK-6952] [FLINK-6985] [docs] Add Javadocs link...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/4162

[FLINK-6952] [FLINK-6985] [docs] Add Javadocs links and remove bugfix version

Adds a link to the Javadocs in the left side navigation:

![screen shot 2017-06-20 at 11 46 38](https://user-images.githubusercontent.com/1756620/27427607-e200a3f4-573f-11e7-84ed-51b610c79fe1.png)

Makes sure that the title and other places that reference the Flink release version only point to the minor release version (e.g. `1.3`). For snapshot releases, we display the full version.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 6952-javadocs

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4162.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4162

commit 15752faf7059fd580d5da9da024a82e07209b879
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-06-20T10:19:07Z

    [FLINK-6952] [docs] Add link to Javadocs

commit 151c13c9c33282493cb0d6ec6266f316e615ac26
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-06-22T09:37:54Z

    [FLINK-6985] [docs] Remove bugfix version from title

    Removes the bugfix version from the title and other places where it can potentially be confusing. For snapshot releases, version and version_short should be the same, e.g. 1.4-SNAPSHOT. For stable releases, version should be the full version string (e.g. 1.2.1), whereas version_short should skip the bugfix version (e.g. 1.2).

---
[GitHub] flink issue #3495: [FLINK-5781][config] Generation HTML from ConfigOption
Github user uce commented on the issue: https://github.com/apache/flink/pull/3495

I think it's up to you guys whether you want the short description in there or not. I see that it only makes sense if you actually invest the time to write a good short description. You would probably need a table like this then:

```
+-----+---------+-------------------------------------+
| key | default | short description                   |
+-----+---------+-------------------------------------+
| long description with details (expandable)          |
+-----------------------------------------------------+
```

---
[GitHub] flink issue #3785: [FLINK-6337][network] Remove the buffer provider from Par...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3785

Thanks! Looks good to merge now 👍 I've rebased it and pushed it to Travis here: https://travis-ci.org/uce/flink/builds/227891373 As soon as it succeeds, I'm going to merge this PR.

---
[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3785#discussion_r113904383

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---

@@ -68,7 +68,11 @@ public static void shutdown() {
 	@Override
 	ResultSubpartition createSubpartition() {
-		return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
+		ResultPartition parent = mock(ResultPartition.class);

--- End diff --

Could you make this a static helper method?

```java
private static ResultPartition createMockPartition() {
    ...
}
```

---
[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3785#discussion_r113905412

--- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---

@@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {
 	try {
 		runKMeans(cluster.getLeaderRPCPort());
-		fail("This program execution should have failed.");

--- End diff --

I think that the idea of the test is to check that another program can be executed after one failed. The commit message introducing the test says "This test validates that task slots in co-location constraints are properly freed in the presence of failures." By removing this line, we are no longer testing what we actually wanted to test. We should keep the line but instead decrease the number of configured buffers; for example, 640 works instead of 840. That way we keep the behaviour: successful job, failed job, successful job.

---
[GitHub] flink issue #3796: [FLINK-6293] [tests] Harden JobManagerITCase
Github user uce commented on the issue: https://github.com/apache/flink/pull/3796

Good fix. +1 to merge.

---
[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3785#discussion_r113690389

--- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---

@@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {
 	try {
 		runKMeans(cluster.getLeaderRPCPort());
-		fail("This program execution should have failed.");

--- End diff --

Why did you remove this line? I think it's important to keep this line here.

---
[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3785#discussion_r113690239

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---

@@ -351,15 +351,15 @@ public void destroyBufferPool() {
 	/**
 	 * Returns the requested subpartition.
 	 */
-	public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
 		int refCnt = pendingReferences.get();
 		checkState(refCnt != -1, "Partition released.");
 		checkState(refCnt > 0, "Partition not pinned.");
 		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
-		ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
+		ResultSubpartitionView readView = subpartitions[index].createReadView(bufferPool, availabilityListener);

--- End diff --

I think we can completely remove the buffer provider from the `createReadView` method:

- In `SpillableSubpartition#createReadView` we can use the segment size of the buffer pool of the spillable subpartition itself (`parent.getBufferProvider().getMemorySegmentSize()`).
- In `PipelinedSubpartition#createReadView` we don't use the argument anyway.

---
[GitHub] flink pull request #3606: [FLINK-6182] Fix possible NPE in SourceStreamTask
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/3606

[FLINK-6182] Fix possible NPE in SourceStreamTask

A user ran into this and reported confusing NPEs in the logs. This could only happen if a source task was cancelled before it was invoked.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 6182-npe_source

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3606.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3606

commit b61ee091e843571e546146095842a0bf049a8910
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-24T11:24:19Z

    [FLINK-6182] Fix possible NPE in SourceStreamTask

---
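The cancel-before-invoke race can be sketched in a few lines. The class, interface, and field names below are illustrative, not the actual `SourceStreamTask` members:

```java
// cancel() may be called before invoke() has assigned the source
// function. Reading the field into a local and null-checking it avoids
// the NPE during that early-cancellation window.
public class ToySourceTask {
    interface Cancellable { void cancel(); }

    private volatile Cancellable source; // null until invoke() runs

    void invoke(Cancellable s) {
        source = s;
        // ... the task would now run the source ...
    }

    void cancel() {
        Cancellable s = source;
        if (s != null) { // guard: task cancelled before invoke()
            s.cancel();
        }
    }

    public static void main(String[] args) {
        new ToySourceTask().cancel(); // early cancel: no NPE with the guard
        System.out.println("cancel before invoke: ok");
    }
}
```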
[GitHub] flink issue #3597: [FLINK-6170] [metrics] Don't rely on stats snapshot for c...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3597

Thanks, will merge after Travis gives a green light.

---
[GitHub] flink pull request #3597: [FLINK-6170] [metrics] Don't rely on stats snapsho...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/3597

[FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics

Some checkpoint metrics use the latest stats snapshot to get the returned metric value. In practice, this means that these metrics are only updated when users are browsing the web UI (only then is a new snapshot created). Instead of relying on the latest snapshot, the checkpoint metrics for the latest completed checkpoint should be directly updated when a checkpoint completes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 6170-checkpoint_metrics_update

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3597.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3597

---
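The push-on-completion approach can be sketched as follows. The class and metric names are made up for illustration and do not match Flink's metric classes:

```java
import java.util.concurrent.atomic.AtomicLong;

// Contrast to the old pull-style behaviour, where a gauge read from the
// latest stats snapshot and therefore only changed when something (e.g.
// a web UI request) rebuilt the snapshot. Here the completion callback
// writes the metric value directly, so it is current without the UI.
public class CheckpointMetricsDemo {
    static final class PushedMetrics {
        // -1 is a sentinel for "no checkpoint completed yet"
        final AtomicLong lastCompletedDuration = new AtomicLong(-1);

        void onCheckpointCompleted(long durationMillis) {
            // updated eagerly on completion, independent of any snapshot
            lastCompletedDuration.set(durationMillis);
        }
    }

    public static void main(String[] args) {
        PushedMetrics metrics = new PushedMetrics();
        System.out.println("before: " + metrics.lastCompletedDuration.get());
        metrics.onCheckpointCompleted(1234);
        System.out.println("after:  " + metrics.lastCompletedDuration.get());
    }
}
```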
[GitHub] flink issue #3583: [FLINK-6043] [web] Display exception timestamp
Github user uce commented on the issue: https://github.com/apache/flink/pull/3583

Thanks for addressing the comments. I'm unsure about using `ErrorInfo` instead of adding the suggested method. Since @tillrohrmann and @StephanEwen often work on the `ExecutionGraph`, I would like their input on this.

---
[GitHub] flink issue #3577: [FLINK-6133] fix build status in README.md
Github user uce commented on the issue: https://github.com/apache/flink/pull/3577

There is a discussion on the dev mailing list about whether we want to have this or not. I would wait until that is resolved before continuing here.

---
[GitHub] flink issue #3572: [FLINK-6127] [checkstyle] Add MissingDeprecation check
Github user uce commented on the issue: https://github.com/apache/flink/pull/3572

Thanks for catching the unfortunate typo. I've fixed all of those. I left the confusing deprecation comment as is in the Table API. I think this is good to merge then.

---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106949177

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---

@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the job archives that are located at
+ * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined
+ * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ *
+ * The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor.
+ */
+public class HistoryServerArchiveFetcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+	private static final JsonFactory jacksonFactory = new JsonFactory();
+	private static final ObjectMapper mapper = new ObjectMapper();
+
+	private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+		new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+	private final JobArchiveFetcherTask fetcherTask;
+	private final long refreshIntervalMillis;
+
+	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+		this.refreshIntervalMillis = refreshIntervalMillis;
+		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir);
+		if (LOG.isInfoEnabled()) {
+			for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+				LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
+			}
+		}
+	}
+
+	void start() {
+		executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
+	}
+
+	void stop() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException ignored) {
+			executor.shutdownNow();
+		}
+	}
+
+	/**
+	 * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
+	 * new job archives.
+	 */
+	static class JobArchiveFetcherTask extends TimerTask {
+		private final List<HistoryServer.RefreshLocation> refreshDirs;
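Stripped of the Flink-specific pieces, the fetcher's start/stop lifecycle in the diff above boils down to a generic pattern (class name `PollingService` is mine):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Generic version of the fetcher lifecycle: a single-threaded scheduled
// executor polls at a fixed delay; shutdown waits briefly for an
// in-flight run and falls back to shutdownNow() on timeout or interrupt.
public class PollingService {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    private final Runnable task;
    private final long refreshIntervalMillis;

    PollingService(Runnable task, long refreshIntervalMillis) {
        this.task = task;
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    void start() {
        // fixed *delay*, not fixed rate: the interval is measured from the
        // end of one run to the start of the next, so slow runs don't pile up
        executor.scheduleWithFixedDelay(task, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // still busy after the grace period
            }
        } catch (InterruptedException ignored) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PollingService service = new PollingService(() -> System.out.println("poll"), 50);
        service.start();
        Thread.sleep(120); // let the task run a couple of times
        service.stop();
    }
}
```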
[GitHub] flink pull request #3572: [FLINK-6127] [checkstyle] Add MissingDeprecation c...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3572#discussion_r106942215

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java ---

@@ -114,15 +114,21 @@ public FlinkAggregateExpandDistinctAggregatesRule(
 		this.useGroupingSets = useGroupingSets;
 	}

-	@Deprecated // to be removed before 2.0
+	/**
+	 * @deprecated to be removed before 2.0

--- End diff --

I don't know; this is independent, in my opinion, as it just copies the previous comment.

---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106948899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.history; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility class for writing an archive file to a {@link FileSystem} and reading it back. + */ +public class FsJobArchivist { + + private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String ARCHIVE = "archive"; + private static final String PATH = "path"; + private static final String JSON = "json"; + + private FsJobArchivist() { + } + + /** +* Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}. 
+* +* @param rootPath directory to which the archive should be written to +* @param graph graph to archive +* @return path to where the archive was written, or null if no archive was created +* @throws IOException +*/ + public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, graph.getJobID().toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (JsonArchivist archiver : WebMonitorUtils.getJsonArchivists()) { + for (ArchivedJson archive : archiver.archiveJsonWithPath(graph)) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", graph.getJobID(), path); +
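For reference, the generator calls in `archiveJob` above (`writeStartObject`, `writeArrayFieldStart("archive")`, then one object per archived JSON with `path` and `json` string fields) produce one JSON file per job of roughly this shape — the concrete paths and payloads here are made-up placeholders:

```json
{
  "archive": [
    { "path": "/jobs/<jobid>", "json": "{ ...serialized job overview... }" },
    { "path": "/jobs/<jobid>/vertices", "json": "{ ...serialized vertices... }" }
  ]
}
```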
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106948356 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. + * + * The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor. 
+ */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs;
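The fetcher's scheduling pattern — a single-threaded `ScheduledExecutorService` driving a task at a fixed delay starting immediately, plus a bounded shutdown that escalates to `shutdownNow()` — can be sketched in isolation. The class and method names below are hypothetical stand-ins, not Flink API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for HistoryServerArchiveFetcher's scheduling logic.
class PollingFetcher {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger polls = new AtomicInteger();
    private final CountDownLatch firstPoll = new CountDownLatch(1);
    private final long refreshIntervalMillis;

    PollingFetcher(long refreshIntervalMillis) {
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    void start() {
        // Initial delay 0: the first poll runs immediately, as in the PR.
        executor.scheduleWithFixedDelay(() -> {
            polls.incrementAndGet();
            firstPoll.countDown();
        }, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        // Same bounded shutdown as the PR: wait briefly, then force.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException ignored) {
            executor.shutdownNow();
        }
    }

    boolean awaitFirstPoll(long timeoutMillis) {
        try {
            return firstPoll.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    int pollCount() {
        return polls.get();
    }
}
```

`scheduleWithFixedDelay` (rather than `scheduleAtFixedRate`) means a slow fetch cannot pile up overlapping runs: the interval is measured from the end of one run to the start of the next.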
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106902557 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.archive.fs.refresh-interval") + .defaultValue(1L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. 
+*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.fs.dirs") --- End diff -- One last thought here: What do you think about having the same suffix `dirs` or `dir` for both `jobmanager.archive.fs` and `historyserver.archive.fs` for the sake of consistency? I know that the HS accepts multiple dirs and the JM only one, but it might help prevent typos etc. when configuring the history server. ---
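Under uce's suggestion, the JobManager and HistoryServer archive keys would share a parallel shape. A hypothetical `flink-conf.yaml` in that scheme might look like the following — the `jobmanager.archive.fs.dir` key name is an assumption for illustration, not taken from this diff:

```yaml
# JobManager writes one archive per finished job here (a single directory)
jobmanager.archive.fs.dir: hdfs:///completed-jobs

# HistoryServer polls these (comma-separated; may list several directories)
historyserver.archive.fs.dirs: hdfs:///completed-jobs
historyserver.archive.fs.refresh-interval: 10000
```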
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900975 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_REFRESH_INTERVAL}. + * + * The archives are first copied into a temporary directory in {@link HistoryServerOptions#HISTORY_SERVER_WEB_DIR} and + * then expanded. The resulting file structure is analog to the REST API defined in the WebRuntimeMonitor. 
+ */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900609 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which + * the JobManager may have already shut down. + * + * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and + * caches these in a local directory. See {@link HistoryServerArchiveFetcher}. + * + * All configuration options are defined in{@link HistoryServerOptions}. + * + * The WebInterface only displays the "Completed Jobs" page. + * + * The REST API is limited to + * + * /config + * /joboverview + * /jobs/:jobid/* + * + * and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. 
+ */ +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the history server + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run()
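The limited REST surface listed in the Javadoc above (`/config`, `/joboverview`, `/jobs/:jobid/*`) can be captured by a small path matcher. This is an illustrative sketch, not code from the PR; it assumes a JobID serializes to 32 lowercase hex characters:

```java
import java.util.regex.Pattern;

// Illustrative matcher for the HistoryServer's limited REST surface.
class HistoryRestPaths {
    // Assumption: a JobID renders as 32 lowercase hex characters.
    private static final Pattern JOB_PATH =
        Pattern.compile("/jobs/[0-9a-f]{32}(/.*)?");

    static boolean isServed(String path) {
        return path.equals("/config")
            || path.equals("/joboverview")
            || JOB_PATH.matcher(path).matches();
    }
}
```

Anything outside this surface would fall through to the static file handler or a 404, since the HistoryServer serves pre-rendered JSON rather than live JobManager state.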
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900349 --- Diff: docs/monitoring/rest_api.md --- @@ -22,36 +22,69 @@ specific language governing permissions and limitations under the License. --> -Flink has a monitoring API that can be used to query status and statistics of running jobs, as well as recent completed jobs. +Flink has a monitoring API that can be used to query the status and statistics of running jobs, as well as recently completed jobs. This monitoring API is used by Flink's own dashboard, but is designed to be used also by custom monitoring tools. The monitoring API is a REST-ful API that accepts HTTP GET requests and responds with JSON data. * This will be replaced by the TOC {:toc} - ## Overview -The monitoring API is backed by a web server that runs as part of the *JobManager*. By default, this server listens at post `8081`, which can be configured in `flink-conf.yaml` via `jobmanager.web.port`. Note that the monitoring API web server and the web dashboard web server are currently the same and thus run together at the same port. They respond to different HTTP URLs, though. + + + +The JobManager monitoring API allows you to query the status and statistics of running jobs, as well as recently completed jobs. + +By default, this server binds to `localhost` and listens at port `8081`, which can be configured in `flink-conf.yaml` via `jobmanager.web.address` and `jobmanager.web.port`. In the case of multiple JobManagers (for high availability), each JobManager will run its own instance of the monitoring API, which offers information about completed and running jobs while that JobManager was elected the cluster leader. + + + --- End diff -- I think it's OK to keep it here as more detailed documentation, but I would vote to add a custom page which lists the relevant configuration options on a single page.
---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900686 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which + * the JobManager may have already shut down. + * + * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and + * caches these in a local directory. See {@link HistoryServerArchiveFetcher}. + * + * All configuration options are defined in{@link HistoryServerOptions}. + * + * The WebInterface only displays the "Completed Jobs" page. + * + * The REST API is limited to + * + * /config + * /joboverview + * /jobs/:jobid/* + * + * and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. 
+ */ +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the history server + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run()
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106901748 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.history; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.OperatingSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility class for writing an archive file to a {@link FileSystem} and reading it back. + */ +public class FsJobArchivist { + + private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String ARCHIVE = "archive"; + private static final String PATH = "path"; + private static final String JSON = "json"; + + private FsJobArchivist() { + } + + /** +* Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}. 
+* +* @param rootPath directory to which the archive should be written to +* @param graph graph to archive +* @return path to where the archive was written, or null if no archive was created +* @throws IOException +*/ + public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, graph.getJobID().toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (JsonArchivist archiver : WebMonitorUtils.getJsonArchivists()) { + for (ArchivedJson archive : archiver.archiveJsonWithPath(graph)) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", graph.getJobID(), path); + return path; + } catch (IOException e) {
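The error handling in `archiveJob` above follows a general pattern: create the file with no-overwrite semantics (so the create itself fails fast on an existing archive), and if writing fails afterwards, delete the partial file and rethrow. A stdlib-only sketch of that pattern — names are hypothetical, and Flink's `FileSystem` abstraction is replaced by `java.nio.file`:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ArchiveWriter {
    /** Writes payload to target, deleting the partial file if writing fails. */
    static Path writeOrCleanUp(Path target, byte[] payload) throws IOException {
        // CREATE_NEW mirrors FileSystem.WriteMode.NO_OVERWRITE: it throws if
        // target already exists, before the cleanup logic below can run
        // (as in the PR, where create() sits outside the try block).
        OutputStream out = Files.newOutputStream(target, StandardOpenOption.CREATE_NEW);
        try (OutputStream o = out) {
            o.write(payload);
        } catch (IOException e) {
            // Remove the partial archive so a retry starts clean.
            Files.deleteIfExists(target);
            throw e;
        }
        return target;
    }
}
```

Opening the stream outside the try block matters: if the file already exists, the open fails and the cleanup path never deletes a pre-existing archive.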
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106903661 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_REFRESH_INTERVAL}. + * + * The archives are first copied into a temporary directory in {@link HistoryServerOptions#HISTORY_SERVER_WEB_DIR} and + * then expanded. The resulting file structure is analog to the REST API defined in the WebRuntimeMonitor. 
+ */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link
[GitHub] flink pull request #3572: [FLINK-6127] [checkstyle] Add MissingDeprecation c...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/3572

[FLINK-6127] [checkstyle] Add MissingDeprecation check

Adds the MissingDeprecation check to our set of checkstyle rules. Requires every `@Deprecated` annotation to have a `@deprecated` JavaDoc, forcing us to have both or none. This also applies to internal classes. Most of the changes were cosmetic (e.g. we had some comments but were not using the `@deprecated` JavaDoc).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink checkstyle_deprecation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3572.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3572

commit 06d70219566a43049f07ff3159f556fe70dbcaec
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T10:45:52Z
[FLINK-6127] [checkstyle] Add MissingDeprecation check

commit a1f8b273dc8a662f38ebd92cf092f38f53008d35
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T10:57:24Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-core

commit 31377693c0e2ed95f0a6907059c65e21a6d78edf
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T11:03:42Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-java

commit de238a72a6920d6a34e6d36b5fdb62b23ab0c4ab
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T11:20:38Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-runtime

commit f909ab65f141ad4dcceb48fe616294adda43de53
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T12:46:27Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-streaming-java

commit b111cf665abcbb174d43a694f90570dc90c3da62
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T12:48:27Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-statebackend-rocksdb

commit 5532c8bbb02b716a5fd6b6e1d58f712c16ad5b55
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T12:50:52Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-table

commit 653a6f2c7848a0311c3ffbdfc3acc193101001c8
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T12:55:51Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-connector-kafka-0.8

commit 83e797d86f1bb8c07cf07b16517eb60bedf355d7
Author: Ufuk Celebi <u...@apache.org>
Date: 2017-03-20T12:57:55Z
[FLINK-6127] [checkstyle] Fix missing @deprecated in flink-yarn

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
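The rule described above boils down to: wherever the `@Deprecated` annotation appears, a `@deprecated` Javadoc tag must accompany it (and ideally name the replacement). A minimal example that would satisfy the check; the method names here are illustrative, not taken from the Flink code base:

```java
/**
 * Example of the convention the MissingDeprecation checkstyle rule enforces:
 * the @Deprecated annotation and the @deprecated Javadoc tag always appear together.
 */
public class DeprecationExample {

    /**
     * Old entry point, kept for compatibility.
     *
     * @deprecated Use {@link #newMethod()} instead. Without this tag,
     * the check would flag the annotation below.
     */
    @Deprecated
    public static int oldMethod() {
        return newMethod();
    }

    /** Replacement for {@link #oldMethod()}. */
    public static int newMethod() {
        return 42;
    }

    public static void main(String[] args) {
        // Callers still compile (with a deprecation warning) and behave identically.
        System.out.println(DeprecationExample.oldMethod()); // prints 42
    }
}
```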
[GitHub] flink issue #3552: [FLINK-5978] Fix JM WebFrontend address ConfigOption is d...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3552 Looking into #2680, which added this change: it was meant to be followed up on by moving all web configuration to a new class. Could we do that here rather than moving only a single option? ---
[GitHub] flink issue #3543: [FLINK-5985] [Backport for 1.2] Report no task states for...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3543 Could you merge this @StefanRRichter? I think it is one of the last blockers for 1.2.1 and it's a pretty critical issue... ---
[GitHub] flink issue #3561: [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnv...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3561 @aljoscha Could you have a brief look at this? ---
[GitHub] flink issue #3494: [FLINK-5635] [docker] Improve Docker tooling
Github user uce commented on the issue: https://github.com/apache/flink/pull/3494 Thanks for this :-) It was interesting to follow your discussions @iemejia, @jgrier and @patricklucas. I've tried this out and it works as expected. I'm going to merge this later today if there are no objections. My understanding is that after merging this PR we can continue with #3500. If I misunderstood anything, please ping me here. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105929511

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+	private static final JsonFactory jacksonFactory = new JsonFactory();
+	private static final ObjectMapper mapper = new ObjectMapper();
+
+	private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+		new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+	private final JobArchiveFetcherTask fetcherTask;
+	private final long refreshIntervalMillis;
+
+	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+		this.refreshIntervalMillis = refreshIntervalMillis;
+		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir);
+		if (LOG.isInfoEnabled()) {
+			for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+				LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
+			}
+		}
+	}
+
+	void start() {
+		executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
+	}
+
+	void stop() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException ignored) {
+			executor.shutdownNow();
+		}
+	}
+
+	/**
+	 * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for
+	 * new job archives.
+	 */
+	static class JobArchiveFetcherTask extends TimerTask {
+		private final List<HistoryServer.RefreshLocation> refreshDirs;
+		/** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+		private final Map<String, Path> cachedArchives;
+		private final File webDir;
+		private final File webTmpDir;
+		private final File webJobDir;
+		private final File webOverviewDir;
+
+		private static final String JSON_FILE_ENDING = ".json";
+
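The `cachedArchives` map in the quoted hunk keys fetched jobs by their ID so that a later poll pass can skip archives that were already downloaded and expanded. A minimal sketch of that bookkeeping; the class and method names (`CachedArchivesSketch`, `markFetched`, `originOf`) are hypothetical, and plain strings stand in for `JobID` and `Path`:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of dedup bookkeeping: job ID -> the refresh directory it came from. */
public class CachedArchivesSketch {

    private final Map<String, String> cachedArchives = new HashMap<>();

    /** Returns true if the archive is new and should be fetched and expanded. */
    boolean markFetched(String jobId, String originDir) {
        // putIfAbsent returns null only on first insertion, so a job that
        // reappears (even under a different directory) is not fetched twice.
        return cachedArchives.putIfAbsent(jobId, originDir) == null;
    }

    /** Directory the job's archive was originally fetched from, or null. */
    String originOf(String jobId) {
        return cachedArchives.get(jobId);
    }

    public static void main(String[] args) {
        CachedArchivesSketch cache = new CachedArchivesSketch();
        System.out.println(cache.markFetched("jobA", "hdfs:///archives-1")); // true: fetch it
        System.out.println(cache.markFetched("jobA", "hdfs:///archives-2")); // false: already fetched
    }
}
```

Recording the origin directory alongside the ID is what lets the fetcher later attribute or clean up an expanded archive per monitored location.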
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105929296 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- (quotes the same `@@ -0,0 +1,244 @@` hunk as the comment above) ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105928423

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to the HistoryServer.
+ */
+@PublicEvolving
+public class HistoryServerOptions {
+
+	/**
+	 * The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives.
+	 */
+	public static final ConfigOption<Long> HISTORY_SERVER_REFRESH_INTERVAL =
+		key("historyserver.refresh-interval")
+			.defaultValue(3000L);
--- End diff --
I was under the impression that the `historyserver.web.refresh-interval` does not affect the actual web refresh interval specified in index.coffee, right? What I thought originally was that the fs refresh interval being smaller than the web refresh interval does not make sense if we only update the frontend every 10s.
But I forgot that people also manually browse the pages ;-) I'm still in favour of increasing it, as you say. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105906846

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java ---
@@ -32,6 +32,13 @@
 	public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
 		key("job-manager.max-attempts-history-size").defaultValue(16);

+	/**
+	 * The location where the {@link JobManager} stores the archives for finished jobs.
+	 */
+	public static final ConfigOption<String> ARCHIVE_DIR =
+		key("jobmanager.archive.dir")
--- End diff --
Should we rename this to be in line with my other proposal to allow future changes? E.g. `jobmanager.archive.fs.dir`? ---
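Flink's `ConfigOptions` builder pairs a hierarchical key string with a default value, which is what makes the `jobmanager.archive.fs.dir` naming cheap to extend with other backends later. A tiny stand-in for that lookup-with-default behaviour; the `Option` class here and its values are illustrative, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Tiny stand-in for a key(...).defaultValue(...) style config builder,
 * illustrating hierarchical keys with fallback defaults.
 */
public class ConfigSketch {

    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Scoping the key under "fs" leaves room for other archive backends later.
    static final Option<String> ARCHIVE_DIR =
        new Option<>("jobmanager.archive.fs.dir", null);

    static final Option<Long> REFRESH_INTERVAL =
        new Option<>("historyserver.refresh-interval", 3000L);

    /** Returns the configured value, falling back to the option's default. */
    static <T> T get(Map<String, Object> conf, Option<T> option) {
        @SuppressWarnings("unchecked")
        T value = (T) conf.getOrDefault(option.key, option.defaultValue);
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(get(conf, REFRESH_INTERVAL)); // prints 3000 (the default)
        conf.put(REFRESH_INTERVAL.key, 10000L);
        System.out.println(get(conf, REFRESH_INTERVAL)); // prints 10000
    }
}
```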
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105918471 --- Diff: flink-runtime-web/web-dashboard/app/index_hs.jade --- @@ -0,0 +1,60 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +doctype html +html(lang='en') + head +meta(charset='utf-8') +meta(http-equiv='X-UA-Compatible', content='IE=edge') +meta(name='viewport', content='width=device-width, initial-scale=1') + +title Apache Flink Web Dashboard + +link(rel="apple-touch-icon", sizes="180x180", href="images/apple-touch-icon.png") +link(rel="icon", type="image/png", href="images/favicon-32x32.png", sizes="32x32") +link(rel="icon", type="image/png", href="images/favicon-16x16.png", sizes="16x16") +link(rel="manifest", href="images/manifest.json") +link(rel="mask-icon", href="images/safari-pinned-tab.svg", color="#aa1919") +link(rel="shortcut icon", href="images/favicon.ico") +meta(name="msapplication-config", content="images/browserconfig.xml") +meta(name="theme-color", content="#ff") + +link(rel='stylesheet', href='css/vendor.css', type='text/css') +link(rel='stylesheet', href='css/index.css', type='text/css') + +script(src="js/vendor.js") +script(src="js/hs/index.js") + + body(ng-app="flinkApp" ng-strict-di) 
+#sidebar(ng-class="{ 'sidebar-visible': sidebarVisible }")
+ nav.navbar.navbar-inverse.navbar-static-top
+.navbar-header
+ a.navbar-brand(ui-sref="completed-jobs")
+img.logo(alt="Apache Flink Dashboard" src="images/flink-logo.png")
+ a.navbar-brand.navbar-brand-text(ui-sref="completed-jobs")
+| Apache Flink Dashboard
--- End diff --
Also rename to `Apache Flink History Server`? ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105898166 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- (quotes the same `@@ -0,0 +1,244 @@` hunk as the comment above) ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105894483 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- (quotes the same `@@ -0,0 +1,244 @@` hunk as the comment above) ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105876698

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HistoryServer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);
+
+	private final String webAddress;
+	private final int webPort;
+	private final long webRefreshIntervalMillis;
+	private final File webDir;
+
+	private final HistoryServerArchiveFetcher archiveFetcher;
+
+	private final SSLContext serverSSLContext;
+	private WebFrontendBootstrap netty;
+
+	private final Object startupShutdownLock = new Object();
+	private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+	private final Thread shutdownHook;
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+		String configDir = pt.getRequired("configDir");
+
+		LOG.info("Loading configuration from {}", configDir);
+		final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);
+
+		// run the history server
+		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig));
+
+		try {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+				@Override
+				public Integer call() throws Exception {
+					HistoryServer hs = new HistoryServer(flinkConfig);
+					hs.run();
+					return 0;
+				}
+			});
+			System.exit(0);
+		} catch (UndeclaredThrowableException ute) {
+			Throwable cause = ute.getUndeclaredThrowable();
+			LOG.error("Failed to run HistoryServer.", cause);
+			cause.printStackTrace();
+			System.exit(1);
+		} catch (Exception e) {
+			LOG.error("Failed to run HistoryServer.", e);
+			e.printStackTrace();
+			System.exit(1);
+		}
+	}
+
+	public HistoryServer(Configuration config) throws IOException, FlinkException {
+		if (config.getBoolean(HistoryServerOptions.HISTO
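The `main` method in the quoted hunk catches `UndeclaredThrowableException` separately: when a checked exception escapes code run behind a context that did not declare it (as can happen with `runSecured`), it arrives wrapped, and logging the wrapper would hide the real failure. A small sketch of that unwrapping; `rootFailure` is a hypothetical helper name, not Flink API:

```java
import java.lang.reflect.UndeclaredThrowableException;

/** Sketch of the unwrapping done in the catch block above. */
public class UnwrapSketch {

    /** Prefer the undeclared cause over the wrapper for error reporting. */
    static Throwable rootFailure(Throwable t) {
        if (t instanceof UndeclaredThrowableException) {
            return ((UndeclaredThrowableException) t).getUndeclaredThrowable();
        }
        return t;
    }

    public static void main(String[] args) {
        Exception cause = new Exception("boom");
        UndeclaredThrowableException wrapped = new UndeclaredThrowableException(cause);
        System.out.println(rootFailure(wrapped).getMessage()); // prints boom
        System.out.println(rootFailure(cause).getMessage());   // prints boom
    }
}
```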
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105894735

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---

@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+	private static final JsonFactory jacksonFactory = new JsonFactory();
+	private static final ObjectMapper mapper = new ObjectMapper();
+
+	private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+		new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+	private final JobArchiveFetcherTask fetcherTask;
+	private final long refreshIntervalMillis;
+
+	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+		this.refreshIntervalMillis = refreshIntervalMillis;
+		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir);
+		if (LOG.isInfoEnabled()) {
+			for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+				LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
+			}
+		}
+	}
+
+	void start() {
+		executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
+	}
+
+	void stop() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException ignored) {
+			executor.shutdownNow();
+		}
+	}
+
+	/**
+	 * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for
+	 * new job archives.
+	 */
+	static class JobArchiveFetcherTask extends TimerTask {
+		private final List<HistoryServer.RefreshLocation> refreshDirs;
+		/** Map containing the JobID of all fetched jobs and the refreshDir from which they originate. */
+		private final Map<String, Path> cachedArchives;
+		private final File webDir;
+		private final File webTmpDir;
+		private final File webJobDir;
+		private final File webOverviewDir;
+
+		private static final String JSON_FILE_ENDING = ".json";
+
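The fetcher's `start`/`stop` pair above is a standard single-threaded polling lifecycle: `scheduleWithFixedDelay` measures the delay from the end of each run, so slow polls never overlap, and `stop` grants in-flight work a one-second grace period before forcing termination. A stripped-down sketch of the same pattern (class and counter names are illustrative, not from the PR):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingFetcherSketch {

	private final ScheduledExecutorService executor =
		Executors.newSingleThreadScheduledExecutor();
	// Stands in for the real fetcher task; just counts invocations.
	final AtomicInteger polls = new AtomicInteger();

	void start(long refreshIntervalMillis) {
		// Fixed *delay*, not fixed rate: the next poll is scheduled only
		// after the previous one finishes, so runs cannot pile up.
		executor.scheduleWithFixedDelay(
			polls::incrementAndGet, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
	}

	void stop() throws InterruptedException {
		executor.shutdown();
		// Same pattern as HistoryServerArchiveFetcher#stop: give the
		// current poll one second to finish, then force termination.
		if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
			executor.shutdownNow();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		PollingFetcherSketch fetcher = new PollingFetcherSketch();
		fetcher.start(10);
		Thread.sleep(100);
		fetcher.stop();
		System.out.println("polls completed: " + fetcher.polls.get());
	}
}
```

The initial delay of 0 means the first poll happens immediately on `start`, which is why the History Server shows archived jobs right after startup rather than one refresh interval later.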
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105877694

--- Diff: flink-runtime-web/web-dashboard/app/index_hs.jade ---

@@ -0,0 +1,60 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+doctype html
+html(lang='en')
+  head
+    meta(charset='utf-8')
+    meta(http-equiv='X-UA-Compatible', content='IE=edge')
+    meta(name='viewport', content='width=device-width, initial-scale=1')
+
+    title Apache Flink Web Dashboard

--- End diff --

Should we adjust the title to `Apache Flink History Server`?

---