[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-12-02 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4757


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606599
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
 ---
@@ -65,15 +65,28 @@ public void getMapFor() throws Exception {
 
assertEquals("2", metrics.get("abc.metric3"));
assertEquals("3", metrics.get("abc.metric4"));
+   assertEquals(
+   "[" +
+   "{\"id\":\"abc.metric4\"}," +
+   "{\"id\":\"abc.metric3\"}" +
+   "]",
+   handler.getAvailableMetricsList(pathParams));
+   assertEquals("", handler.getMetricsValues(pathParams, ""));
+   assertEquals(
+   "[" +
+   "{\"id\":\"abc.metric3\",\"value\":\"2\"}," +
+   "{\"id\":\"abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   handler.getMetricsValues(pathParams, "abc.metric3,abc.metric4"));
}
 
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
-   mock(GatewayRetriever.class),
-   mock(MetricQueryServiceRetriever.class),
-   Executors.directExecutor(),
-   TestingUtils.TIMEOUT());
+   mock(GatewayRetriever.class),
+   mock(MetricQueryServiceRetriever.class),
+   Executors.directExecutor(),
+   TestingUtils.TIMEOUT());
--- End diff --

Please revert


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606524
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
 ---
@@ -50,10 +50,10 @@ public void testGetPaths() {
@Test
public void getMapFor() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
-   mock(GatewayRetriever.class),
-   mock(MetricQueryServiceRetriever.class),
-   Executors.directExecutor(),
-   TestingUtils.TIMEOUT());
+   mock(GatewayRetriever.class),
+   mock(MetricQueryServiceRetriever.class),
+   Executors.directExecutor(),
+   TestingUtils.TIMEOUT());
--- End diff --

Please revert


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606332
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsHeaders.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message header for metrics handler.
+ */
+public final class MetricsHeaders implements MessageHeaders {
+
+   private static final MetricsHeaders INSTANCE = new MetricsHeaders();
+
+   public static final String PARAMETER_JOB_ID = "jobid";
--- End diff --

Where is this field used?


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148609899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -32,8 +46,11 @@
  * The handler will then return a list containing the values of the requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
+public class JobMetricsHandler extends AbstractMetricsHandler
--- End diff --

Why aren't we simply implementing a new handler? I think that we don't
reuse any of `AbstractMetricsHandler`'s functionality and, thus, there is no
need to use it as a base class.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148604013
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -304,22 +304,21 @@ public WebRuntimeMonitor(
get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor));

get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.LOG,
-   config));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.STDOUT,
-   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   TaskManagerLogHandler.FileMode.LOG,
+   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   TaskManagerLogHandler.FileMode.STDOUT,
+   config));
+   get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher));
--- End diff --

Why are you adding things to the old `WebRuntimeMonitor`?


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606465
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsOverview.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Response of metrics handlers, represented as a list of {@link MetricEntry}.
+ */
+public class MetricsOverview extends ArrayList implements ResponseBody {
--- End diff --

Let's not directly extend from `ArrayList` but instead use composition.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148609556
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
@Override
protected Map getMapFor(Map pathParams, MetricStore metrics) {
MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) {
+   return CompletableFuture.supplyAsync(
+   () -> {
+   fetcher.update();
+   JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+   List requestedMetrics = request.getQueryParameter(MetricNameParameter.class);
+   return getMetricsOverview(jobID, requestedMetrics);
+   },
+   executor);
+   }
+
+   protected MetricsOverview getMetricsOverview(JobID jobID, List requestedMetrics) {
+   Map metricsMap = getMetricsMapByJobId(jobID, fetcher.getMetricStore());
+   if (metricsMap == null) {
+   return new MetricsOverview();
+   }
+
+   if (requestedMetrics == null || requestedMetrics.isEmpty()) {
+   return new MetricsOverview(
+   metricsMap.entrySet().stream()
+   .map(e -> new MetricEntry(e.getKey(), e.getValue()))
+   .collect(Collectors.toList()));
+   } else {
+   return new MetricsOverview(
+   requestedMetrics.stream()
+   .filter(e -> metricsMap.get(e) != null)
+   .map(e -> new MetricEntry(e, metricsMap.get(e)))
+   .collect(Collectors.toList()));
--- End diff --

I think that by not using Java streams we can avoid doing two `HashMap`
lookups for every `e` in `requestedMetrics` and do a single lookup instead.
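
A minimal sketch of the single-lookup variant suggested above, not the code that was eventually merged. `MetricEntrySketch` and `SingleLookupFilter` are hypothetical stand-ins for the PR's `MetricEntry` and for the filtering branch of `getMetricsOverview`; the map is assumed to be a plain `Map<String, String>` from metric name to value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the PR's MetricEntry (an id/value pair).
final class MetricEntrySketch {
    final String id;
    final String value;

    MetricEntrySketch(String id, String value) {
        this.id = id;
        this.value = value;
    }
}

// Hypothetical helper: the filtering branch written as a plain loop.
final class SingleLookupFilter {
    static List<MetricEntrySketch> select(
            Map<String, String> metricsMap, List<String> requestedMetrics) {
        List<MetricEntrySketch> result = new ArrayList<>(requestedMetrics.size());
        for (String name : requestedMetrics) {
            // One map lookup per requested name; the value is reused below.
            String value = metricsMap.get(name);
            if (value != null) {
                result.add(new MetricEntrySketch(name, value));
            }
        }
        return result;
    }
}
```

The stream version calls `metricsMap.get(e)` once in `filter` and again in `map`; the loop keeps the looked-up value in a local variable instead.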


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606039
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricMessageParameters.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for getting metrics.
+ */
+public class MetricMessageParameters extends MessageParameters {
--- End diff --

Let's extend from `JobMessageParameters`


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148604078
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -304,22 +304,21 @@ public WebRuntimeMonitor(
get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor));

get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.LOG,
-   config));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.STDOUT,
-   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   TaskManagerLogHandler.FileMode.LOG,
+   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   TaskManagerLogHandler.FileMode.STDOUT,
+   config));
--- End diff --

Please revert formatting changes.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148610350
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 ---
@@ -120,6 +121,16 @@ public synchronized ComponentMetricStore getJobMetricStore(String jobID) {
}

/**
+* Returns the {@link ComponentMetricStore} for the given job ID.
+*
+* @param jobID job ID
+* @return ComponentMetricStore for the given ID, or null if no store for the given argument exists
+*/
+   public synchronized ComponentMetricStore getJobMetricStore(JobID jobID) {
+   return jobID == null ? null : ComponentMetricStore.unmodifiable(jobs.get(jobID.toString()));
--- End diff --

`jobID` should not be nullable.
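
One way to read this comment, sketched under assumptions: a `null` ID is treated as a caller bug and rejected up front, while an unknown (but non-null) ID still yields `null`. `JobStoreSketch` is a hypothetical stand-in that uses plain strings in place of `JobID` and `ComponentMetricStore`, and it uses the JDK's `Objects.requireNonNull` to stay self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for MetricStore; String replaces JobID and the store type.
final class JobStoreSketch {
    private final Map<String, String> jobs = new HashMap<>();

    synchronized void put(String jobId, String store) {
        jobs.put(jobId, store);
    }

    // Returns the store for the given job, or null if the job is unknown.
    // A null jobId fails fast instead of being silently mapped to "unknown".
    synchronized String getJobMetricStore(String jobId) {
        Objects.requireNonNull(jobId, "jobId must not be null");
        return jobs.get(jobId);
    }
}
```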


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143706327
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+/**
+ *
+ */
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143705876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntry.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143712463
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
@Override
protected Map getMapFor(Map pathParams, MetricStore metrics) {
MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   return job != null ? job.metrics : null;
+   }
+
+   protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) {
+   MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID);
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) {
--- End diff --

I think you missed triggering the `fetcher.update`.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143706550
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+/**
+ *
+ */
+public interface JobMetricsOverview extends ResponseBody {
--- End diff --

What's the purpose of this interface? Is it intended as a marker interface? 
I think it's not necessary and can be removed.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143707577
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ *
+ */
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143711982
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntryList.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ *
+ */
+public class JobMetricEntryList extends ArrayList implements JobMetricsOverview {
--- End diff --

I think composition makes more sense here than inheritance, because you 
will never use a `JobMetricEntryList` in a place where you would use a `List`.
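
A rough sketch of what composition could look like here, assuming the response type only needs to expose its entries. `EntrySketch` and `MetricListSketch` are hypothetical names; the real class would also implement `ResponseBody` and carry Jackson annotations, which are omitted so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a single metric entry.
final class EntrySketch {
    final String id;
    final String value;

    EntrySketch(String id, String value) {
        this.id = id;
        this.value = value;
    }
}

// Holds a list instead of being one, so the full List API is not inherited.
final class MetricListSketch {
    private final List<EntrySketch> entries;

    MetricListSketch(Collection<EntrySketch> entries) {
        // Defensive copy wrapped as read-only: callers cannot mutate the response.
        this.entries = Collections.unmodifiableList(new ArrayList<>(entries));
    }

    List<EntrySketch> getEntries() {
        return entries;
    }
}
```

With this shape the type exposes only what a response needs, and nothing can add or remove entries after construction.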


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143706150
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
+public class JobMetricId {
+   public static final String FIELD_NAME_ID = "id";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private String id;
+
+   @JsonCreator
+   public JobMetricId(
+   @JsonProperty(FIELD_NAME_ID) String id) {
+   this.id = id;
--- End diff --

null check missing


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143708819
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMetricsMessageParameters.java
 ---
@@ -22,19 +22,20 @@
 import java.util.Collections;
 
 /**
- * Message parameters which require a job path parameter.
+ * A job related REST handler always requires a {@link JobIDPathParameter}.
  */
-public class JobMessageParameters extends MessageParameters {
+public class JobMetricsMessageParameters extends MessageParameters {
--- End diff --

Why did you move this class? I think you should create a new subclass of
`JobMessageParameters` which adds the `metricQueryParameter`.
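
The shape of that suggestion, sketched with simplified stand-ins rather than Flink's real types: both classes below are hypothetical, and parameters are modelled as plain strings instead of `MessagePathParameter` and `MessageQueryParameter` objects. The names `jobid` and `get` are taken from the quoted code and Javadoc.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for JobMessageParameters: declares only the job path parameter.
class JobParametersSketch {
    Collection<String> getPathParameters() {
        return Collections.singletonList("jobid");
    }

    Collection<String> getQueryParameters() {
        return Collections.emptyList();
    }
}

// The subclass inherits the path parameter and only adds the metric query parameter.
final class JobMetricsParametersSketch extends JobParametersSketch {
    @Override
    Collection<String> getQueryParameters() {
        return Collections.singletonList("get");
    }
}
```

The base class stays where it is, and the metrics-specific query parameter lives in the subclass only.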


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143706896
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
@Override
protected Map getMapFor(Map pathParams, MetricStore metrics) {
MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   return job != null ? job.metrics : null;
+   }
+
+   protected Map getMetricsMapByJobId(JobID jobID, MetricStore metrics) {
+   MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID);
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) {
+   return CompletableFuture.supplyAsync(
+   () -> {
+   JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+
+   synchronized (metricStore) {
+   List queryParameters = request.getQueryParameter(JobIDQueryParameter.class);
+   String requestedMetricsList = queryParameters.get(0);
--- End diff --

Why are we parsing the query parameter inside of the synchronized block?
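
The point can be illustrated with a small sketch, assuming the lock only needs to guard reads of the shared store. `NarrowLockSketch` is a hypothetical class, and the comma-separated parsing mirrors the `get=X,Y` convention from the handler's Javadoc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical handler fragment: only the shared map is accessed under the lock.
final class NarrowLockSketch {
    private final Map<String, String> metricStore = new HashMap<>();

    void put(String name, String value) {
        synchronized (metricStore) {
            metricStore.put(name, value);
        }
    }

    List<String> valuesFor(String commaSeparatedNames) {
        // Parsing touches only request data, so it runs before taking the lock.
        List<String> names = Arrays.asList(commaSeparatedNames.split(","));

        List<String> values = new ArrayList<>();
        synchronized (metricStore) {
            for (String name : names) {
                String value = metricStore.get(name);
                if (value != null) {
                    values.add(value);
                }
            }
        }
        return values;
    }
}
```

Keeping request parsing outside the critical section shortens the time other threads wait on `metricStore`.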


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143706094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143710507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -18,26 +18,48 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry;
+import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList;
+import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList;
+import org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobIDQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobMetricsMessageParameters;
+
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
  *
  * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * If the query parameters do contain a "get" parameter, a comma-separate list of metric names is expected as a value.
+ * {@code ?get=X,Y}
  * The handler will then return a list containing the values of the requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
+public class JobMetricsHandler extends AbstractMetricsHandler
--- End diff --

I think we should change the handler such that it always returns a
`JobMetricEntryList`, and that you can add filter conditions via
`:jobid/metrics?filter=X,Y` or with `get`. If you don't specify the get/filter
query parameter, you get the full list of metrics; otherwise you get the
filtered list. That way we don't have two different return types that we have
to distinguish.
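
A hedged sketch of the unified behaviour described above: one return type, with the filter being optional. `UnifiedMetricsResponseSketch` is a hypothetical name, and JDK map entries stand in for the PR's entry class.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: always returns a list of id/value pairs.
final class UnifiedMetricsResponseSketch {
    static List<Map.Entry<String, String>> list(
            Map<String, String> metrics, List<String> filter) {
        List<Map.Entry<String, String>> result = new ArrayList<>();
        if (filter == null || filter.isEmpty()) {
            // No filter given: return every metric with its value.
            for (Map.Entry<String, String> e : metrics.entrySet()) {
                result.add(new SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            }
        } else {
            // Filter given: return only the requested metrics that exist.
            for (String name : filter) {
                String value = metrics.get(name);
                if (value != null) {
                    result.add(new SimpleImmutableEntry<>(name, value));
                }
            }
        }
        return result;
    }
}
```

Callers then deserialize the same shape whether or not they passed a filter.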


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143705930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntry.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
+public class JobMetricEntry {
+   public static final String FIELD_NAME_ID = "id";
+   public static final String FIELD_NAME_VALUE = "value";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final String id;
+
+   @JsonProperty(FIELD_NAME_VALUE)
+   private final String value;
+
+   @JsonCreator
+   public JobMetricEntry(
+   @JsonProperty(FIELD_NAME_ID) String id,
+   @JsonProperty(FIELD_NAME_VALUE) String value) {
+   this.id = id;
+   this.value = value;
--- End diff --

Null checks via `Preconditions.checkNotNull` are missing.
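
A sketch of what the constructor could look like with the checks in place. To keep it compilable without Flink or Jackson on the classpath, it uses `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull` and leaves out the Jackson annotations; `JobMetricEntrySketch` is a hypothetical name.

```java
import java.util.Objects;

/** Hypothetical, annotation-free variant of the entry class under review. */
final class JobMetricEntrySketch {
    private final String id;
    private final String value;

    JobMetricEntrySketch(String id, String value) {
        // Stand-in for Preconditions.checkNotNull(id) / checkNotNull(value):
        // fail fast with a NullPointerException instead of storing null fields.
        this.id = Objects.requireNonNull(id, "id must not be null");
        this.value = Objects.requireNonNull(value, "value must not be null");
    }

    String getId() {
        return id;
    }

    String getValue() {
        return value;
    }
}
```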


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143706271
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricIdList.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ *
+ */
--- End diff --

JavaDocs are missing.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143708645
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, 
MetricFetcher fetcher) {
@Override
protected Map getMapFor(Map pathParams, 
MetricStore metrics) {
MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   return job != null ? job.metrics : null;
+   }
+
+   protected Map getMetricsMapByJobId(JobID jobID, 
MetricStore metrics) {
+   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(jobID);
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture 
handleRequest(HandlerRequest 
request, DispatcherGateway gateway) {
+   return CompletableFuture.supplyAsync(
+   () -> {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+
+   synchronized (metricStore) {
+   List queryParameters = 
request.getQueryParameter(JobIDQueryParameter.class);
--- End diff --

I think this should already be the accumulated values of the get query 
parameters.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r143710757
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -18,26 +18,48 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobIDQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobMetricsMessageParameters;
+
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
  *
  * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * If the query parameters do contain a "get" parameter a 
comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * If the query parameters do contain a "get" parameter, a 
comma-separate list of metric names is expected as a value.
+ * {@code ?get=X,Y}
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
+public class JobMetricsHandler extends AbstractMetricsHandler
+   implements LegacyRestHandler {
--- End diff --

I think `JobMetricsOverview` won't work here, because you have to add 
polymorphic type information. Otherwise Jackson won't be able to restore a 
`JobMetricsOverview` instance.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142851413
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
+public class JobMetricId {
--- End diff --

renamed


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142851216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
--- End diff --

moved to `org.apache.flink.runtime.rest.messages.metrics`


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142851091
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+/**
+ *
+ */
+public interface JobMetricsOverview extends ResponseBody {
--- End diff --

renamed


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142850939
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ *
+ */
+public class JobIDQueryParameter extends MessageQueryParameter {
--- End diff --

renamed


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142850915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
--- End diff --

moved to `org.apache.flink.runtime.rest.messages.metrics`


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142688438
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -26,15 +26,16 @@
  */
 public class JobMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   private final JobIDPathParameter jobIDPathParameter = new 
JobIDPathParameter();
+   private final JobIDQueryParameter jobIDQueryParameter = new 
JobIDQueryParameter();
--- End diff --

I would suggest instead creating an abstract `MetricsMessageParameters` 
class (containing the `get` query parameter) and extending it for job/task/subtask.
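
One way this hierarchy might look, sketched with plain Java types rather than Flink's `MessageParameters` API. All class names ending in `Sketch` are hypothetical, and the `vertexid` path parameter name for the task scope is an assumption; only `jobid` appears in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical base class: owns the shared "get" query parameter. */
abstract class MetricsParametersSketch {
    private final List<String> requestedMetrics = new ArrayList<>();

    /** Parses the value of a "get" query parameter such as "X,Y". */
    void resolveGet(String commaSeparated) {
        for (String name : commaSeparated.split(",")) {
            if (!name.isEmpty()) {
                requestedMetrics.add(name);
            }
        }
    }

    List<String> getRequestedMetrics() {
        return Collections.unmodifiableList(requestedMetrics);
    }

    /** Scope-specific path parameter names, supplied by each subclass. */
    abstract List<String> pathParameterNames();
}

/** Job scope: only the job id is part of the path. */
final class JobMetricsParametersSketch extends MetricsParametersSketch {
    @Override
    List<String> pathParameterNames() {
        return Collections.singletonList("jobid");
    }
}

/** Task scope: job id plus an assumed vertex id path parameter. */
final class TaskMetricsParametersSketch extends MetricsParametersSketch {
    @Override
    List<String> pathParameterNames() {
        return Arrays.asList("jobid", "vertexid");
    }
}
```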


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142689891
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -18,38 +18,95 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobIDQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobMetricsHeaders;
+
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
  *
  * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * If the query parameters do contain a "get" parameter a 
comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * If the query parameters do contain a "get" parameter, a 
comma-separate list of metric names is expected as a value.
+ * {@code ?get=X,Y}
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
-   public static final String PARAMETER_JOB_ID = "jobid";
-   private static final String JOB_METRICS_REST_PATH = 
"/jobs/:jobid/metrics";
+public class JobMetricsHandler extends AbstractMetricsHandler
+   implements LegacyRestHandler {
+
+   private final MetricStore metricStore;
 
public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
super(executor, fetcher);
+   metricStore = fetcher.getMetricStore();
}
 
@Override
public String[] getPaths() {
-   return new String[]{JOB_METRICS_REST_PATH};
+   return new String[]{JobMetricsHeaders.JOB_METRICS_REST_PATH};
}
 
@Override
protected Map getMapFor(Map pathParams, 
MetricStore metrics) {
-   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(JobMetricsHeaders.PARAMETER_JOB_ID));
+   return job != null ? job.metrics : null;
+   }
+
+   protected Map getMetricsMapByJobId(JobID jobID, 
MetricStore metrics) {
+   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(jobID);
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture 
handleRequest(HandlerRequest request, 
DispatcherGateway gateway) {
+   return CompletableFuture.supplyAsync(
+   () -> {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+
+   synchronized (metricStore) {
+   List queryParameters = 
request.getQueryParameter(JobIDQueryParameter.class);
+   String requestedMetricsList = 
queryParameters.get(0);
+
+   return getJobMetricsOverview(jobID, 
requestedMetricsList);
+   }
+   },
+   executor);
+   }
+
+   protected JobMetricsOverview getJobMetricsOverview(JobID jobID, String 
requestedMetricsList) {
--- End diff --

same as above


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142689531
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
--- End diff --

move out of legacy namespace.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142689501
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+/**
+ *
+ */
+public interface JobMetricsOverview extends ResponseBody {
--- End diff --

rename to `MetricsOverview`.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142688020
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ *
+ */
+public class JobIDQueryParameter extends MessageQueryParameter {
--- End diff --

rename to `MetricNameParameter`.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142689803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -18,38 +18,95 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobIDQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobMetricsHeaders;
+
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
  *
  * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * If the query parameters do contain a "get" parameter a 
comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * If the query parameters do contain a "get" parameter, a 
comma-separate list of metric names is expected as a value.
+ * {@code ?get=X,Y}
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
-   public static final String PARAMETER_JOB_ID = "jobid";
-   private static final String JOB_METRICS_REST_PATH = 
"/jobs/:jobid/metrics";
+public class JobMetricsHandler extends AbstractMetricsHandler
+   implements LegacyRestHandler {
+
+   private final MetricStore metricStore;
 
public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
super(executor, fetcher);
+   metricStore = fetcher.getMetricStore();
}
 
@Override
public String[] getPaths() {
-   return new String[]{JOB_METRICS_REST_PATH};
+   return new String[]{JobMetricsHeaders.JOB_METRICS_REST_PATH};
}
 
@Override
protected Map getMapFor(Map pathParams, 
MetricStore metrics) {
-   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(JobMetricsHeaders.PARAMETER_JOB_ID));
+   return job != null ? job.metrics : null;
+   }
+
+   protected Map getMetricsMapByJobId(JobID jobID, 
MetricStore metrics) {
+   MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(jobID);
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture 
handleRequest(HandlerRequest request, 
DispatcherGateway gateway) {
--- End diff --

We can keep this logic in the `AbstractMetricsHandler` after having it 
implement the LegacyHandlerAdapter interface.
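
A rough sketch of that arrangement, assuming the shared request handling lives in the abstract base and subclasses only supply the scope-specific lookup. The types are hypothetical stand-ins: a nested `Map` replaces `MetricStore`, and the path parameters are a plain `Map` instead of a `HandlerRequest`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical base handler that owns the asynchronous request logic. */
abstract class AbstractMetricsHandlerSketch {
    private final Executor executor;
    private final Map<String, Map<String, String>> store; // stand-in for MetricStore

    AbstractMetricsHandlerSketch(Executor executor, Map<String, Map<String, String>> store) {
        this.executor = executor;
        this.store = store;
    }

    /** Scope-specific lookup; may return null if the scope is unknown. */
    protected abstract Map<String, String> getMapFor(
            Map<String, String> pathParams, Map<String, Map<String, String>> store);

    /** Shared for all scopes: look up under the store lock and return a copy. */
    final CompletableFuture<Map<String, String>> handleRequest(Map<String, String> pathParams) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (store) {
                Map<String, String> metrics = getMapFor(pathParams, store);
                Map<String, String> copy = new HashMap<>();
                if (metrics != null) {
                    copy.putAll(metrics);
                }
                return copy;
            }
        }, executor);
    }
}

/** Job-scoped handler: only the lookup differs from other scopes. */
final class JobMetricsHandlerSketch extends AbstractMetricsHandlerSketch {
    JobMetricsHandlerSketch(Executor executor, Map<String, Map<String, String>> store) {
        super(executor, store);
    }

    @Override
    protected Map<String, String> getMapFor(
            Map<String, String> pathParams, Map<String, Map<String, String>> store) {
        return store.get(pathParams.get("jobid"));
    }
}
```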


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142688520
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMetricsHeaders.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
--- End diff --

move to `...rest.messages.metrics`.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142690660
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
--- End diff --

move out of legacy namespace.


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142689455
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricIdList.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ *
+ */
+public class JobMetricIdList extends ArrayList implements 
JobMetricsOverview {
--- End diff --

I would simply add a list to the JobMetricsOverview class, making 
this class unnecessary.
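
For illustration, a response type that holds the list as a field instead of extending `ArrayList` might look like the sketch below. `MetricsOverviewSketch` is a hypothetical name, and the Flink `ResponseBody` marker interface and Jackson annotations are omitted so the snippet stands alone.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical overview type that owns its list rather than being one. */
final class MetricsOverviewSketch {
    private final List<String> metricIds;

    MetricsOverviewSketch(Collection<String> metricIds) {
        // Defensive copy so later changes to the caller's collection are not visible here.
        this.metricIds = Collections.unmodifiableList(new ArrayList<>(metricIds));
    }

    List<String> getMetricIds() {
        return metricIds;
    }
}
```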


---


[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r142690760
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricEntryList.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ *
+ */
+public class JobMetricEntryList extends ArrayList implements JobMetricsOverview {
--- End diff --

same as below.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-10-01 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4757

[FLINK-7694][REST][Webfrontend] Port JobMetricsHandler to new REST handler

## What is the purpose of the change

Port JobMetricsHandler to new REST handler

## Brief change log

- added `JobMetricEntry`, `JobMetricEntryList`, `JobMetricId`, and
`JobMetricIdList` for JSON serialization. They all implement
`JobMetricsOverview`, so the `LegacyRestHandler` can use a single
`ResponseBody` type
- implemented `JobMetricsHandler#handleRequests()`
- added `JobMetricsHeaders`
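
The single-response-type idea in the change log can be sketched roughly as follows. This is an illustration only, not the PR's code: `ResponseBody` here is a local stand-in rather than the real Flink interface, the field layout of `JobMetricId` and `JobMetricEntry` is assumed, and `JobMetricsSketch.handle` is a hypothetical helper whose dispatch rule (no filter lists the ids, a filter returns values) is an assumption.

```java
import java.util.ArrayList;
import java.util.Map;

// Local stand-in for the REST layer's response marker; not the real Flink interface.
interface ResponseBody {}

// Common type so one handler signature covers both response shapes.
interface JobMetricsOverview extends ResponseBody {}

// Assumed shape: a metric name only.
final class JobMetricId {
    final String id;

    JobMetricId(String id) {
        this.id = id;
    }
}

// Assumed shape: a metric name together with its current value.
final class JobMetricEntry {
    final String id;
    final String value;

    JobMetricEntry(String id, String value) {
        this.id = id;
        this.value = value;
    }
}

final class JobMetricIdList extends ArrayList<JobMetricId> implements JobMetricsOverview {}

final class JobMetricEntryList extends ArrayList<JobMetricEntry> implements JobMetricsOverview {}

final class JobMetricsSketch {
    // Hypothetical dispatch: an empty filter lists all ids, otherwise the requested values.
    static JobMetricsOverview handle(Map<String, String> metrics, String requested) {
        if (requested == null || requested.isEmpty()) {
            JobMetricIdList ids = new JobMetricIdList();
            for (String id : metrics.keySet()) {
                ids.add(new JobMetricId(id));
            }
            return ids;
        }
        JobMetricEntryList entries = new JobMetricEntryList();
        for (String id : requested.split(",")) {
            String value = metrics.get(id);
            if (value != null) {
                // Unknown metric names are skipped rather than reported as errors.
                entries.add(new JobMetricEntry(id, value));
            }
        }
        return entries;
    }
}
```

Both list types share the `JobMetricsOverview` supertype, so a caller can declare one return type and still serialize either shape.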

## Verifying this change

This change added tests and can be verified as follows:

  - Added unit tests for marshalling `JobMetricIdList` and 
`JobMetricEntryList`

## Does this pull request potentially affect one of the following parts:

  - Anything that affects deployment or recovery: JobManager (and its 
components)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7694

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4757


commit bc4aa8c3d9b057a6d35b620eb8e00048137e9361
Author: Bowen Li 
Date:   2017-09-27T22:30:02Z

[FLINK-7724] add extra metrics to MetricStoreTest.setupStore

commit d223444eba17c620e8c45c372a7b2e6fafdfd169
Author: Bowen Li 
Date:   2017-09-27T22:39:17Z

delete unrelated files

commit 4ac192c96a1b905ecc92c6bcb369ca252b511ae5
Author: Bowen Li 
Date:   2017-09-28T04:04:30Z

Merge branch 'FLINK-7724' into FLINK-7694

commit 2815e66d511e72f11766bb516a9851339e379c5e
Author: Bowen Li 
Date:   2017-09-28T05:25:04Z

add more classes

commit c5716edb64ed7f7ff6dda7bf48b2f90e9a80cd07
Author: Bowen Li 
Date:   2017-09-28T06:09:20Z

[FLINK-7660] Support sideOutput in ProcessAllWindowFunction

commit 04a2b59bf45c62318a20d897b254fc376d58abca
Author: Bowen Li 
Date:   2017-09-28T16:25:31Z

checkstyle

commit 0e761a8e4a6c606c89e4bc5669c777a20e467183
Author: Bowen Li 
Date:   2017-09-28T06:09:20Z

[FLINK-7660] Support sideOutput in ProcessAllWindowFunction

commit 9e7c6cc3f86570938feb0ded48b204d4f320cb69
Author: Bowen Li 
Date:   2017-09-28T21:26:28Z

merge master

commit ac99b8f779f1b4275a1ee6df1527e406495c61c6
Author: Bowen Li 
Date:   2017-09-29T05:11:39Z

add more classes

commit 1402267e416916496a60c58bda7f64612d2e2ea5
Author: Bowen Li 
Date:   2017-09-29T05:34:47Z

add JobIDQueryParameter

commit bc4fc397e1fa4cc48e74d5b2157be2a8cf5f37ee
Author: Bowen Li 
Date:   2017-09-29T06:43:51Z

checkstyle

commit 31a73f3c86d6afa2f4693e9d56bc709fdfb42e19
Author: Bowen Li 
Date:   2017-09-29T07:28:51Z

add apache license to files

commit 2ad3065b237c9ecb4db67a486da48e621097470e
Author: Bowen Li 
Date:   2017-09-29T07:34:40Z

checkstyle

commit 99ec7363f7f61fa1fae201c4fd470d0fa2d6f5b2
Author: Bowen Li 
Date:   2017-09-28T06:09:20Z

[FLINK-7660] Support sideOutput in ProcessAllWindowFunction

commit 21e1185ddc999b13164317f2291ff02e7660e619
Author: Bowen Li 
Date:   2017-09-29T18:49:33Z

add JobMetricsHeaders

commit 1981e24373f8a849c2817387ce806103ebfcb3c7
Author: Bowen Li 
Date:   2017-09-29T18:51:29Z

add apache license

commit 76204dbfc0ade9fd84abbff417b388b92b73827e
Author: Bowen Li 
Date:   2017-09-29T23:09:35Z

checkstyle

commit 1d3ebfa0781ee3be1a86e024fc40d6bdd59217f5
Author: Bowen Li 
Date:   2017-09-29T23:34:01Z

fix class

commit 1d9d113cdd577bfbfabba78e887f99f8b6aa2c51
Author: Bowen Li 
Date:   2017-09-29T23:34:48Z

remove extra lines

commit 9518958a15248cf25bcc1728d02c7e16c8491d81
Author: Bowen Li 
Date:   2017-09-29T23:36:14Z

remove unused classes

commit 28ab3df8c53cc53ce9b8de95e5fd6be3308649d9
Author: Bowen Li 
Date:   2017-10-02T03:36:26Z

fix JobVertexMetricsHandlerTest

commit 0fe6cd335e98f6ad92cc543053761d72bea6c033
Author: Bowen Li 
Date:   2017-10-02T03:39:56Z

merge master

commit 71089554ccf229b8396c622eea7b55ec7252a670
Author: Bowen Li 
Date:   2017-10-02T03:59:56Z

revert unrelated changes

commit