This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc2a86e5bb5f048fb5e5007f916405773a88b5cc Author: Yangze Guo <karma...@gmail.com> AuthorDate: Mon May 23 16:50:37 2022 +0800 [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured This closes #20110. --- .../shortcodes/generated/rest_v1_dispatcher.html | 45 ++++++++++++++++++++++ docs/static/generated/rest_v1_dispatcher.yml | 9 +++++ .../src/test/resources/rest_api_v1.snapshot | 45 ++++++++++++++++++++++ .../flink/runtime/executiongraph/IOMetrics.java | 37 +++++++++++++++++- .../apache/flink/runtime/metrics/MetricNames.java | 3 ++ .../apache/flink/runtime/metrics/TimerGauge.java | 9 +++++ .../runtime/metrics/groups/TaskIOMetricGroup.java | 39 ++++++++++++++++++- .../rest/handler/job/JobDetailsHandler.java | 5 ++- .../handler/job/JobVertexTaskManagersHandler.java | 5 ++- .../rest/handler/util/MutableIOMetrics.java | 32 ++++++++++++++- .../job/SubtaskExecutionAttemptDetailsInfo.java | 5 ++- .../rest/messages/job/metrics/IOMetricsInfo.java | 45 ++++++++++++++++++++-- .../DefaultExecutionGraphDeploymentTest.java | 6 +-- .../ExecutionPartitionLifecycleTest.java | 4 +- .../flink/runtime/metrics/TimerGaugeTest.java | 4 ++ .../metrics/groups/TaskIOMetricGroupTest.java | 15 ++++++++ .../SubtaskCurrentAttemptDetailsHandlerTest.java | 28 ++++++++++++-- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 28 ++++++++++++-- .../rest/messages/JobVertexDetailsInfoTest.java | 5 ++- .../messages/JobVertexTaskManagersInfoTest.java | 5 ++- .../rest/messages/job/JobDetailsInfoTest.java | 5 ++- .../SubtaskExecutionAttemptDetailsInfoTest.java | 5 ++- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 2 +- .../flink/streaming/runtime/tasks/StreamTask.java | 1 + 24 files changed, 361 insertions(+), 26 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 7638b3ee8da..4b648e77e72 100644 --- 
a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -1497,6 +1497,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo", "properties" : { + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, "read-bytes" : { "type" : "integer" }, @@ -3658,6 +3667,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo", "properties" : { + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, "read-bytes" : { "type" : "integer" }, @@ -4343,6 +4361,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo", "properties" : { + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, "read-bytes" : { "type" : "integer" }, @@ -4471,6 +4498,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo", "properties" : { + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, "read-bytes" : { "type" : "integer" }, @@ -4883,6 +4919,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F 
"jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo", "properties" : { + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, "read-bytes" : { "type" : "integer" }, diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 859783719ae..259d5b6d9ba 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -1628,6 +1628,15 @@ components: format: int64 write-records-complete: type: boolean + accumulated-backpressured-time: + type: integer + format: int64 + accumulated-idle-time: + type: integer + format: int64 + accumulated-busy-time: + type: number + format: double SavepointFormatType: type: string enum: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 537e966deff..99cd6fb0cab 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -829,6 +829,15 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" } } } @@ -2167,6 +2176,15 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" } } }, @@ -2528,6 +2546,15 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" } } }, @@ -2614,6 +2641,15 @@ }, 
"write-records-complete" : { "type" : "boolean" + }, + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" } } }, @@ -2843,6 +2879,15 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "accumulated-backpressured-time" : { + "type" : "integer" + }, + "accumulated-idle-time" : { + "type" : "integer" + }, + "accumulated-busy-time" : { + "type" : "number" } } }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java index b7bbf86fb3c..e612837531a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -37,6 +38,10 @@ public class IOMetrics implements Serializable { protected long numBytesIn; protected long numBytesOut; + protected long accumulateBackPressuredTime; + protected double accumulateBusyTime; + protected long accumulateIdleTime; + protected final Map<IntermediateResultPartitionID, Long> numBytesProducedOfPartitions = new HashMap<>(); @@ -45,11 +50,17 @@ public class IOMetrics implements Serializable { Meter recordsOut, Meter bytesIn, Meter bytesOut, - Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters) { + Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters, + Gauge<Long> accumulatedBackPressuredTime, + Gauge<Long> accumulatedIdleTime, + Gauge<Double> accumulatedBusyTime) { this.numRecordsIn = recordsIn.getCount(); this.numRecordsOut = recordsOut.getCount(); this.numBytesIn = 
bytesIn.getCount(); this.numBytesOut = bytesOut.getCount(); + this.accumulateBackPressuredTime = accumulatedBackPressuredTime.getValue(); + this.accumulateBusyTime = accumulatedBusyTime.getValue(); + this.accumulateIdleTime = accumulatedIdleTime.getValue(); for (Map.Entry<IntermediateResultPartitionID, Counter> counter : numBytesProducedCounters.entrySet()) { @@ -57,11 +68,21 @@ public class IOMetrics implements Serializable { } } - public IOMetrics(long numBytesIn, long numBytesOut, long numRecordsIn, long numRecordsOut) { + public IOMetrics( + long numBytesIn, + long numBytesOut, + long numRecordsIn, + long numRecordsOut, + long accumulateIdleTime, + long accumulateBusyTime, + long accumulateBackPressuredTime) { this.numBytesIn = numBytesIn; this.numBytesOut = numBytesOut; this.numRecordsIn = numRecordsIn; this.numRecordsOut = numRecordsOut; + this.accumulateIdleTime = accumulateIdleTime; + this.accumulateBusyTime = accumulateBusyTime; + this.accumulateBackPressuredTime = accumulateBackPressuredTime; } public long getNumRecordsIn() { @@ -80,6 +101,18 @@ public class IOMetrics implements Serializable { return numBytesOut; } + public double getAccumulateBusyTime() { + return accumulateBusyTime; + } + + public long getAccumulateBackPressuredTime() { + return accumulateBackPressuredTime; + } + + public long getAccumulateIdleTime() { + return accumulateIdleTime; + } + public Map<IntermediateResultPartitionID, Long> getNumBytesProducedOfPartitions() { return numBytesProducedOfPartitions; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 762c6671791..1f083520f01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -72,6 +72,9 @@ public class MetricNames { public static final String TASK_IDLE_TIME = "idleTimeMs" + 
SUFFIX_RATE; public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE; public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE; + public static final String ACC_TASK_IDLE_TIME = "accumulateIdleTimeMs"; + public static final String ACC_TASK_BUSY_TIME = "accumulateBusyTimeMs"; + public static final String ACC_TASK_BACK_PRESSURED_TIME = "accumulateBackPressuredTimeMs"; public static final String TASK_SOFT_BACK_PRESSURED_TIME = "softBackPressuredTimeMs" + SUFFIX_RATE; public static final String TASK_HARD_BACK_PRESSURED_TIME = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java index d7f81d06c6a..21819c3d301 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java @@ -47,6 +47,8 @@ public class TimerGauge implements Gauge<Long>, View { private long previousMaxSingleMeasurement; private long currentMaxSingleMeasurement; + private long accumulatedCount; + public TimerGauge() { this(SystemClock.getInstance()); } @@ -66,6 +68,7 @@ public class TimerGauge implements Gauge<Long>, View { if (currentMeasurementStartTS != 0) { long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS; currentCount += currentMeasurement; + accumulatedCount += currentMeasurement; currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement); currentUpdateTS = 0; currentMeasurementStartTS = 0; @@ -79,6 +82,7 @@ public class TimerGauge implements Gauge<Long>, View { // we adding to the current count only the time elapsed since last markStart or update // call currentCount += now - currentUpdateTS; + accumulatedCount += now - currentUpdateTS; currentUpdateTS = now; // on the other hand, max measurement has to be always checked against last markStart // call @@ -104,6 +108,11 
@@ public class TimerGauge implements Gauge<Long>, View { return previousMaxSingleMeasurement; } + /** @return the accumulated period measured by this TimerGauge. */ + public synchronized long getAccumulatedCount() { + return accumulatedCount; + } + @VisibleForTesting public synchronized long getCount() { return currentCount; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 0b580d9453e..17dca7b97b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -60,12 +60,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { private final TimerGauge hardBackPressuredTimePerSecond; private final Gauge<Long> maxSoftBackPressuredTime; private final Gauge<Long> maxHardBackPressuredTime; + private final Gauge<Long> accumulatedBackPressuredTime; + private final Gauge<Long> accumulatedIdleTime; + private final Gauge<Double> accumulatedBusyTime; private final Meter mailboxThroughput; private final Histogram mailboxLatency; private final SizeGauge mailboxSize; private volatile boolean busyTimeEnabled; + private long taskStartTime; + private final Map<IntermediateResultPartitionID, Counter> numBytesProducedOfPartitions = new HashMap<>(); @@ -107,6 +112,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond); + this.accumulatedBusyTime = + gauge(MetricNames.ACC_TASK_BUSY_TIME, this::getAccumulatedBusyTime); + this.accumulatedBackPressuredTime = + gauge( + MetricNames.ACC_TASK_BACK_PRESSURED_TIME, + this::getAccumulatedBackPressuredTimeMs); + this.accumulatedIdleTime = + gauge(MetricNames.ACC_TASK_IDLE_TIME, idleTimePerSecond::getAccumulatedCount); +
this.numMailsProcessed = new SimpleCounter(); this.mailboxThroughput = meter(MetricNames.MAILBOX_THROUGHPUT, new MeterView(numMailsProcessed)); @@ -121,7 +135,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { numRecordsOutRate, numBytesInRate, numBytesOutRate, - numBytesProducedOfPartitions); + numBytesProducedOfPartitions, + accumulatedBackPressuredTime, + accumulatedIdleTime, + accumulatedBusyTime); } // ============================================================================================ @@ -169,6 +186,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { + getHardBackPressuredTimePerSecond().getValue(); } + public long getAccumulatedBackPressuredTimeMs() { + return getSoftBackPressuredTimePerSecond().getAccumulatedCount() + + getHardBackPressuredTimePerSecond().getAccumulatedCount(); + } + + public void markTaskStart() { + this.taskStartTime = System.currentTimeMillis(); + } + public void setEnableBusyTime(boolean enabled) { busyTimeEnabled = enabled; } @@ -178,6 +204,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN; } + private double getAccumulatedBusyTime() { + return busyTimeEnabled + ? 
Math.max( + System.currentTimeMillis() + - taskStartTime + - idleTimePerSecond.getAccumulatedCount() + - getAccumulatedBackPressuredTimeMs(), + 0) + : Double.NaN; + } + public Meter getMailboxThroughput() { return mailboxThroughput; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 68b16fd8652..4f850bc0712 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -218,7 +218,10 @@ public class JobDetailsHandler counts.getNumRecordsIn(), counts.isNumRecordsInComplete(), counts.getNumRecordsOut(), - counts.isNumRecordsOutComplete()); + counts.isNumRecordsOutComplete(), + counts.getAccumulateBackPressuredTime(), + counts.getAccumulateIdleTime(), + counts.getAccumulateBusyTime()); return new JobDetailsInfo.JobVertexDetailsInfo( ejv.getJobVertexId(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index d1ed4030443..02df59952a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -206,7 +206,10 @@ public class JobVertexTaskManagersHandler counts.getNumRecordsIn(), counts.isNumRecordsInComplete(), counts.getNumRecordsOut(), - counts.isNumRecordsOutComplete()); + counts.isNumRecordsOutComplete(), + counts.getAccumulateBackPressuredTime(), + counts.getAccumulateIdleTime(), + counts.getAccumulateBusyTime()); Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index b5dac6ce5ad..7da9061a090 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -47,7 +47,7 @@ public class MutableIOMetrics extends IOMetrics { private boolean numRecordsOutComplete = true; public MutableIOMetrics() { - super(0, 0, 0, 0); + super(0, 0, 0, 0, 0, 0, 0); } public boolean isNumBytesInComplete() { @@ -86,6 +86,13 @@ public class MutableIOMetrics extends IOMetrics { this.numBytesOut += ioMetrics.getNumBytesOut(); this.numRecordsIn += ioMetrics.getNumRecordsIn(); this.numRecordsOut += ioMetrics.getNumRecordsOut(); + this.accumulateBackPressuredTime += ioMetrics.getAccumulateBackPressuredTime(); + this.accumulateIdleTime += ioMetrics.getAccumulateIdleTime(); + if (Double.isNaN(ioMetrics.getAccumulateBusyTime())) { + this.accumulateBusyTime = Double.NaN; + } else { + this.accumulateBusyTime += ioMetrics.getAccumulateBusyTime(); + } } } else { // execAttempt is still running, use MetricQueryService instead if (fetcher != null) { @@ -127,6 +134,29 @@ public class MutableIOMetrics extends IOMetrics { this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT)); } + + if (metrics.getMetric(MetricNames.ACC_TASK_BACK_PRESSURED_TIME) != null) { + this.accumulateBackPressuredTime += + Long.parseLong( + metrics.getMetric( + MetricNames.ACC_TASK_BACK_PRESSURED_TIME)); + } + + if (metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME) != null) { + this.accumulateIdleTime += + Long.parseLong(metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME)); + } + + if (metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME) != null) { + double busyTime = + Double.parseDouble( + 
metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME)); + if (Double.isNaN(busyTime)) { + this.accumulateBusyTime = Double.NaN; + } else { + this.accumulateBusyTime += busyTime; + } + } } else { this.numBytesInComplete = false; this.numBytesOutComplete = false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java index 42718d0070d..637538f1f48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java @@ -226,7 +226,10 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { ioMetrics.getNumRecordsIn(), ioMetrics.isNumRecordsInComplete(), ioMetrics.getNumRecordsOut(), - ioMetrics.isNumRecordsOutComplete()); + ioMetrics.isNumRecordsOutComplete(), + ioMetrics.getAccumulateBackPressuredTime(), + ioMetrics.getAccumulateIdleTime(), + ioMetrics.getAccumulateBusyTime()); return new SubtaskExecutionAttemptDetailsInfo( execution.getParallelSubtaskIndex(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java index 29e3e62c8b7..02c37152fe8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java @@ -42,6 +42,12 @@ public final class IOMetricsInfo { private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete"; + private static final String FIELD_NAME_ACC_BACK_PRESSURE = "accumulated-backpressured-time"; + + private static final String FIELD_NAME_ACC_IDLE = 
"accumulated-idle-time"; + + private static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time"; + @JsonProperty(FIELD_NAME_BYTES_READ) private final long bytesRead; @@ -66,6 +72,15 @@ public final class IOMetricsInfo { @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) private final boolean recordsWrittenComplete; + @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) + private final long accumulatedBackpressured; + + @JsonProperty(FIELD_NAME_ACC_IDLE) + private final long accumulatedIdle; + + @JsonProperty(FIELD_NAME_ACC_BUSY) + private final double accumulatedBusy; + @JsonCreator public IOMetricsInfo( @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead, @@ -75,7 +90,10 @@ public final class IOMetricsInfo { @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead, @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete, @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten, - @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) { + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete, + @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) long accumulatedBackpressured, + @JsonProperty(FIELD_NAME_ACC_IDLE) long accumulatedIdle, + @JsonProperty(FIELD_NAME_ACC_BUSY) double accumulatedBusy) { this.bytesRead = bytesRead; this.bytesReadComplete = bytesReadComplete; this.bytesWritten = bytesWritten; @@ -84,6 +102,9 @@ public final class IOMetricsInfo { this.recordsReadComplete = recordsReadComplete; this.recordsWritten = recordsWritten; this.recordsWrittenComplete = recordsWrittenComplete; + this.accumulatedBackpressured = accumulatedBackpressured; + this.accumulatedIdle = accumulatedIdle; + this.accumulatedBusy = accumulatedBusy; } public long getBytesRead() { @@ -118,6 +139,18 @@ public final class IOMetricsInfo { return recordsWrittenComplete; } + public long getAccumulatedBackpressured() { + return accumulatedBackpressured; + } + + public double getAccumulatedBusy() { + return 
accumulatedBusy; + } + + public long getAccumulatedIdle() { + return accumulatedIdle; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -134,7 +167,10 @@ public final class IOMetricsInfo { && recordsRead == that.recordsRead && recordsReadComplete == that.recordsReadComplete && recordsWritten == that.recordsWritten - && recordsWrittenComplete == that.recordsWrittenComplete; + && recordsWrittenComplete == that.recordsWrittenComplete + && accumulatedBackpressured == that.accumulatedBackpressured + && accumulatedBusy == that.accumulatedBusy + && accumulatedIdle == that.accumulatedIdle; } @Override @@ -147,6 +183,9 @@ public final class IOMetricsInfo { recordsRead, recordsReadComplete, recordsWritten, - recordsWrittenComplete); + recordsWrittenComplete, + accumulatedBackpressured, + accumulatedBusy, + accumulatedIdle); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index f5c222e91c5..a2abb0b5901 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -344,7 +344,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { // verify behavior for canceled executions Execution execution1 = executions.values().iterator().next(); - IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0); + IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0); Map<String, Accumulator<?, ?>> accumulators = new HashMap<>(); accumulators.put("acc", new IntCounter(4)); AccumulatorSnapshot accumulatorSnapshot = @@ -367,7 +367,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { // verify behavior for failed executions Execution execution2 = executions.values().iterator().next(); 
- IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0); + IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0, 0); Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>(); accumulators2.put("acc", new IntCounter(8)); AccumulatorSnapshot accumulatorSnapshot2 = @@ -405,7 +405,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { Map<ExecutionAttemptID, Execution> executions = scheduler.getExecutionGraph().getRegisteredExecutions(); - IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0); + IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0); Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap(); Execution execution1 = executions.values().iterator().next(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java index 62f907f78cc..df765c69cd7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -159,7 +159,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger { execution -> { execution.cancel(); execution.completeCancelling( - Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false); + Collections.emptyMap(), new IOMetrics(0, 0, 0, 0, 0, 0, 0), false); }, PartitionReleaseResult.STOP_TRACKING); } @@ -182,7 +182,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger { new Exception("Test exception"), false, Collections.emptyMap(), - new IOMetrics(0, 0, 0, 0), + new IOMetrics(0, 0, 0, 0, 0, 0, 0), false, true), PartitionReleaseResult.STOP_TRACKING); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java index f066e776524..0ddbde97637 100644 
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** Tests for {@link TimerGauge}. */ @@ -49,6 +50,7 @@ public class TimerGaugeTest { gauge.update(); assertThat(gauge.getValue(), is(0L)); assertThat(gauge.getMaxSingleMeasurement(), is(0L)); + assertEquals(gauge.getAccumulatedCount(), 0L); gauge.markStart(); clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS); @@ -57,6 +59,7 @@ public class TimerGaugeTest { assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS)); assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP)); + assertEquals(gauge.getAccumulatedCount(), SLEEP); // Check that the getMaxSingleMeasurement can go down after an update gauge.markStart(); @@ -65,6 +68,7 @@ public class TimerGaugeTest { gauge.update(); assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP / 2)); + assertEquals(gauge.getAccumulatedCount(), SLEEP + SLEEP / 2); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index 7acc04bc604..71a978636bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -38,6 +38,8 @@ public class TaskIOMetricGroupTest { public void testTaskIOMetricGroup() throws InterruptedException { TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); TaskIOMetricGroup taskIO = task.getIOMetricGroup(); + taskIO.setEnableBusyTime(true); + 
final long startTime = System.currentTimeMillis(); // test counter forwarding assertNotNull(taskIO.getNumRecordsInCounter()); @@ -75,6 +77,19 @@ public class TaskIOMetricGroupTest { assertEquals(100L, io.getNumBytesIn()); assertEquals(250L, io.getNumBytesOut()); assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount()); + assertEquals( + taskIO.getIdleTimeMsPerSecond().getAccumulatedCount(), io.getAccumulateIdleTime()); + assertEquals( + taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount() + + taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount(), + io.getAccumulateBackPressuredTime()); + assertThat( + io.getAccumulateBusyTime(), + greaterThanOrEqualTo( + (double) System.currentTimeMillis() + - startTime + - io.getAccumulateIdleTime() + - io.getAccumulateBackPressuredTime())); assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(softSleepTime)); assertThat( taskIO.getSoftBackPressuredTimePerSecond().getCount(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index 5ac759f7396..483ad2facf1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -72,8 +72,19 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { final long bytesOut = 10L; final long recordsIn = 20L; final long recordsOut = 30L; - - final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut); + final long accumulateIdleTime = 40L; + final long accumulateBusyTime = 50L; + final long accumulateBackPressuredTime = 60L; + + final IOMetrics ioMetrics = + new IOMetrics( + bytesIn, + bytesOut, + recordsIn, + recordsOut, + 
accumulateIdleTime, + accumulateBusyTime, + accumulateBackPressuredTime); final long[] timestamps = new long[ExecutionState.values().length]; timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs; @@ -146,7 +157,18 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { // Verify final IOMetricsInfo ioMetricsInfo = - new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true); + new IOMetricsInfo( + bytesIn, + true, + bytesOut, + true, + recordsIn, + true, + recordsOut, + true, + accumulateBackPressuredTime, + accumulateIdleTime, + accumulateBusyTime); final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index 69a8e2b051c..4a288e738be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -77,8 +77,19 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { final long bytesOut = 10L; final long recordsIn = 20L; final long recordsOut = 30L; - - final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut); + final long accumulateIdleTime = 40L; + final long accumulateBusyTime = 50L; + final long accumulateBackPressuredTime = 60L; + + final IOMetrics ioMetrics = + new IOMetrics( + bytesIn, + bytesOut, + recordsIn, + recordsOut, + accumulateIdleTime, + accumulateBusyTime, + accumulateBackPressuredTime); final ArchivedExecutionJobVertex archivedExecutionJobVertex = new ArchivedExecutionJobVertex( @@ -151,7 +162,18 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends 
TestLogger { // Verify final IOMetricsInfo ioMetricsInfo = - new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true); + new IOMetricsInfo( + bytesIn, + true, + bytesOut, + true, + recordsIn, + true, + recordsOut, + true, + accumulateBackPressuredTime, + accumulateIdleTime, + accumulateBusyTime); final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java index 707aa777542..806ce560494 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java @@ -47,7 +47,10 @@ public class JobVertexDetailsInfoTest random.nextLong(), random.nextBoolean(), random.nextLong(), - random.nextBoolean()); + random.nextBoolean(), + Math.abs(random.nextLong()), + Math.abs(random.nextLong()), + Math.abs(random.nextDouble())); List<SubtaskExecutionAttemptDetailsInfo> vertexTaskDetailList = new ArrayList<>(); vertexTaskDetailList.add( new SubtaskExecutionAttemptDetailsInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java index cf7aad02879..d988ee017a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java @@ -54,7 +54,10 @@ public class JobVertexTaskManagersInfoTest random.nextLong(), random.nextBoolean(), random.nextLong(), - random.nextBoolean()); + random.nextBoolean(), + Math.abs(random.nextLong()), + Math.abs(random.nextLong()), + 
Math.abs(random.nextDouble())); int count = 100; for (ExecutionState executionState : ExecutionState.values()) { statusCounts.put(executionState, count++); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java index e867d2720d8..8c207c5b069 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java @@ -91,7 +91,10 @@ public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetai random.nextLong(), random.nextBoolean(), random.nextLong(), - random.nextBoolean()); + random.nextBoolean(), + Math.abs(random.nextLong()), + Math.abs(random.nextLong()), + Math.abs(random.nextDouble())); for (ExecutionState executionState : ExecutionState.values()) { tasksPerState.put(executionState, random.nextInt()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java index e7ea728507d..f022d6ced26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java @@ -46,7 +46,10 @@ public class SubtaskExecutionAttemptDetailsInfoTest Math.abs(random.nextLong()), random.nextBoolean(), Math.abs(random.nextLong()), - random.nextBoolean()); + random.nextBoolean(), + Math.abs(random.nextLong()), + Math.abs(random.nextLong()), + Math.abs(random.nextDouble())); return new SubtaskExecutionAttemptDetailsInfo( Math.abs(random.nextInt()), diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index bf330d9edf3..e8636ea8435 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -133,7 +133,7 @@ public class AdaptiveBatchSchedulerTest extends TestLogger { state, null, null, - new IOMetrics(0, 0, 0, 0))); + new IOMetrics(0, 0, 0, 0, 0, 0, 0))); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 55a2f527a10..19bc707a363 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -772,6 +772,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> scheduleBufferDebloater(); // let the task do its work + getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart(); runMailboxLoop(); // if this left the run() method cleanly despite the fact that this was canceled,