[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236253#comment-16236253 ]

ASF GitHub Bot commented on FLINK-7694:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4757#discussion_r148609556
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java ---
    @@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
                MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
    -           return job != null
    -                   ? job.metrics
    -                   : null;
    +           return job != null ? job.metrics : null;
    +   }
    +
    +   @Override
    +   public CompletableFuture<MetricsOverview> handleRequest(HandlerRequest<EmptyRequestBody, MetricMessageParameters> request, DispatcherGateway gateway) {
    +           return CompletableFuture.supplyAsync(
    +                           () -> {
    +                                   fetcher.update();
    +                                   JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +                                   List<String> requestedMetrics = request.getQueryParameter(MetricNameParameter.class);
    +                                   return getMetricsOverview(jobID, requestedMetrics);
    +                           },
    +                           executor);
    +   }
    +
    +   protected MetricsOverview getMetricsOverview(JobID jobID, List<String> requestedMetrics) {
    +           Map<String, String> metricsMap = getMetricsMapByJobId(jobID, fetcher.getMetricStore());
    +           if (metricsMap == null) {
    +                   return new MetricsOverview();
    +           }
    +
    +           if (requestedMetrics == null || requestedMetrics.isEmpty()) {
    +                   return new MetricsOverview(
    +                                   metricsMap.entrySet().stream()
    +                                                   .map(e -> new MetricEntry(e.getKey(), e.getValue()))
    +                                                   .collect(Collectors.toList()));
    +           } else {
    +                   return new MetricsOverview(
    +                                   requestedMetrics.stream()
    +                                                   .filter(e -> metricsMap.get(e) != null)
    +                                                   .map(e -> new MetricEntry(e, metricsMap.get(e)))
    +                                                   .collect(Collectors.toList()));
    --- End diff --
    
    I think that by not using Java streams here we can avoid doing two `HashMap` lookups for every `e` in `requestedMetrics` and get by with a single lookup per element instead.
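
    A minimal sketch of what the single-lookup variant could look like, written as a plain loop. The names `SingleLookupSketch` and `selectMetrics` are hypothetical and not part of the pull request, and the nested `MetricEntry` is only a simplified stand-in for the type used in the diff:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained sketch; not the handler code from the pull request.
final class SingleLookupSketch {

    // Simplified stand-in for the MetricEntry type referenced in the diff.
    static final class MetricEntry {
        final String id;
        final String value;

        MetricEntry(String id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Builds one entry per requested metric that is present in the map,
    // calling metricsMap.get(...) exactly once per requested name.
    static List<MetricEntry> selectMetrics(Map<String, String> metricsMap, List<String> requestedMetrics) {
        List<MetricEntry> result = new ArrayList<>(requestedMetrics.size());
        for (String name : requestedMetrics) {
            String value = metricsMap.get(name); // the only lookup for this name
            if (value != null) {
                result.add(new MetricEntry(name, value));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> metrics = new HashMap<>();
        metrics.put("numRecordsIn", "42");
        metrics.put("numRecordsOut", "40");

        // "missing" is not in the map, so it is skipped.
        List<MetricEntry> selected = selectMetrics(metrics, Arrays.asList("numRecordsIn", "missing"));
        for (MetricEntry entry : selected) {
            System.out.println(entry.id + "=" + entry.value);
        }
    }
}
```

    The loop keeps the value returned by `get` in a local variable, so the null check and the construction of the entry share one lookup instead of repeating it in `filter` and `map`.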


> Port JobMetricsHandler to new REST handler
> ------------------------------------------
>
>                 Key: FLINK-7694
>                 URL: https://issues.apache.org/jira/browse/FLINK-7694
>             Project: Flink
>          Issue Type: Sub-task
>          Components: REST, Webfrontend
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
