pnowojski commented on a change in pull request #14618: URL: https://github.com/apache/flink/pull/14618#discussion_r555778071
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java ########## @@ -49,50 +54,78 @@ JobVertexBackPressureInfo, JobVertexMessageParameters> { + private final MetricFetcher metricFetcher; + public JobVertexBackPressureHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> - messageHeaders) { + messageHeaders, + MetricFetcher metricFetcher) { super(leaderRetriever, timeout, responseHeaders, messageHeaders); + this.metricFetcher = metricFetcher; } @Override protected CompletableFuture<JobVertexBackPressureInfo> handleRequest( @Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + metricFetcher.update(); + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); final JobVertexID jobVertexId = request.getPathParameter(JobVertexIdPathParameter.class); - return gateway.requestOperatorBackPressureStats(jobId, jobVertexId) - .thenApply( - operatorBackPressureStats -> - operatorBackPressureStats - .getOperatorBackPressureStats() - .map( - JobVertexBackPressureHandler - ::createJobVertexBackPressureInfo) - .orElse(JobVertexBackPressureInfo.deprecated())); + + TaskMetricStore taskMetricStore = + metricFetcher + .getMetricStore() + .getTaskMetricStore(jobId.toString(), jobVertexId.toString()); + + return CompletableFuture.completedFuture( + taskMetricStore != null + ? createJobVertexBackPressureInfo( + taskMetricStore.getAllSubtaskMetricStores()) + : JobVertexBackPressureInfo.deprecated()); } - private static JobVertexBackPressureInfo createJobVertexBackPressureInfo( - final OperatorBackPressureStats operatorBackPressureStats) { + private JobVertexBackPressureInfo createJobVertexBackPressureInfo( + Collection<ComponentMetricStore> allSubtaskMetricStores) { + return new JobVertexBackPressureInfo( JobVertexBackPressureInfo.VertexBackPressureStatus.OK, - getBackPressureLevel(operatorBackPressureStats.getMaxBackPressureRatio()), - operatorBackPressureStats.getEndTimestamp(), - IntStream.range(0, operatorBackPressureStats.getNumberOfSubTasks()) - .mapToObj( - subtask -> { - final double backPressureRatio = - operatorBackPressureStats.getBackPressureRatio(subtask); - return new JobVertexBackPressureInfo.SubtaskBackPressureInfo( - subtask, - getBackPressureLevel(backPressureRatio), - backPressureRatio); - }) - .collect(Collectors.toList())); + getBackPressureLevel(getMaxBackPressureRatio(allSubtaskMetricStores)), + metricFetcher.getLastUpdateTime(), + createSubtaskBackPressureInfo(allSubtaskMetricStores)); + } + + private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo( + Collection<ComponentMetricStore> subtaskMetricStores) { + List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size()); + int subTaskIndex = 0; + for (ComponentMetricStore subtaskMetricStore : subtaskMetricStores) { + double backPressureRatio = getBackPressureRatio(subtaskMetricStore); + result.add( + new SubtaskBackPressureInfo( + subTaskIndex, + getBackPressureLevel(backPressureRatio), + backPressureRatio)); + subTaskIndex++; Review comment: Good catch, I didn't know about this `MetricStore` feature/behaviour. I don't know why I was assuming it would return empty metrics in that case. Anyway, fixed with `getAllSubtaskMetricStores` returning the a map right now. I've also extended test case to cover for that. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org