Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5270#discussion_r161374482
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java ---
    @@ -65,25 +60,21 @@ public JobVertexAccumulatorsHandler(
        }
     
        @Override
    -   protected JobVertexAccumulatorsInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> 
request, AccessExecutionGraph executionGraph) throws RestHandlerException {
    -           JobVertexID jobVertexID = 
request.getPathParameter(JobVertexIdPathParameter.class);
    -           AccessExecutionJobVertex jobVertex = 
executionGraph.getJobVertex(jobVertexID);
    -
    -           if (null != jobVertex) {
    -                   StringifiedAccumulatorResult[] accs = 
jobVertex.getAggregatedUserAccumulatorsStringified();
    -                   ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> 
userAccumulatorList = new ArrayList<>(accs.length);
    +   protected JobVertexAccumulatorsInfo handleRequest(
    +                   HandlerRequest<EmptyRequestBody, 
JobVertexMessageParameters> request,
    +                   AccessExecutionJobVertex jobVertex) throws 
RestHandlerException {
     
    -                   for (StringifiedAccumulatorResult acc : accs) {
    -                           userAccumulatorList.add(
    -                                   new 
JobVertexAccumulatorsInfo.UserAccumulator(
    -                                           acc.getName(),
    -                                           acc.getType(),
    -                                           acc.getValue()));
    -                   }
    +           StringifiedAccumulatorResult[] accs = 
jobVertex.getAggregatedUserAccumulatorsStringified();
    +           ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> 
userAccumulatorList = new ArrayList<>(accs.length);
     
    -                   return new 
JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), 
userAccumulatorList);
    -           } else {
    -                   throw new RestHandlerException("There is no accumulator 
for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND);
    +           for (StringifiedAccumulatorResult acc : accs) {
    +                   userAccumulatorList.add(
    +                           new JobVertexAccumulatorsInfo.UserAccumulator(
    +                                   acc.getName(),
    +                                   acc.getType(),
    +                                   acc.getValue()));
                }
    +
    +           return new 
JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), 
userAccumulatorList);
    --- End diff --
    
    Very nice refinement :-)


---

Reply via email to