Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r143706896 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +70,45 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + protected Map<String, String> getMetricsMapByJobId(JobID jobID, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(jobID); + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture<JobMetricsOverview> handleRequest(HandlerRequest<EmptyRequestBody, JobMetricsMessageParameters> request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + + synchronized (metricStore) { + List<String> queryParameters = request.getQueryParameter(JobIDQueryParameter.class); + String requestedMetricsList = queryParameters.get(0); --- End diff -- why are we doing the parsing of the query parameter inside of the synchronized block?
---