zhuzhurk commented on code in PR #21695: URL: https://github.com/apache/flink/pull/21695#discussion_r1082900221
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java: ########## @@ -39,6 +40,16 @@ public interface BlockingResultInfo extends IntermediateResultInfo { */ long getNumBytesProduced(); + /** + * Return the aggregated num of bytes according to the index range for partition and + * subpartition. + * + * @param partitionIndexRange range of the index of the consumed partition. + * @param subpartitionIndexRange range of the index of the consumed subpartition. + * @return aggregated input bytes according to the index ranges. Review Comment: aggregated input bytes -> aggregated bytes ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java: ########## @@ -168,21 +168,34 @@ private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) { return (double) finishedCount / executionJobVertex.getTaskVertices().length; } - private long getBaseline( + private ExecutionTimeWithInputNumBytes getBaseline( final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) { - final long executionTimeMedian = + final ExecutionTimeWithInputNumBytes weightedExecutionTimeMedian = calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis); - return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * baselineMultiplier); + long multipliedBaseline = + (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier); + if (multipliedBaseline < baselineLowerBoundMillis) { + double inputNumBytesMultiplier = + baselineLowerBoundMillis * 1.0 / weightedExecutionTimeMedian.getExecutionTime(); + return new ExecutionTimeWithInputNumBytes( + baselineLowerBoundMillis, + (long) + (weightedExecutionTimeMedian.getInputNumBytes() + * inputNumBytesMultiplier)); Review Comment: The change looks good to me. Could you add a test to check this case? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java: ########## @@ -263,37 +257,38 @@ public void stop() { } } - /** This class defines the execution time and input number bytes for an execution. */ + /** This class defines the execution time and input bytes for an execution. */ @VisibleForTesting - static class ExecutionTimeWithInputNumBytes - implements Comparable<ExecutionTimeWithInputNumBytes> { + static class ExecutionTimeWithInputBytes implements Comparable<ExecutionTimeWithInputBytes> { private final long executionTime; - private final long inputNumBytes; + private final long inputBytes; - public ExecutionTimeWithInputNumBytes(long executionTime, long inputNumBytes) { + public ExecutionTimeWithInputBytes(long executionTime, long inputBytes) { this.executionTime = executionTime; - this.inputNumBytes = inputNumBytes; + this.inputBytes = inputBytes; } public long getExecutionTime() { return executionTime; } - public long getInputNumBytes() { - return inputNumBytes; + public long getInputBytes() { + return inputBytes; } @Override - public int compareTo(ExecutionTimeWithInputNumBytes other) { - // We should guarantee that all sorted elements' inputNumBytes are all zero or non-zero, + public int compareTo(ExecutionTimeWithInputBytes other) { + // We should guarantee that all sorted elements' inputBytes are all UNKNOWN or assigned, // otherwise it may cause ambiguity. - if (inputNumBytes == 0 || other.getInputNumBytes() == 0) { + if (inputBytes == NUM_BYTES_UNKNOWN || other.getInputBytes() == NUM_BYTES_UNKNOWN) { Review Comment: It's better to check that if one is UNKNOWN, the other is also UNKNOWN. Otherwise an exception should be thrown to expose this bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org