GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5270#discussion_r161374482

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java ---
    @@ -65,25 +60,21 @@ public JobVertexAccumulatorsHandler(
         }
     
         @Override
    -    protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
    -        JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
    -        AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
    -
    -        if (null != jobVertex) {
    -            StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
    -            ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
    +    protected JobVertexAccumulatorsInfo handleRequest(
    +            HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
    +            AccessExecutionJobVertex jobVertex) throws RestHandlerException {
     
    -            for (StringifiedAccumulatorResult acc : accs) {
    -                userAccumulatorList.add(
    -                    new JobVertexAccumulatorsInfo.UserAccumulator(
    -                        acc.getName(),
    -                        acc.getType(),
    -                        acc.getValue()));
    -            }
    +        StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
    +        ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
     
    -            return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
    -        } else {
    -            throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND);
    +        for (StringifiedAccumulatorResult acc : accs) {
    +            userAccumulatorList.add(
    +                new JobVertexAccumulatorsInfo.UserAccumulator(
    +                    acc.getName(),
    +                    acc.getType(),
    +                    acc.getValue()));
             }
    +
    +        return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
    --- End diff --

    Very nice refinement :-)
---