GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4757#discussion_r143706896
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java ---
    @@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
                MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
    -           return job != null
    -                   ? job.metrics
    -                   : null;
    +           return job != null ? job.metrics : null;
    +   }
    +
    +   protected Map<String, String> getMetricsMapByJobId(JobID jobID, MetricStore metrics) {
    +           MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID);
    +           return job != null ? job.metrics : null;
    +   }
    +
    +   @Override
    +   public CompletableFuture<JobMetricsOverview> handleRequest(HandlerRequest<EmptyRequestBody, JobMetricsMessageParameters> request, DispatcherGateway gateway) {
    +           return CompletableFuture.supplyAsync(
    +                   () -> {
    +                           JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +
    +                           synchronized (metricStore) {
    +                                   List<String> queryParameters = request.getQueryParameter(JobIDQueryParameter.class);
    +                                   String requestedMetricsList = queryParameters.get(0);
    --- End diff --
    
    Why are we parsing the query parameter inside the synchronized block?
    As far as I can see it only reads from the request, so it could happen
    before we acquire the lock on metricStore.
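
    To illustrate the question, here is a minimal, self-contained sketch of
    the narrower critical section. It is not the PR's code: the class
    MetricsLookupSketch, the method lookup, the plain Map standing in for
    the metric store, and the comma-separated list format are all
    assumptions made for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricsLookupSketch {
    // Hypothetical stand-in for the shared metric store that needs locking.
    private final Map<String, String> metricStore = new HashMap<>();

    public MetricsLookupSketch(Map<String, String> initialMetrics) {
        metricStore.putAll(initialMetrics);
    }

    public Map<String, String> lookup(List<String> queryParameters) {
        // Parsing only reads request-local data, so it runs without the lock.
        String requestedMetricsList =
            queryParameters.isEmpty() ? "" : queryParameters.get(0);
        // Assumed format: metric names separated by commas.
        String[] requestedMetrics = requestedMetricsList.isEmpty()
            ? new String[0]
            : requestedMetricsList.split(",");

        Map<String, String> result = new HashMap<>();
        // Only the reads of the shared store are guarded.
        synchronized (metricStore) {
            for (String name : requestedMetrics) {
                String value = metricStore.get(name);
                if (value != null) {
                    result.put(name, value);
                }
            }
        }
        return result;
    }
}
```

    Keeping the parsing outside the lock shortens the time other threads
    have to wait on the store, and the behaviour stays the same as long as
    the parsing touches nothing but the request.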

