[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4757 ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606599 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java --- @@ -65,15 +65,28 @@ public void getMapFor() throws Exception { assertEquals("2", metrics.get("abc.metric3")); assertEquals("3", metrics.get("abc.metric4")); + assertEquals( + "[" + + "{\"id\":\"abc.metric4\"}," + + "{\"id\":\"abc.metric3\"}" + + "]", + handler.getAvailableMetricsList(pathParams)); + assertEquals("", handler.getMetricsValues(pathParams, "")); + assertEquals( + "[" + + "{\"id\":\"abc.metric3\",\"value\":\"2\"}," + + "{\"id\":\"abc.metric4\",\"value\":\"3\"}" + + "]", + handler.getMetricsValues(pathParams, "abc.metric3,abc.metric4")); } @Test public void getMapForNull() { MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); --- End diff -- Please revert ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606524 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java --- @@ -50,10 +50,10 @@ public void testGetPaths() { @Test public void getMapFor() throws Exception { MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); --- End diff -- Please revert ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606332 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsHeaders.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message header for metrics handler. + */ +public final class MetricsHeaders implements MessageHeaders{ + + private static final MetricsHeaders INSTANCE = new MetricsHeaders(); + + public static final String PARAMETER_JOB_ID = "jobid"; --- End diff -- where is this field used? ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148609899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -32,8 +46,11 @@ * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { +public class JobMetricsHandler extends AbstractMetricsHandler --- End diff -- Why aren't we simply implementing a new handler? I think that we don't reuse any of `AbstractMetricsHandler` functionality and, thus, there is no need to use it as a base class. ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148604013 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -304,22 +304,21 @@ public WebRuntimeMonitor( get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.LOG, - config)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.STDOUT, - config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.LOG, + config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config)); + get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); --- End diff -- why are you adding things to the old `WebRuntimeMonitor`? ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606465 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsOverview.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Response of metrics handlers, represented as a list of {@link MetricEntry}. + */ +public class MetricsOverview extends ArrayList implements ResponseBody { --- End diff -- Let's not directly extend from `ArrayList` but instead use composition. ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148609556 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected MapgetMapFor(Map pathParams, MetricStore metrics) { MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + fetcher.update(); + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + List requestedMetrics = request.getQueryParameter(MetricNameParameter.class); + return getMetricsOverview(jobID, requestedMetrics); + }, + executor); + } + + protected MetricsOverview getMetricsOverview(JobID jobID, List requestedMetrics) { + Map metricsMap = getMetricsMapByJobId(jobID, fetcher.getMetricStore()); + if (metricsMap == null) { + return new MetricsOverview(); + } + + if (requestedMetrics == null || requestedMetrics.isEmpty()) { + return new MetricsOverview( + metricsMap.entrySet().stream() + .map(e -> new MetricEntry(e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } else { + return new MetricsOverview( + requestedMetrics.stream() + .filter(e -> metricsMap.get(e) != null) + .map(e -> new MetricEntry(e, metricsMap.get(e))) + .collect(Collectors.toList())); --- End diff -- I think that by not using Java streams here we can avoid doing two `HashMap` lookups for every `e` in `requestedMetrics` and do a single lookup instead. ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606039 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricMessageParameters.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for getting metrics. + */ +public class MetricMessageParameters extends MessageParameters { --- End diff -- Let's extend from `JobMessageParameters` ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148604078 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -304,22 +304,21 @@ public WebRuntimeMonitor( get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.LOG, - config)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.STDOUT, - config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.LOG, + config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config)); --- End diff -- Please revert formatting changes. ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148610350 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java --- @@ -120,6 +121,16 @@ public synchronized ComponentMetricStore getJobMetricStore(String jobID) { } /** +* Returns the {@link ComponentMetricStore} for the given job ID. +* +* @param jobID job ID +* @return ComponentMetricStore for the given ID, or null if no store for the given argument exists +*/ + public synchronized ComponentMetricStore getJobMetricStore(JobID jobID) { + return jobID == null ? null : ComponentMetricStore.unmodifiable(jobs.get(jobID.toString())); --- End diff -- `jobID` should not be nullable. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706327 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +/** + * + */ --- End diff -- JavaDocs missing ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143705876 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntry.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * + */ --- End diff -- Java docs missing ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143712463 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected MapgetMapFor(Map pathParams, MetricStore metrics) { MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID); + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { --- End diff -- I think you missed triggering the `fetcher.update`. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706550 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +/** + * + */ +public interface JobMetricsOverview extends ResponseBody { --- End diff -- What's the purpose of this interface? Is it intended as a marker interface? I think it's not necessary and can be removed. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143707577 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * + */ --- End diff -- JavaDocs ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143711982 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntryList.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * + */ +public class JobMetricEntryList extends ArrayList implements JobMetricsOverview { --- End diff -- I think composition makes more sense here than inheritance, because you will never use a `JobMetricEntryList` in a place where you would use a `List`. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706150 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * + */ +public class JobMetricId { + public static final String FIELD_NAME_ID = "id"; + + @JsonProperty(FIELD_NAME_ID) + private String id; + + @JsonCreator + public JobMetricId( + @JsonProperty(FIELD_NAME_ID) String id) { + this.id = id; --- End diff -- null check missing ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143708819 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMetricsMessageParameters.java --- @@ -22,19 +22,20 @@ import java.util.Collections; /** - * Message parameters which require a job path parameter. + * A job related REST handler always requires a {@link JobIDPathParameter}. */ -public class JobMessageParameters extends MessageParameters { +public class JobMetricsMessageParameters extends MessageParameters { --- End diff -- Why do you move this class? I think you should create a new subclass of `JobMessageParameters` which adds the `metricQueryParameter`. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706896 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected MapgetMapFor(Map pathParams, MetricStore metrics) { MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID); + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + + synchronized (metricStore) { + List queryParameters = request.getQueryParameter(JobIDQueryParameter.class); + String requestedMetricsList = queryParameters.get(0); --- End diff -- why are we doing the parsing of the query parameter inside of the synchronized block? ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706094 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * + */ --- End diff -- JavaDocs missing ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143710507 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -18,26 +18,48 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobIDQueryParameter; +import org.apache.flink.runtime.rest.messages.JobMetricsMessageParameters; + +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. * * If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} + * If the query parameters do contain a "get" parameter, a comma-separate list of metric names is expected as a value. 
+ * {@code ?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { +public class JobMetricsHandler extends AbstractMetricsHandler --- End diff -- I think we should change the handler such that it always returns a `JobMetricEntryList` and that you can add filter conditions via `:jobid/metrics?filter=X,Y` or via `get`. If you don't specify the `get`/`filter` query parameter, then you get the full list of metrics; otherwise you get the filtered list. That way we don't have two different return types which we have to distinguish. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143705930 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntry.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * + */ +public class JobMetricEntry { + public static final String FIELD_NAME_ID = "id"; + public static final String FIELD_NAME_VALUE = "value"; + + @JsonProperty(FIELD_NAME_ID) + private final String id; + + @JsonProperty(FIELD_NAME_VALUE) + private final String value; + + @JsonCreator + public JobMetricEntry( + @JsonProperty(FIELD_NAME_ID) String id, + @JsonProperty(FIELD_NAME_VALUE) String value) { + this.id = id; + this.value = value; --- End diff -- null checks via `Preconditions.checkNotNull` missing. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706271 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricIdList.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * + */ --- End diff -- JavaDocs ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143708645 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID); + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + + synchronized (metricStore) { + List queryParameters = request.getQueryParameter(JobIDQueryParameter.class); --- End diff -- I think this should already be the accumulated values of the get query parameters. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143710757 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -18,26 +18,48 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobIDQueryParameter; +import org.apache.flink.runtime.rest.messages.JobMetricsMessageParameters; + +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. * * If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} + * If the query parameters do contain a "get" parameter, a comma-separate list of metric names is expected as a value. 
+ * {@code ?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { +public class JobMetricsHandler extends AbstractMetricsHandler + implements LegacyRestHandler{ --- End diff -- I think `JobMetricsOverview` won't work here, because you have to add polymorphic type information. Otherwise Jackson won't be able to restore a `JobMetricsOverview` instance. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142851413 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * + */ +public class JobMetricId { --- End diff -- renamed ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142851216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; --- End diff -- moved to `org.apache.flink.runtime.rest.messages.metrics` ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142851091 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +/** + * + */ +public interface JobMetricsOverview extends ResponseBody { --- End diff -- renamed ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142850939 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * + */ +public class JobIDQueryParameter extends MessageQueryParameter { --- End diff -- renamed ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142850915 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; --- End diff -- moved to `org.apache.flink.runtime.rest.messages.metrics` ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142688438 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -26,15 +26,16 @@ */ public class JobMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter(); + private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter(); --- End diff -- I would suggest to instead create an abstract MetricsMessageParameters class (containing the get thingie) and extend that for job/task/subtask. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142689891 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -18,38 +18,95 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobIDQueryParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobMetricsHeaders; + +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. * * If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} + * If the query parameters do contain a "get" parameter, a comma-separate list of metric names is expected as a value. 
+ * {@code ?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { - public static final String PARAMETER_JOB_ID = "jobid"; - private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; +public class JobMetricsHandler extends AbstractMetricsHandler + implements LegacyRestHandler{ + + private final MetricStore metricStore; public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { super(executor, fetcher); + metricStore = fetcher.getMetricStore(); } @Override public String[] getPaths() { - return new String[]{JOB_METRICS_REST_PATH}; + return new String[]{JobMetricsHeaders.JOB_METRICS_REST_PATH}; } @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(JobMetricsHeaders.PARAMETER_JOB_ID)); + return job != null ? job.metrics : null; + } + + protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID); + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + + synchronized (metricStore) { + List queryParameters = request.getQueryParameter(JobIDQueryParameter.class); + String requestedMetricsList = queryParameters.get(0); + + return getJobMetricsOverview(jobID, requestedMetricsList); + } + }, + executor); + } + + protected JobMetricsOverview getJobMetricsOverview(JobID jobID, String requestedMetricsList) { --- End diff -- same as above ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142689531 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; --- End diff -- move out of legacy namespace. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142689501 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +/** + * + */ +public interface JobMetricsOverview extends ResponseBody { --- End diff -- rename to `MetricsOverview`. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142688020 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * + */ +public class JobIDQueryParameter extends MessageQueryParameter { --- End diff -- rename to `MetricNameParameter`. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142689803 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -18,38 +18,95 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList; +import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobIDQueryParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobMetricsHeaders; + +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. * * If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} + * If the query parameters do contain a "get" parameter, a comma-separate list of metric names is expected as a value. 
+ * {@code ?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { - public static final String PARAMETER_JOB_ID = "jobid"; - private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; +public class JobMetricsHandler extends AbstractMetricsHandler + implements LegacyRestHandler{ + + private final MetricStore metricStore; public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { super(executor, fetcher); + metricStore = fetcher.getMetricStore(); } @Override public String[] getPaths() { - return new String[]{JOB_METRICS_REST_PATH}; + return new String[]{JobMetricsHeaders.JOB_METRICS_REST_PATH}; } @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(JobMetricsHeaders.PARAMETER_JOB_ID)); + return job != null ? job.metrics : null; + } + + protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID); + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { --- End diff -- we can keep this logic in the AbstractMetricsHandler after having it implement the LegacyHandlerAdapter interface ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142688520 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMetricsHeaders.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; --- End diff -- move to `...rest.messages.metrics`. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142690660 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; --- End diff -- move out of legacy namespace. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142689455 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricIdList.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * + */ +public class JobMetricIdList extends ArrayList implements JobMetricsOverview { --- End diff -- I would simply add a list to the JobMetricsOverview class, making this class unnecessary. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142690760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntryList.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * + */ +public class JobMetricEntryList extends ArrayList implements JobMetricsOverview { --- End diff -- same as below. ---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4757 [Flink 7694][REST][Webfrontend]Port JobMetricsHandler to new REST handler ## What is the purpose of the change Port JobMetricsHandler to new REST handler ## Brief change log - added `JobMetricEntry`, `JobMetricEntryList`, `JobMetricId`, `JobMetricIdList` for json serialization. They all implement `JobMetricsOverview` so the `LegacyRestHandler` can have a single type of `ResponseBody` - implemented `JobMetricsHandler#handleRequests()` - added `JobMetricsHeaders` ## Verifying this change This change added tests and can be verified as follows: - Added unit tests for marshalling `JobMetricIdList` and `JobMetricEntryList` ## Does this pull request potentially affect one of the following parts: - Anything that affects deployment or recovery: JobManager (and its components) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7694 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4757.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4757 commit bc4aa8c3d9b057a6d35b620eb8e00048137e9361 Author: Bowen Li Date: 2017-09-27T22:30:02Z [FLINK-7724] add extra metrics to MetricStoreTest.setupStore commit d223444eba17c620e8c45c372a7b2e6fafdfd169 Author: Bowen Li Date: 2017-09-27T22:39:17Z delete unrelated files commit 4ac192c96a1b905ecc92c6bcb369ca252b511ae5 Author: Bowen Li Date: 2017-09-28T04:04:30Z Merge branch 'FLINK-7724' into FLINK-7694 commit 2815e66d511e72f11766bb516a9851339e379c5e Author: Bowen Li Date: 2017-09-28T05:25:04Z add more classes commit c5716edb64ed7f7ff6dda7bf48b2f90e9a80cd07 Author: Bowen Li Date: 2017-09-28T06:09:20Z [FLINK-7660] 
Support sideOutput in ProcessAllWindowFunction commit 04a2b59bf45c62318a20d897b254fc376d58abca Author: Bowen Li Date: 2017-09-28T16:25:31Z checkstyle commit 0e761a8e4a6c606c89e4bc5669c777a20e467183 Author: Bowen Li Date: 2017-09-28T06:09:20Z [FLINK-7660] Support sideOutput in ProcessAllWindowFunction commit 9e7c6cc3f86570938feb0ded48b204d4f320cb69 Author: Bowen Li Date: 2017-09-28T21:26:28Z merge master commit ac99b8f779f1b4275a1ee6df1527e406495c61c6 Author: Bowen Li Date: 2017-09-29T05:11:39Z add more classes commit 1402267e416916496a60c58bda7f64612d2e2ea5 Author: Bowen Li Date: 2017-09-29T05:34:47Z add JobIDQueryParameter commit bc4fc397e1fa4cc48e74d5b2157be2a8cf5f37ee Author: Bowen Li Date: 2017-09-29T06:43:51Z checkstyle commit 31a73f3c86d6afa2f4693e9d56bc709fdfb42e19 Author: Bowen Li Date: 2017-09-29T07:28:51Z add apache license to files commit 2ad3065b237c9ecb4db67a486da48e621097470e Author: Bowen Li Date: 2017-09-29T07:34:40Z checkstyle commit 99ec7363f7f61fa1fae201c4fd470d0fa2d6f5b2 Author: Bowen Li Date: 2017-09-28T06:09:20Z [FLINK-7660] Support sideOutput in ProcessAllWindowFunction commit 21e1185ddc999b13164317f2291ff02e7660e619 Author: Bowen Li Date: 2017-09-29T18:49:33Z add JobMetricsHeaders commit 1981e24373f8a849c2817387ce806103ebfcb3c7 Author: Bowen Li Date: 2017-09-29T18:51:29Z add apache license commit 76204dbfc0ade9fd84abbff417b388b92b73827e Author: Bowen Li Date: 2017-09-29T23:09:35Z checkstyle commit 1d3ebfa0781ee3be1a86e024fc40d6bdd59217f5 Author: Bowen Li Date: 2017-09-29T23:34:01Z fix class commit 1d9d113cdd577bfbfabba78e887f99f8b6aa2c51 Author: Bowen Li Date: 2017-09-29T23:34:48Z remove extra lines commit 9518958a15248cf25bcc1728d02c7e16c8491d81 Author: Bowen Li Date: 2017-09-29T23:36:14Z remove unused classes commit 28ab3df8c53cc53ce9b8de95e5fd6be3308649d9 Author: Bowen Li Date: 2017-10-02T03:36:26Z fix JobVertexMetricsHandlerTest commit 0fe6cd335e98f6ad92cc543053761d72bea6c033 Author: Bowen Li Date: 2017-10-02T03:39:56Z merge master 
commit 71089554ccf229b8396c622eea7b55ec7252a670 Author: Bowen Li Date: 2017-10-02T03:59:56Z revert unrelated changes commit