zhuzhurk commented on code in PR #21695:
URL: https://github.com/apache/flink/pull/21695#discussion_r1071823620


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AbstractBlockingResultInfo.java:
##########
@@ -72,4 +73,15 @@ public void resetPartitionInfo(int partitionIndex) {
     int getNumOfRecordedPartitions() {
         return subpartitionBytesByPartitionIndex.size();
     }
+
+    /**
+     * Aggregate the input number bytes of a subtask according to the index range for partition and
+     * subpartition.
+     *
+     * @param partitionIndexRange Range of the index of the consumed partition.
+     * @param subpartitionIndexRange Range of the index of the consumed subpartition.
+     * @return Aggregated input number bytes of current result info.
+     */
+    public abstract long getExecutionVertexInputNumBytes(

Review Comment:
   It would be better to put this in `BlockingResultInfo` and rename it to `getNumBytesProduced` for consistency.
   From the perspective of a result, bytes are produced rather than consumed as inputs. "Inputs" only makes sense from the perspective of an execution vertex.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -225,10 +244,56 @@ private long getExecutionTime(final Execution execution, final long currentTime)
         }
     }
 
+    private long getExecutionInputNumBytes(final Execution execution) {
+        return execution.getVertex().getExecutionVertexInputNumBytes();
+    }
+
+    private ExecutionTimeWithInputNumBytes getExecutionTimeAndInputNumBytes(
+            Execution execution, final long currentTime) {
+        long executionTime = getExecutionTime(execution, currentTime);
+        long executionInputNumBytes = getExecutionInputNumBytes(execution);
+
+        return new ExecutionTimeWithInputNumBytes(executionTime, executionInputNumBytes);
+    }
+
     @Override
     public void stop() {
         if (scheduledDetectionFuture != null) {
             scheduledDetectionFuture.cancel(false);
         }
     }
+
+    /** This class defines the execution time and input number bytes for an execution. */
+    @VisibleForTesting
+    static class ExecutionTimeWithInputNumBytes
+            implements Comparable<ExecutionTimeWithInputNumBytes> {
+
+        private final long executionTime;
+        private final long inputNumBytes;
+
+        public ExecutionTimeWithInputNumBytes(long executionTime, long inputNumBytes) {
+            this.executionTime = executionTime;
+            this.inputNumBytes = inputNumBytes;
+        }
+
+        public long getExecutionTime() {
+            return executionTime;
+        }
+
+        public long getInputNumBytes() {
+            return inputNumBytes;
+        }
+
+        @Override
+        public int compareTo(ExecutionTimeWithInputNumBytes other) {
+            // We should guarantee that all sorted elements' inputNumBytes are all zero or non-zero,
+            // otherwise it may cause ambiguity.
+            if (inputNumBytes == 0 || other.getInputNumBytes() == 0) {
+                return (int) (executionTime - other.getExecutionTime());
+            }
+            return Double.compare(
+                    executionTime,
+                    (inputNumBytes * 1.0 / other.getInputNumBytes()) * other.getExecutionTime());

Review Comment:
   ```suggestion
               return Double.compare(
                       (double) executionTime / inputNumBytes,
                       (double) other.getExecutionTime() / other.getInputNumBytes());
   ```
   This would be clearer: both sides become execution time per input byte.
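
   As a minimal sketch of the suggested form (the class and method names below are hypothetical, not taken from the PR), both sides can be reduced to execution time per input byte before comparing. The sketch assumes both byte counts are positive; the zero and unknown cases are a separate concern.

   ```java
   final class TimePerByteSketch {
       /** Compares two executions by execution time per input byte; assumes both byte counts are > 0. */
       static int compareTimePerByte(long timeA, long bytesA, long timeB, long bytesB) {
           // Cast before dividing so the division is done in floating point, not integer arithmetic.
           return Double.compare((double) timeA / bytesA, (double) timeB / bytesB);
       }
   }
   ```

   Two executions with the same time-per-byte ratio, such as 10 ms over 1000 B and 20 ms over 2000 B, compare as equal in this form.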



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -225,10 +244,56 @@ private long getExecutionTime(final Execution execution, final long currentTime)
         }
     }
 
+    private long getExecutionInputNumBytes(final Execution execution) {
+        return execution.getVertex().getExecutionVertexInputNumBytes();
+    }
+
+    private ExecutionTimeWithInputNumBytes getExecutionTimeAndInputNumBytes(
+            Execution execution, final long currentTime) {
+        long executionTime = getExecutionTime(execution, currentTime);
+        long executionInputNumBytes = getExecutionInputNumBytes(execution);
+
+        return new ExecutionTimeWithInputNumBytes(executionTime, executionInputNumBytes);
+    }
+
     @Override
     public void stop() {
         if (scheduledDetectionFuture != null) {
             scheduledDetectionFuture.cancel(false);
         }
     }
+
+    /** This class defines the execution time and input number bytes for an execution. */
+    @VisibleForTesting
+    static class ExecutionTimeWithInputNumBytes

Review Comment:
   I feel the `Num` is a bit redundant and is not in the right place (`NumInputBytes` would sound better). So I'd prefer the simpler name `ExecutionTimeWithInputBytes`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -168,21 +168,34 @@ private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
         return (double) finishedCount / executionJobVertex.getTaskVertices().length;
     }
 
-    private long getBaseline(
+    private ExecutionTimeWithInputNumBytes getBaseline(
             final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
-        final long executionTimeMedian =
+        final ExecutionTimeWithInputNumBytes weightedExecutionTimeMedian =
                 calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
-        return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * baselineMultiplier);
+        long multipliedBaseline =
+                (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);
+        if (multipliedBaseline < baselineLowerBoundMillis) {
+            double inputNumBytesMultiplier =
+                    baselineLowerBoundMillis * 1.0 / weightedExecutionTimeMedian.getExecutionTime();
+            return new ExecutionTimeWithInputNumBytes(
+                    baselineLowerBoundMillis,
+                    (long)
+                            (weightedExecutionTimeMedian.getInputNumBytes()
+                                    * inputNumBytesMultiplier));

Review Comment:
   I think the result should be divided by `baselineMultiplier`.
   Consider this case: `weightedExecutionTimeMedian={10ms, 1000B}, baselineLowerBoundMillis=100, baselineMultiplier=2`. The median throughput is 100 B/ms, so the baseline throughput should be 100/2=50 B/ms.
   If the baseline time is 100ms, the baseline bytes should be 5000B, whereas the current code yields 100ms/10ms*1000B=10000B.
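
   The arithmetic above can be sketched as follows. This is an illustration only: `BaselineSketch` and its parameters are hypothetical names, the method assumes a positive median time, and the non-clamped branch (keeping the median bytes and multiplying only the time) is an assumption about the intended behaviour rather than code from the PR.

   ```java
   final class BaselineSketch {
       /** Returns {baselineTimeMillis, baselineBytes}; assumes medianTimeMillis > 0. */
       static long[] baseline(
               long medianTimeMillis, long medianBytes, long lowerBoundMillis, double multiplier) {
           long multipliedTime = (long) (medianTimeMillis * multiplier);
           if (multipliedTime >= lowerBoundMillis) {
               // Same bytes over a longer time: the throughput is already median / multiplier.
               return new long[] {multipliedTime, medianBytes};
           }
           // Baseline throughput is the median throughput divided by the multiplier.
           double baselineBytesPerMilli = (double) medianBytes / medianTimeMillis / multiplier;
           // Scale that throughput up to the lower-bound time.
           return new long[] {lowerBoundMillis, (long) (baselineBytesPerMilli * lowerBoundMillis)};
       }
   }
   ```

   With the values from the example, `baseline(10, 1000, 100, 2.0)` gives a baseline of 100 ms and 5000 B, so both branches keep the baseline throughput at 50 B/ms.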



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java:
##########
@@ -169,6 +169,16 @@ public ExecutionVertexInputInfo getExecutionVertexInputInfo(IntermediateDataSetI
                 .get(subTaskIndex);
     }
 
+    /** Get the current input number bytes. Note that only finished inputs will be counted. */
+    public long getExecutionVertexInputNumBytes() {
+        return getJobVertex().getInputs().stream()
+                .mapToLong(
+                        intermediateResult ->
+                                getExecutionVertexInputInfo(intermediateResult.getId())
+                                        .getAggregatedInputNumBytes())
+                .sum();

Review Comment:
   It would be better to record the total input bytes and return it directly, to avoid heavy invocations every second (during slow task detection). The total input bytes can be computed when the execution vertex is scheduled.
   
   Then we do not need to introduce an `aggregatedInputNumBytes` field in `ExecutionVertexInputInfo`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -225,10 +244,56 @@ private long getExecutionTime(final Execution execution, final long currentTime)
         }
     }
 
+    private long getExecutionInputNumBytes(final Execution execution) {
+        return execution.getVertex().getExecutionVertexInputNumBytes();
+    }
+
+    private ExecutionTimeWithInputNumBytes getExecutionTimeAndInputNumBytes(
+            Execution execution, final long currentTime) {
+        long executionTime = getExecutionTime(execution, currentTime);
+        long executionInputNumBytes = getExecutionInputNumBytes(execution);
+
+        return new ExecutionTimeWithInputNumBytes(executionTime, executionInputNumBytes);
+    }
+
     @Override
     public void stop() {
         if (scheduledDetectionFuture != null) {
             scheduledDetectionFuture.cancel(false);
         }
     }
+
+    /** This class defines the execution time and input number bytes for an execution. */
+    @VisibleForTesting
+    static class ExecutionTimeWithInputNumBytes
+            implements Comparable<ExecutionTimeWithInputNumBytes> {
+
+        private final long executionTime;
+        private final long inputNumBytes;
+
+        public ExecutionTimeWithInputNumBytes(long executionTime, long inputNumBytes) {
+            this.executionTime = executionTime;
+            this.inputNumBytes = inputNumBytes;
+        }
+
+        public long getExecutionTime() {
+            return executionTime;
+        }
+
+        public long getInputNumBytes() {
+            return inputNumBytes;
+        }
+
+        @Override
+        public int compareTo(ExecutionTimeWithInputNumBytes other) {
+            // We should guarantee that all sorted elements' inputNumBytes are all zero or non-zero,
+            // otherwise it may cause ambiguity.

Review Comment:
   0 is a valid number of bytes: it means the task has no data to process.
   There are also cases where the number of bytes is unknown; I think -1 should be used in that case.
   
   When doing the comparison:
   - if either side's input bytes is unknown (-1), we just compare the execution time (that is, we assume they have the same input bytes). We can also check that both input bytes are unknown in this case.
   - if either side's input bytes is 0, we should treat it as having the largest weighted execution time, unless its execution time is 0.
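
   The two rules above could look roughly like this. It is a sketch under assumptions, not the PR's code: `WeightedTimeSketch`, `UNKNOWN_BYTES` and the method names are hypothetical, and treating "zero bytes, zero time" as a weight of 0 is one possible reading of the second rule.

   ```java
   final class WeightedTimeSketch {
       /** Hypothetical sentinel for "input bytes unknown". */
       static final long UNKNOWN_BYTES = -1L;

       /** Execution time per input byte, with the zero-byte rule applied. */
       static double weightedTime(long timeMillis, long bytes) {
           if (bytes == 0) {
               // No input: largest possible weight, unless no time was spent either.
               return timeMillis == 0 ? 0.0 : Double.POSITIVE_INFINITY;
           }
           return (double) timeMillis / bytes;
       }

       static int compare(long timeA, long bytesA, long timeB, long bytesB) {
           if (bytesA == UNKNOWN_BYTES || bytesB == UNKNOWN_BYTES) {
               // Unknown input size: fall back to plain execution time.
               // Long.compare avoids the overflow risk of casting a long difference to int.
               return Long.compare(timeA, timeB);
           }
           return Double.compare(weightedTime(timeA, bytesA), weightedTime(timeB, bytesB));
       }
   }
   ```

   In this sketch a task that spent 5 ms on 0 bytes sorts after a task that spent 1000 ms on 1 byte, while two unknown-size tasks are ordered by execution time alone.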
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -327,6 +332,41 @@ private ParallelismAndInputInfos tryDecideParallelismAndInputInfos(
         return parallelismAndInputInfos;
     }
 
+    private List<ExecutionJobVertex> findExecutionJobVerticesUpStreamFinished() {
+        return IterableUtils.toStream(getExecutionGraph().getVerticesTopologically())
+                .filter(ExecutionJobVertex::isInitialized)
+                .filter(ejv -> !ejv.getInputs().isEmpty())
+                .filter(ejv -> tryGetConsumedResultsInfo(ejv).isPresent())
+                .collect(Collectors.toList());
+    }
+
+    private void tryEnrichInputNumBytesForExecutionVertexInputInfos() {
+        List<ExecutionJobVertex> executionJobVertices = findExecutionJobVerticesUpStreamFinished();
+
+        for (ExecutionJobVertex ejv : executionJobVertices) {
+            List<IntermediateResult> intermediateResults = ejv.getInputs();
+            for (IntermediateResult intermediateResult : intermediateResults) {
+                for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                    ExecutionVertexInputInfo inputInfo =
+                            ev.getExecutionVertexInputInfo(intermediateResult.getId());
+                    if (inputInfo.getAggregatedInputNumBytes() != 0) {
+                        // current intermediate result has been calculated, we can skip it.

Review Comment:
   The value may change if a failover happens to the upstream tasks, so we cannot skip it.
   
   A safer way is to compute the total input bytes when an execution vertex is scheduled. If an upstream failover happens, the execution vertex will be restarted and the input bytes will be recomputed.
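
   A minimal sketch of that idea, using a hypothetical stand-in class rather than Flink's `ExecutionVertex` (all names below are assumptions for illustration): the total is computed once when the vertex is scheduled, read cheaply afterwards, and replaced when the vertex is reset and scheduled again.

   ```java
   import java.util.List;

   final class ScheduledInputBytesSketch {
       static final long UNKNOWN = -1L;

       // Total input bytes, fixed at scheduling time; UNKNOWN until then.
       private long inputBytes = UNKNOWN;

       /** Called once when the vertex is scheduled; sums the bytes of all consumed inputs. */
       void onScheduled(List<Long> consumedBytesPerInput) {
           long total = 0L;
           for (long bytes : consumedBytesPerInput) {
               total += bytes;
           }
           inputBytes = total;
       }

       /** Called when the vertex is reset, for example after an upstream failover. */
       void onReset() {
           inputBytes = UNKNOWN;
       }

       /** Cheap read for the periodic slow task check; no recomputation here. */
       long getInputBytes() {
           return inputBytes;
       }
   }
   ```

   Because the value only changes on a reset followed by rescheduling, repeated detection rounds during one run of the vertex all see the same input size.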



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -179,6 +183,7 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri
         checkNotNull(ioMetrics);
         updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
         initializeVerticesIfPossible();
+        tryEnrichInputNumBytesForExecutionVertexInputInfos();

Review Comment:
   This gradually builds up the input bytes for an execution vertex, even if the execution vertex is already running.
   However, a changing input byte count is not ideal for the slow task detector at the moment: a task detected as slow may no longer be detected as slow later, once a new input finishes and its bytes are added to the total. This may result in some inconsistency.
   
   I'd like to avoid that and instead compute the input bytes of execution vertices before scheduling them, i.e. by overriding the `allocateSlotsAndDeploy` method.


