zhuzhurk commented on code in PR #21695:
URL: https://github.com/apache/flink/pull/21695#discussion_r1071823620
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AbstractBlockingResultInfo.java:
##########
@@ -72,4 +73,15 @@ public void resetPartitionInfo(int partitionIndex) {
int getNumOfRecordedPartitions() {
return subpartitionBytesByPartitionIndex.size();
}
+
+ /**
+ * Aggregate the input number bytes of a subtask according to the index range for partition and
+ * subpartition.
+ *
+ * @param partitionIndexRange Range of the index of the consumed partition.
+ * @param subpartitionIndexRange Range of the index of the consumed subpartition.
+ * @return Aggregated input number bytes of current result info.
+ */
+ public abstract long getExecutionVertexInputNumBytes(
Review Comment:
It's better to put it in `BlockingResultInfo` and rename it to `getNumBytesProduced` for consistency.
From the perspective of a result, bytes are produced, not consumed as inputs. "Inputs" only makes sense from the perspective of an execution vertex.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -225,10 +244,56 @@ private long getExecutionTime(final Execution execution, final long currentTime)
}
}
+ private long getExecutionInputNumBytes(final Execution execution) {
+ return execution.getVertex().getExecutionVertexInputNumBytes();
+ }
+
+ private ExecutionTimeWithInputNumBytes getExecutionTimeAndInputNumBytes(
+ Execution execution, final long currentTime) {
+ long executionTime = getExecutionTime(execution, currentTime);
+ long executionInputNumBytes = getExecutionInputNumBytes(execution);
+
+ return new ExecutionTimeWithInputNumBytes(executionTime, executionInputNumBytes);
+ }
+
@Override
public void stop() {
if (scheduledDetectionFuture != null) {
scheduledDetectionFuture.cancel(false);
}
}
+
+ /** This class defines the execution time and input number bytes for an execution. */
+ @VisibleForTesting
+ static class ExecutionTimeWithInputNumBytes
+ implements Comparable<ExecutionTimeWithInputNumBytes> {
+
+ private final long executionTime;
+ private final long inputNumBytes;
+
+ public ExecutionTimeWithInputNumBytes(long executionTime, long inputNumBytes) {
+ this.executionTime = executionTime;
+ this.inputNumBytes = inputNumBytes;
+ }
+
+ public long getExecutionTime() {
+ return executionTime;
+ }
+
+ public long getInputNumBytes() {
+ return inputNumBytes;
+ }
+
+ @Override
+ public int compareTo(ExecutionTimeWithInputNumBytes other) {
+ // We should guarantee that all sorted elements' inputNumBytes are all zero or non-zero,
+ // otherwise it may cause ambiguity.
+ if (inputNumBytes == 0 || other.getInputNumBytes() == 0) {
+ return (int) (executionTime - other.getExecutionTime());
+ }
+ return Double.compare(
+ executionTime,
+ (inputNumBytes * 1.0 / other.getInputNumBytes()) * other.getExecutionTime());
Review Comment:
```suggestion
return Double.compare(
(double) executionTime / inputNumBytes,
            (double) other.getExecutionTime() / other.getInputNumBytes());
```
This would be clearer.
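As a sanity check, the suggested form can be exercised in isolation (a hypothetical standalone sketch, not the PR's class; all names are illustrative):

```java
// Hypothetical sketch of the suggested comparison: ordering (time, bytes)
// pairs by execution time per input byte.
class TimePerByteCompareSketch {

    /** Positive result means the first pair is "slower" per input byte. */
    static int compareByTimePerByte(long timeA, long bytesA, long timeB, long bytesB) {
        return Double.compare((double) timeA / bytesA, (double) timeB / bytesB);
    }

    public static void main(String[] args) {
        // 50 ms over 200 B (0.25 ms/B) is slower per byte than
        // 100 ms over 1000 B (0.1 ms/B), despite the smaller wall time.
        System.out.println(compareByTimePerByte(50, 200, 100, 1000) > 0); // true
    }
}
```

Dividing each side by its own bytes also reads as "compare per-byte execution times" directly, instead of cross-multiplying one side's ratio into the other's time.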
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -225,10 +244,56 @@ private long getExecutionTime(final Execution execution, final long currentTime)
}
}
+ private long getExecutionInputNumBytes(final Execution execution) {
+ return execution.getVertex().getExecutionVertexInputNumBytes();
+ }
+
+ private ExecutionTimeWithInputNumBytes getExecutionTimeAndInputNumBytes(
+ Execution execution, final long currentTime) {
+ long executionTime = getExecutionTime(execution, currentTime);
+ long executionInputNumBytes = getExecutionInputNumBytes(execution);
+
+ return new ExecutionTimeWithInputNumBytes(executionTime, executionInputNumBytes);
+ }
+
@Override
public void stop() {
if (scheduledDetectionFuture != null) {
scheduledDetectionFuture.cancel(false);
}
}
+
+ /** This class defines the execution time and input number bytes for an execution. */
+ @VisibleForTesting
+ static class ExecutionTimeWithInputNumBytes
Review Comment:
I feel the `Num` is a bit redundant and not in the right place (`NumInputBytes` would sound better). So I prefer to name it `ExecutionTimeWithInputBytes` to keep it simpler.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -168,21 +168,34 @@ private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
return (double) finishedCount / executionJobVertex.getTaskVertices().length;
}
- private long getBaseline(
+ private ExecutionTimeWithInputNumBytes getBaseline(
final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
- final long executionTimeMedian =
+ final ExecutionTimeWithInputNumBytes weightedExecutionTimeMedian =
calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
- return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * baselineMultiplier);
+ long multipliedBaseline =
+ (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);
+ if (multipliedBaseline < baselineLowerBoundMillis) {
+ double inputNumBytesMultiplier =
+ baselineLowerBoundMillis * 1.0 / weightedExecutionTimeMedian.getExecutionTime();
+ return new ExecutionTimeWithInputNumBytes(
+ baselineLowerBoundMillis,
+ (long)
+ (weightedExecutionTimeMedian.getInputNumBytes()
+ * inputNumBytesMultiplier));
Review Comment:
I think the result should by divided by `baselineMultiplier`.
Imagine this case, `weightedExecutionTimeMedian={10ms, 1000B},
baselineLowerBoundMillis=100, baselineMultiplier=2`. The median throughput is
100 B/ms, and so the based line throughput should be 100/2=50 B/ms.
If the baseline time is 100ms, the based line bytes should be 5000B, while
100ms/10ms*1000B=10000B.
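The arithmetic in this comment can be checked with a small sketch (names and numbers here are illustrative, not the PR's code); the baseline bytes scale with the baseline time but are divided by the multiplier:

```java
// Illustrative check of the proposed fix: baseline throughput is the
// median throughput divided by the multiplier.
class BaselineBytesSketch {

    static long baselineBytes(
            long medianTimeMillis, long medianBytes, long baselineTimeMillis, double multiplier) {
        double medianThroughput = (double) medianBytes / medianTimeMillis; // B/ms
        double baselineThroughput = medianThroughput / multiplier;
        return (long) (baselineThroughput * baselineTimeMillis);
    }

    public static void main(String[] args) {
        // median = {10 ms, 1000 B}, lower bound = 100 ms, multiplier = 2
        // => 100 B/ms median, 50 B/ms baseline, 5000 B over 100 ms.
        System.out.println(baselineBytes(10, 1000, 100, 2.0)); // 5000
    }
}
```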
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java:
##########
@@ -169,6 +169,16 @@ public ExecutionVertexInputInfo getExecutionVertexInputInfo(IntermediateDataSetI
.get(subTaskIndex);
}
+ /** Get the current input number bytes. Note that only finished inputs will be counted. */
+ public long getExecutionVertexInputNumBytes() {
+ return getJobVertex().getInputs().stream()
+ .mapToLong(
+ intermediateResult ->
+ getExecutionVertexInputInfo(intermediateResult.getId())
+ .getAggregatedInputNumBytes())
+ .sum();
Review Comment:
It's better to record the total input bytes and directly return it, to avoid
heavy invocations per-second(slow task detecting). The total input bytes can be
computed when the execution vertex is scheduled.
Then we do not need to introduce a `aggregatedInputNumBytes` to
`ExecutionVertexInputInfo`.
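The caching idea might look roughly like this (field and method names are hypothetical, not Flink's actual API):

```java
// Hypothetical sketch: record the total once at scheduling time, so the
// per-second slow task detection only reads a field instead of aggregating.
class CachedInputBytesSketch {

    static final long UNKNOWN = -1;

    // Set once when the vertex is scheduled; UNKNOWN until then.
    private long totalInputBytes = UNKNOWN;

    /** Called at scheduling time, once all consumed results are finished. */
    void recordInputBytes(long[] inputBytesPerConsumedResult) {
        long sum = 0;
        for (long bytes : inputBytesPerConsumedResult) {
            sum += bytes;
        }
        totalInputBytes = sum;
    }

    /** Cheap read on the detector's hot path. */
    long getTotalInputBytes() {
        return totalInputBytes;
    }
}
```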
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java:
##########
@@ -225,10 +244,56 @@ private long getExecutionTime(final Execution execution, final long currentTime)
}
}
+ private long getExecutionInputNumBytes(final Execution execution) {
+ return execution.getVertex().getExecutionVertexInputNumBytes();
+ }
+
+ private ExecutionTimeWithInputNumBytes getExecutionTimeAndInputNumBytes(
+ Execution execution, final long currentTime) {
+ long executionTime = getExecutionTime(execution, currentTime);
+ long executionInputNumBytes = getExecutionInputNumBytes(execution);
+
+ return new ExecutionTimeWithInputNumBytes(executionTime, executionInputNumBytes);
+ }
+
@Override
public void stop() {
if (scheduledDetectionFuture != null) {
scheduledDetectionFuture.cancel(false);
}
}
+
+ /** This class defines the execution time and input number bytes for an execution. */
+ @VisibleForTesting
+ static class ExecutionTimeWithInputNumBytes
+ implements Comparable<ExecutionTimeWithInputNumBytes> {
+
+ private final long executionTime;
+ private final long inputNumBytes;
+
+ public ExecutionTimeWithInputNumBytes(long executionTime, long inputNumBytes) {
+ this.executionTime = executionTime;
+ this.inputNumBytes = inputNumBytes;
+ }
+
+ public long getExecutionTime() {
+ return executionTime;
+ }
+
+ public long getInputNumBytes() {
+ return inputNumBytes;
+ }
+
+ @Override
+ public int compareTo(ExecutionTimeWithInputNumBytes other) {
+ // We should guarantee that all sorted elements' inputNumBytes are all zero or non-zero,
+ // otherwise it may cause ambiguity.
Review Comment:
0 is a valid value of bytes number, when a task has no data to process.
There are also cases that the bytes number is unknown, I think a -1 should
be used in this case.
When doing the comparison,
- if anyone's input bytes is unknown/-1, we just compare the execution time
(this means that we assume they have the same input bytes). We can also check
that both input bytes should be unknown in this case
- if anyone's input bytes is 0, we should treat it as it has the largest
weighted execution time, unless its execution time is 0.
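The proposed rules could be sketched as follows (a standalone illustration; the class and constant names are hypothetical):

```java
// Hypothetical sketch of the comparison rules proposed above:
// -1 means "input bytes unknown"; 0 bytes with non-zero time ranks largest.
class WeightedTimeCompareSketch {

    static final long UNKNOWN_BYTES = -1;

    /** Positive result means the first (time, bytes) pair is "slower". */
    static int compare(long timeA, long bytesA, long timeB, long bytesB) {
        // Unknown bytes on either side: fall back to plain execution time.
        if (bytesA == UNKNOWN_BYTES || bytesB == UNKNOWN_BYTES) {
            return Long.compare(timeA, timeB);
        }
        // Zero bytes with non-zero time: largest possible weighted time.
        boolean aMax = bytesA == 0 && timeA > 0;
        boolean bMax = bytesB == 0 && timeB > 0;
        if (aMax || bMax) {
            return Boolean.compare(aMax, bMax);
        }
        // Zero time with zero bytes: weighted time is taken as zero.
        double weightedA = bytesA == 0 ? 0 : (double) timeA / bytesA;
        double weightedB = bytesB == 0 ? 0 : (double) timeB / bytesB;
        return Double.compare(weightedA, weightedB);
    }
}
```

Using `Long.compare` for the time fallback also avoids the overflow risk of casting a long difference to int.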
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -327,6 +332,41 @@ private ParallelismAndInputInfos tryDecideParallelismAndInputInfos(
return parallelismAndInputInfos;
}
+ private List<ExecutionJobVertex> findExecutionJobVerticesUpStreamFinished() {
+ return IterableUtils.toStream(getExecutionGraph().getVerticesTopologically())
+ .filter(ExecutionJobVertex::isInitialized)
+ .filter(ejv -> !ejv.getInputs().isEmpty())
+ .filter(ejv -> tryGetConsumedResultsInfo(ejv).isPresent())
+ .collect(Collectors.toList());
+ }
+
+ private void tryEnrichInputNumBytesForExecutionVertexInputInfos() {
+ List<ExecutionJobVertex> executionJobVertices = findExecutionJobVerticesUpStreamFinished();
+
+ for (ExecutionJobVertex ejv : executionJobVertices) {
+ List<IntermediateResult> intermediateResults = ejv.getInputs();
+ for (IntermediateResult intermediateResult : intermediateResults) {
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+ ExecutionVertexInputInfo inputInfo =
+ ev.getExecutionVertexInputInfo(intermediateResult.getId());
+ if (inputInfo.getAggregatedInputNumBytes() != 0) {
+ // current intermediate result has been calculated, we can skip it.
Review Comment:
The value may change if failover happened to the upstream tasks. So we
cannot skip it.
A safer way is to compute the total input bytes when an execution vertex is
scheduled. If upstream failover happens, the execution vertex will be restarted
and the input bytes will be re-computed.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -179,6 +183,7 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri
checkNotNull(ioMetrics);
updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
initializeVerticesIfPossible();
+ tryEnrichInputNumBytesForExecutionVertexInputInfos();
Review Comment:
This gradually builds up the input bytes for an execution vertex, even if
the execution vertex is already running.
However, a changing input bytes is not ideal to slow task detector at the
moment, because a detected slow task may not be detected as slow later, if a
new input is finished and the input bytes number is accumulated. This may
result in some inconsistency.
I'd like to avoid it, but instead compute the input bytes of execution
vertices when before scheduling them, i.e. by overriding the
`allocateSlotsAndDeploy` method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]