zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r343987061
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinator.java
 ##########
 @@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A coordinator for triggering and collecting back pressure stats
+ * of running tasks.
+ */
+public class BackPressureRequestCoordinator {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BackPressureRequestCoordinator.class);
+
+       private static final int NUM_GHOST_REQUEST_IDS = 10;
+
+       private final Object lock = new Object();
+
+       /** Executor used to run the futures. */
+       private final Executor executor;
+
+       /** Request time out of a triggered back pressure request. */
+       private final Time requestTimeout;
+
+       /** In progress back pressure requests. */
+       @GuardedBy("lock")
+       private final Map<Integer, PendingBackPressureRequest> pendingRequests 
= new HashMap<>();
+
+       /** A list of recent request IDs to identify late messages vs. invalid 
ones. */
+       private final ArrayDeque<Integer> recentPendingRequests = new 
ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
+
+       /** Request ID counter. */
+       @GuardedBy("lock")
+       private int requestIdCounter;
+
+       /** Flag indicating whether the coordinator is still running. */
+       @GuardedBy("lock")
+       private boolean isShutDown;
+
+       /**
+        * Creates a new coordinator for the cluster.
+        *
+        * @param executor Used to execute the futures. Must not be null.
+        * @param requestTimeout Timeout, in milliseconds, of a triggered back
+        *                       pressure request. Must be non-negative.
+        */
+       public BackPressureRequestCoordinator(
+                       Executor executor,
+                       long requestTimeout) {
+
+               // Reject negative timeouts up front; the value is interpreted as milliseconds below.
+               checkArgument(requestTimeout >= 0L, "Illegal request timeout: " 
+ requestTimeout);
+
+               this.executor = checkNotNull(executor);
+               this.requestTimeout = Time.milliseconds(requestTimeout);
+       }
+
+       /**
+        * Triggers a task back pressure stats request to all tasks.
+        *
+        * @param tasks Tasks to request.
+        * @return A future of the completed task back pressure stats.
+        */
+       CompletableFuture<BackPressureStats> 
triggerBackPressureRequest(ExecutionVertex[] tasks) {
+
+               checkNotNull(tasks, "Tasks to request must not be null.");
+               checkArgument(tasks.length >= 1, "No tasks to request.");
+
+               // Execution IDs of running tasks
+               ExecutionAttemptID[] triggerIds = new 
ExecutionAttemptID[tasks.length];
+               Execution[] executions = new Execution[tasks.length];
+
+               // Check that all tasks are RUNNING before triggering anything. 
The
+               // triggering can still fail.
+               for (int i = 0; i < triggerIds.length; i++) {
+                       Execution execution = 
tasks[i].getCurrentExecutionAttempt();
+                       if (execution != null && execution.getState() == 
ExecutionState.RUNNING) {
+                               executions[i] = execution;
+                               triggerIds[i] = execution.getAttemptId();
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
IllegalStateException("Task " + tasks[i]
+                                       .getTaskNameWithSubtaskIndex() + " is 
not running."));
+                       }
+               }
+
+               synchronized (lock) {
+                       if (isShutDown) {
+                               return FutureUtils.completedExceptionally(new 
IllegalStateException("Shut down"));
+                       }
+
+                       final int requestId = requestIdCounter++;
+
+                       LOG.debug("Triggering task back pressure request {}", 
requestId);
+
+                       final PendingBackPressureRequest pending = new 
PendingBackPressureRequest(requestId, triggerIds);
+
+                       // Add the pending request before scheduling the 
discard task to
+                       // prevent races with removing it again.
+                       pendingRequests.put(requestId, pending);
+
+                       // Trigger all requests. The request will be discarded 
if it takes
 
 Review comment:
   This method is a bit long. It would be better to extract the part that sends 
the requests into a separate method, as below: 
   ```
        /**
         * Sends the back pressure request identified by {@code requestId} to every
         * given execution and registers an asynchronous callback on each response.
         *
         * <p>Callbacks run on {@code executor}. A non-null response is forwarded to
         * {@code collectTaskBackPressureStat}. A future that completes without a
         * response makes the callback invoke {@code cancelBackPressureRequest} for
         * the whole request, passing along the failure cause. The futures returned
         * by {@code handleAsync} are not retained.
         *
         * <p>NOTE(review): a task that does not answer within {@code requestTimeout}
         * presumably completes its future exceptionally, which cancels the request
         * rather than merely ignoring the late response; confirm against
         * {@code Execution#requestBackPressure}.
         *
         * @param executions the task executions to query
         * @param requestId the ID of the pending back pressure request
         */
        private void requestBackPressure(Execution[] executions, int requestId) {
                for (int i = 0; i < executions.length; i++) {
                        final CompletableFuture<TaskBackPressureResponse> responseFuture =
                                executions[i].requestBackPressure(requestId, requestTimeout);

                        responseFuture.handleAsync(
                                (TaskBackPressureResponse response, Throwable failure) -> {
                                        if (response == null) {
                                                // No response available: abort the whole request.
                                                cancelBackPressureRequest(requestId, failure);
                                                return null;
                                        }

                                        collectTaskBackPressureStat(
                                                response.getRequestId(),
                                                response.getExecutionAttemptID(),
                                                response.getBackPressureRatio());
                                        return null;
                                },
                                executor);
                }
        }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to