pnowojski commented on a change in pull request #14618:
URL: https://github.com/apache/flink/pull/14618#discussion_r555778071



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
##########
@@ -49,50 +54,78 @@
                 JobVertexBackPressureInfo,
                 JobVertexMessageParameters> {
 
+    private final MetricFetcher metricFetcher;
+
     public JobVertexBackPressureHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Time timeout,
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, 
JobVertexMessageParameters>
-                    messageHeaders) {
+                    messageHeaders,
+            MetricFetcher metricFetcher) {
         super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+        this.metricFetcher = metricFetcher;
     }
 
     @Override
     protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(
             @Nonnull HandlerRequest<EmptyRequestBody, 
JobVertexMessageParameters> request,
             @Nonnull RestfulGateway gateway)
             throws RestHandlerException {
+        metricFetcher.update();
+
         final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
         final JobVertexID jobVertexId = 
request.getPathParameter(JobVertexIdPathParameter.class);
-        return gateway.requestOperatorBackPressureStats(jobId, jobVertexId)
-                .thenApply(
-                        operatorBackPressureStats ->
-                                operatorBackPressureStats
-                                        .getOperatorBackPressureStats()
-                                        .map(
-                                                JobVertexBackPressureHandler
-                                                        
::createJobVertexBackPressureInfo)
-                                        
.orElse(JobVertexBackPressureInfo.deprecated()));
+
+        TaskMetricStore taskMetricStore =
+                metricFetcher
+                        .getMetricStore()
+                        .getTaskMetricStore(jobId.toString(), 
jobVertexId.toString());
+
+        return CompletableFuture.completedFuture(
+                taskMetricStore != null
+                        ? createJobVertexBackPressureInfo(
+                                taskMetricStore.getAllSubtaskMetricStores())
+                        : JobVertexBackPressureInfo.deprecated());
     }
 
-    private static JobVertexBackPressureInfo createJobVertexBackPressureInfo(
-            final OperatorBackPressureStats operatorBackPressureStats) {
+    private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
+            Collection<ComponentMetricStore> allSubtaskMetricStores) {
+
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
-                
getBackPressureLevel(operatorBackPressureStats.getMaxBackPressureRatio()),
-                operatorBackPressureStats.getEndTimestamp(),
-                IntStream.range(0, 
operatorBackPressureStats.getNumberOfSubTasks())
-                        .mapToObj(
-                                subtask -> {
-                                    final double backPressureRatio =
-                                            
operatorBackPressureStats.getBackPressureRatio(subtask);
-                                    return new 
JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                                            subtask,
-                                            
getBackPressureLevel(backPressureRatio),
-                                            backPressureRatio);
-                                })
-                        .collect(Collectors.toList()));
+                
getBackPressureLevel(getMaxBackPressureRatio(allSubtaskMetricStores)),
+                metricFetcher.getLastUpdateTime(),
+                createSubtaskBackPressureInfo(allSubtaskMetricStores));
+    }
+
+    private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
+            Collection<ComponentMetricStore> subtaskMetricStores) {
+        List<SubtaskBackPressureInfo> result = new 
ArrayList<>(subtaskMetricStores.size());
+        int subTaskIndex = 0;
+        for (ComponentMetricStore subtaskMetricStore : subtaskMetricStores) {
+            double backPressureRatio = 
getBackPressureRatio(subtaskMetricStore);
+            result.add(
+                    new SubtaskBackPressureInfo(
+                            subTaskIndex,
+                            getBackPressureLevel(backPressureRatio),
+                            backPressureRatio));
+            subTaskIndex++;

Review comment:
       Good catch, I didn't know about this `MetricStore` feature/behaviour. I 
don't know why I was assuming it would return empty metrics in that case.
   
   Anyway, fixed with `getAllSubtaskMetricStores` now returning a map. I've 
also extended the test case to cover that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to