zhuzhurk commented on code in PR #21695:
URL: https://github.com/apache/flink/pull/21695#discussion_r1082900221


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java:
##########
@@ -39,6 +40,16 @@ public interface BlockingResultInfo extends 
IntermediateResultInfo {
      */
     long getNumBytesProduced();
 
+    /**
+     * Return the aggregated num of bytes according to the index range for 
partition and
+     * subpartition.
+     *
+     * @param partitionIndexRange range of the index of the consumed partition.
+     * @param subpartitionIndexRange range of the index of the consumed 
subpartition.
+     * @return aggregated input bytes according to the index ranges.

Review Comment:
   aggregated input bytes -> aggregated bytes



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -168,21 +168,34 @@ private double getFinishedRatio(final ExecutionJobVertex 
executionJobVertex) {
         return (double) finishedCount / 
executionJobVertex.getTaskVertices().length;
     }
 
-    private long getBaseline(
+    private ExecutionTimeWithInputNumBytes getBaseline(
             final ExecutionJobVertex executionJobVertex, final long 
currentTimeMillis) {
-        final long executionTimeMedian =
+        final ExecutionTimeWithInputNumBytes weightedExecutionTimeMedian =
                 calculateFinishedTaskExecutionTimeMedian(executionJobVertex, 
currentTimeMillis);
-        return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * 
baselineMultiplier);
+        long multipliedBaseline =
+                (long) (weightedExecutionTimeMedian.getExecutionTime() * 
baselineMultiplier);
+        if (multipliedBaseline < baselineLowerBoundMillis) {
+            double inputNumBytesMultiplier =
+                    baselineLowerBoundMillis * 1.0 / 
weightedExecutionTimeMedian.getExecutionTime();
+            return new ExecutionTimeWithInputNumBytes(
+                    baselineLowerBoundMillis,
+                    (long)
+                            (weightedExecutionTimeMedian.getInputNumBytes()
+                                    * inputNumBytesMultiplier));

Review Comment:
   The change looks good to me. Could you add a test to check this case?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -263,37 +257,38 @@ public void stop() {
         }
     }
 
-    /** This class defines the execution time and input number bytes for an 
execution. */
+    /** This class defines the execution time and input bytes for an 
execution. */
     @VisibleForTesting
-    static class ExecutionTimeWithInputNumBytes
-            implements Comparable<ExecutionTimeWithInputNumBytes> {
+    static class ExecutionTimeWithInputBytes implements 
Comparable<ExecutionTimeWithInputBytes> {
 
         private final long executionTime;
-        private final long inputNumBytes;
+        private final long inputBytes;
 
-        public ExecutionTimeWithInputNumBytes(long executionTime, long 
inputNumBytes) {
+        public ExecutionTimeWithInputBytes(long executionTime, long 
inputBytes) {
             this.executionTime = executionTime;
-            this.inputNumBytes = inputNumBytes;
+            this.inputBytes = inputBytes;
         }
 
         public long getExecutionTime() {
             return executionTime;
         }
 
-        public long getInputNumBytes() {
-            return inputNumBytes;
+        public long getInputBytes() {
+            return inputBytes;
         }
 
         @Override
-        public int compareTo(ExecutionTimeWithInputNumBytes other) {
-            // We should guarantee that all sorted elements' inputNumBytes are 
all zero or non-zero,
+        public int compareTo(ExecutionTimeWithInputBytes other) {
+            // We should guarantee that all sorted elements' inputBytes are 
all UNKNOWN or assigned,
             // otherwise it may cause ambiguity.
-            if (inputNumBytes == 0 || other.getInputNumBytes() == 0) {
+            if (inputBytes == NUM_BYTES_UNKNOWN || other.getInputBytes() == 
NUM_BYTES_UNKNOWN) {

Review Comment:
   It's better to check that if one is UNKNOWN, the other is also UNKNOWN. 
Otherwise an exception should be thrown to expose this bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to