This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0309f13e8af [FLINK-33681][Runtime/Metrics] Reuse input/output metrics of SourceOperator/SinkWriterOperator for task (#23998) 0309f13e8af is described below commit 0309f13e8af62f9b523e227a3a66ff59e838a1b4 Author: Zhanghao Chen <m...@outlook.com> AuthorDate: Thu Aug 22 06:31:44 2024 +0800 [FLINK-33681][Runtime/Metrics] Reuse input/output metrics of SourceOperator/SinkWriterOperator for task (#23998) --- .../base/source/reader/SourceMetricsITCase.java | 21 +++++++++++++++++++++ .../groups/InternalOperatorIOMetricGroup.java | 12 ++++++++++++ .../runtime/metrics/groups/TaskIOMetricGroup.java | 17 +++++++++++++---- .../flink/runtime/testutils/InMemoryReporter.java | 22 +++++++++++++++++++++- .../streaming/api/operators/SourceOperator.java | 5 +++++ .../runtime/operators/sink/SinkWriterOperator.java | 15 +++++++++++++++ .../test/streaming/runtime/SinkMetricsITCase.java | 21 +++++++++++++++++++++ .../streaming/runtime/SinkV2MetricsITCase.java | 20 ++++++++++++++++++++ 8 files changed, 128 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java index b7631ef0311..3c315f694b9 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.testutils.InMemoryReporter; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -223,6 +224,26 @@ public class SourceMetricsITCase extends TestLogger { assertThatGauge(metrics.get(MetricNames.SOURCE_IDLE_TIME)).isEqualTo(0L); } assertThat(subtaskWithMetrics).isEqualTo(numSplits); + + // Test operator I/O metrics are reused by task metrics + List<TaskMetricGroup> taskMetricGroups = + reporter.findTaskMetricGroups(jobId, "MetricTestingSource"); + assertThat(taskMetricGroups).hasSize(parallelism); + + int subtaskWithTaskMetrics = 0; + for (TaskMetricGroup taskMetricGroup : taskMetricGroups) { + // there are only 2 splits assigned; so two groups will not update metrics + if (taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0) { + continue; + } + + subtaskWithTaskMetrics++; + assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter()) + .isEqualTo(processedRecordsPerSubtask); + assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesInCounter()) + .isEqualTo(processedRecordsPerSubtask * MockRecordEmitter.RECORD_SIZE_IN_BYTES); + } + assertThat(subtaskWithTaskMetrics).isEqualTo(numSplits); } private static class LaggingTimestampAssigner diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java index 31cf560ce78..0405b2d6e07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java @@ -97,4 +97,16 @@ public class InternalOperatorIOMetricGroup extends ProxyMetricGroup<InternalOper TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup(); taskIO.reuseRecordsOutputCounter(this.numRecordsOut); } + + /** Causes the containing task to use this operators input bytes counter. */ + public void reuseBytesInputMetricsForTask() { + TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup(); + taskIO.reuseBytesInputCounter(this.numBytesIn); + } + + /** Causes the containing task to use this operators output bytes counter. */ + public void reuseBytesOutputMetricsForTask() { + TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup(); + taskIO.reuseBytesOutputCounter(this.numBytesOut); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 12ba23cea21..5a034f6b4e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -50,8 +50,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { private final Clock clock; - private final Counter numBytesIn; - private final Counter numBytesOut; + private final SumCounter numBytesIn; + private final SumCounter numBytesOut; private final SumCounter numRecordsIn; private final SumCounter numRecordsOut; private final Counter numBuffersOut; @@ -95,8 +95,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { public TaskIOMetricGroup(TaskMetricGroup parent, Clock clock) { super(parent); this.clock = clock; - this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN); - this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT); + this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new SumCounter()); + this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new SumCounter()); this.numBytesInRate = meter(MetricNames.IO_NUM_BYTES_IN_RATE, new MeterView(numBytesIn)); this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut)); @@ -325,6 +325,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { // ============================================================================================ // Metric Reuse // ============================================================================================ + + public void reuseBytesInputCounter(Counter numBytesInCounter) { + this.numBytesIn.addCounter(numBytesInCounter); + } + + public void reuseBytesOutputCounter(Counter numBytesOutCounter) { + this.numBytesOut.addCounter(numBytesOutCounter); + } + public void reuseRecordsInputCounter(Counter numRecordsInCounter) { this.numRecordsIn.addCounter(numRecordsInCounter); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java index 4abb4558d4b..1564bb44f02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java @@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.slf4j.Logger; @@ -167,6 +168,21 @@ public class InMemoryReporter implements MetricReporter { } } + public List<TaskMetricGroup> findTaskMetricGroups(JobID jobId, String operatorPattern) { + Pattern pattern = Pattern.compile(operatorPattern); + synchronized (this) { + return metrics.keySet().stream() + .filter( + g -> + g instanceof TaskMetricGroup + && pattern.matcher(getTaskName(g)).find() + && getJobId(g).equals(jobId.toString())) + .map(TaskMetricGroup.class::cast) + .sorted(Comparator.comparing(this::getSubtaskId)) + .collect(Collectors.toList()); + } + } + public List<Tuple3<MetricGroup, String, Metric>> findJobMetricGroups( JobID jobId, String metricPattern) { Pattern pattern = Pattern.compile(metricPattern); @@ -189,7 +205,7 @@ public class InMemoryReporter implements MetricReporter { } } - private String getSubtaskId(OperatorMetricGroup g) { + private String getSubtaskId(MetricGroup g) { return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_SUBTASK_INDEX); } @@ -197,6 +213,10 @@ public class InMemoryReporter implements MetricReporter { return g.getAllVariables().get(ScopeFormat.SCOPE_OPERATOR_NAME); } + private String getTaskName(MetricGroup g) { + return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_NAME); + } + private String getJobId(MetricGroup g) { return g.getAllVariables().get(ScopeFormat.SCOPE_JOB_ID); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index b915ab5ba88..3c7c1964a37 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -238,6 +238,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr Output<StreamRecord<OUT>> output) { super.setup(containingTask, config, output); initSourceMetricGroup(); + // Metric "numRecordsIn" & "numBytesIn" is defined as the total number of records/bytes + // read from the external system in FLIP-33, reuse them for task to account for traffic + // with external system + this.metrics.getIOMetricGroup().reuseInputMetricsForTask(); + this.metrics.getIOMetricGroup().reuseBytesInputMetricsForTask(); } @VisibleForTesting diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index f39b9f9cdf1..ea9a60d6133 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -44,11 +44,13 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.UserCodeClassLoader; import javax.annotation.Nullable; @@ -129,6 +131,19 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab } } + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<CommittableMessage<CommT>>> output) { + super.setup(containingTask, config, output); + // Metric "numRecordsOut" & "numBytesOut" is defined as the total number of records/bytes + // written to the external system in FLIP-33, reuse them for task to account for traffic + // with external system + this.metrics.getIOMetricGroup().reuseOutputMetricsForTask(); + this.metrics.getIOMetricGroup().reuseBytesOutputMetricsForTask(); + } + @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java index fff561013e9..54d5734e2e6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.testutils.InMemoryReporter; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -159,6 +160,26 @@ public class SinkMetricsITCase extends TestLogger { .isEqualTo((processedRecordsPerSubtask - 1) * MetricWriter.BASE_SEND_TIME); } assertThat(subtaskWithMetrics, equalTo(numSplits)); + + // Test operator I/O metrics are reused by task metrics + List<TaskMetricGroup> taskMetricGroups = + reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME); + assertThat(taskMetricGroups, hasSize(parallelism)); + + int subtaskWithTaskMetrics = 0; + for (TaskMetricGroup taskMetricGroup : taskMetricGroups) { + // there are only 2 splits assigned; so two groups will not update metrics + if (taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) { + continue; + } + + subtaskWithTaskMetrics++; + assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()) + .isEqualTo(processedRecordsPerSubtask); + assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesOutCounter()) + .isEqualTo(processedRecordsPerSubtask * MetricWriter.RECORD_SIZE_IN_BYTES); + } + assertThat(subtaskWithTaskMetrics, equalTo(numSplits)); } private static class MetricWriter extends TestSink.DefaultSinkWriter<Long> { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java index 4fd5a2e5770..bb227552a3e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.testutils.InMemoryReporter; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -213,6 +214,25 @@ public class SinkV2MetricsITCase extends TestLogger { .isEqualTo((processedRecordsPerSubtask - 1) * MetricWriter.BASE_SEND_TIME); } assertThat(subtaskWithMetrics, equalTo(numSplits)); + + // Test operator I/O metrics are reused by task metrics + List<TaskMetricGroup> taskMetricGroups = + reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME); + + int subtaskWithTaskMetrics = 0; + for (TaskMetricGroup taskMetricGroup : taskMetricGroups) { + // there are only 2 splits assigned; so two groups will not update metrics + if (taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) { + continue; + } + + subtaskWithTaskMetrics++; + assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()) + .isEqualTo(processedRecordsPerSubtask); + assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesOutCounter()) + .isEqualTo(processedRecordsPerSubtask * MetricWriter.RECORD_SIZE_IN_BYTES); + } + assertThat(subtaskWithTaskMetrics, equalTo(numSplits)); } private void assertSinkCommitterMetrics(JobID jobId, Map<String, Long> expected) {