This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0309f13e8af [FLINK-33681][Runtime/Metrics] Reuse input/output metrics 
of SourceOperator/SinkWriterOperator for task (#23998)
0309f13e8af is described below

commit 0309f13e8af62f9b523e227a3a66ff59e838a1b4
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Thu Aug 22 06:31:44 2024 +0800

    [FLINK-33681][Runtime/Metrics] Reuse input/output metrics of 
SourceOperator/SinkWriterOperator for task (#23998)
---
 .../base/source/reader/SourceMetricsITCase.java    | 21 +++++++++++++++++++++
 .../groups/InternalOperatorIOMetricGroup.java      | 12 ++++++++++++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 17 +++++++++++++----
 .../flink/runtime/testutils/InMemoryReporter.java  | 22 +++++++++++++++++++++-
 .../streaming/api/operators/SourceOperator.java    |  5 +++++
 .../runtime/operators/sink/SinkWriterOperator.java | 15 +++++++++++++++
 .../test/streaming/runtime/SinkMetricsITCase.java  | 21 +++++++++++++++++++++
 .../streaming/runtime/SinkV2MetricsITCase.java     | 20 ++++++++++++++++++++
 8 files changed, 128 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index b7631ef0311..3c315f694b9 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -223,6 +224,26 @@ public class SourceMetricsITCase extends TestLogger {
             
assertThatGauge(metrics.get(MetricNames.SOURCE_IDLE_TIME)).isEqualTo(0L);
         }
         assertThat(subtaskWithMetrics).isEqualTo(numSplits);
+
+        // Test that operator I/O metrics are reused by task metrics
+        List<TaskMetricGroup> taskMetricGroups =
+                reporter.findTaskMetricGroups(jobId, "MetricTestingSource");
+        assertThat(taskMetricGroups).hasSize(parallelism);
+
+        int subtaskWithTaskMetrics = 0;
+        for (TaskMetricGroup taskMetricGroup : taskMetricGroups) {
+            // only 2 splits are assigned, so two groups will not update 
metrics
+            if 
(taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0) {
+                continue;
+            }
+
+            subtaskWithTaskMetrics++;
+            
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter())
+                    .isEqualTo(processedRecordsPerSubtask);
+            
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesInCounter())
+                    .isEqualTo(processedRecordsPerSubtask * 
MockRecordEmitter.RECORD_SIZE_IN_BYTES);
+        }
+        assertThat(subtaskWithTaskMetrics).isEqualTo(numSplits);
     }
 
     private static class LaggingTimestampAssigner
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
index 31cf560ce78..0405b2d6e07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
@@ -97,4 +97,16 @@ public class InternalOperatorIOMetricGroup extends 
ProxyMetricGroup<InternalOper
         TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
         taskIO.reuseRecordsOutputCounter(this.numRecordsOut);
     }
+
+    /** Causes the containing task to use this operator's input bytes counter. 
*/
+    public void reuseBytesInputMetricsForTask() {
+        TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
+        taskIO.reuseBytesInputCounter(this.numBytesIn);
+    }
+
+    /** Causes the containing task to use this operator's output bytes counter. 
*/
+    public void reuseBytesOutputMetricsForTask() {
+        TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
+        taskIO.reuseBytesOutputCounter(this.numBytesOut);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 12ba23cea21..5a034f6b4e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -50,8 +50,8 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
 
     private final Clock clock;
 
-    private final Counter numBytesIn;
-    private final Counter numBytesOut;
+    private final SumCounter numBytesIn;
+    private final SumCounter numBytesOut;
     private final SumCounter numRecordsIn;
     private final SumCounter numRecordsOut;
     private final Counter numBuffersOut;
@@ -95,8 +95,8 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     public TaskIOMetricGroup(TaskMetricGroup parent, Clock clock) {
         super(parent);
         this.clock = clock;
-        this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN);
-        this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT);
+        this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new 
SumCounter());
+        this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new 
SumCounter());
         this.numBytesInRate = meter(MetricNames.IO_NUM_BYTES_IN_RATE, new 
MeterView(numBytesIn));
         this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new 
MeterView(numBytesOut));
 
@@ -325,6 +325,15 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     // 
============================================================================================
     // Metric Reuse
     // 
============================================================================================
+
+    public void reuseBytesInputCounter(Counter numBytesInCounter) {
+        this.numBytesIn.addCounter(numBytesInCounter);
+    }
+
+    public void reuseBytesOutputCounter(Counter numBytesOutCounter) {
+        this.numBytesOut.addCounter(numBytesOutCounter);
+    }
+
     public void reuseRecordsInputCounter(Counter numRecordsInCounter) {
         this.numRecordsIn.addCounter(numRecordsInCounter);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
index 4abb4558d4b..1564bb44f02 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import org.slf4j.Logger;
@@ -167,6 +168,21 @@ public class InMemoryReporter implements MetricReporter {
         }
     }
 
+    public List<TaskMetricGroup> findTaskMetricGroups(JobID jobId, String 
operatorPattern) {
+        Pattern pattern = Pattern.compile(operatorPattern);
+        synchronized (this) {
+            return metrics.keySet().stream()
+                    .filter(
+                            g ->
+                                    g instanceof TaskMetricGroup
+                                            && 
pattern.matcher(getTaskName(g)).find()
+                                            && 
getJobId(g).equals(jobId.toString()))
+                    .map(TaskMetricGroup.class::cast)
+                    .sorted(Comparator.comparing(this::getSubtaskId))
+                    .collect(Collectors.toList());
+        }
+    }
+
     public List<Tuple3<MetricGroup, String, Metric>> findJobMetricGroups(
             JobID jobId, String metricPattern) {
         Pattern pattern = Pattern.compile(metricPattern);
@@ -189,7 +205,7 @@ public class InMemoryReporter implements MetricReporter {
         }
     }
 
-    private String getSubtaskId(OperatorMetricGroup g) {
+    private String getSubtaskId(MetricGroup g) {
         return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_SUBTASK_INDEX);
     }
 
@@ -197,6 +213,10 @@ public class InMemoryReporter implements MetricReporter {
         return g.getAllVariables().get(ScopeFormat.SCOPE_OPERATOR_NAME);
     }
 
+    private String getTaskName(MetricGroup g) {
+        return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_NAME);
+    }
+
     private String getJobId(MetricGroup g) {
         return g.getAllVariables().get(ScopeFormat.SCOPE_JOB_ID);
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b915ab5ba88..3c7c1964a37 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -238,6 +238,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             Output<StreamRecord<OUT>> output) {
         super.setup(containingTask, config, output);
         initSourceMetricGroup();
+        // Metrics "numRecordsIn" & "numBytesIn" are defined in FLIP-33 as the total number 
of records/bytes
+        // read from the external system; reuse them for the task so that it accounts for 
traffic
+        // with the external system.
+        this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
+        this.metrics.getIOMetricGroup().reuseBytesInputMetricsForTask();
     }
 
     @VisibleForTesting
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index f39b9f9cdf1..ea9a60d6133 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -44,11 +44,13 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.UserCodeClassLoader;
 
 import javax.annotation.Nullable;
@@ -129,6 +131,19 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
     }
 
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<CommittableMessage<CommT>>> output) {
+        super.setup(containingTask, config, output);
+        // Metrics "numRecordsOut" & "numBytesOut" are defined in FLIP-33 as the total 
number of records/bytes
+        // written to the external system; reuse them for the task so that it accounts for 
traffic
+        // with the external system.
+        this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
+        this.metrics.getIOMetricGroup().reuseBytesOutputMetricsForTask();
+    }
+
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index fff561013e9..54d5734e2e6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -159,6 +160,26 @@ public class SinkMetricsITCase extends TestLogger {
                     .isEqualTo((processedRecordsPerSubtask - 1) * 
MetricWriter.BASE_SEND_TIME);
         }
         assertThat(subtaskWithMetrics, equalTo(numSplits));
+
+        // Test that operator I/O metrics are reused by task metrics
+        List<TaskMetricGroup> taskMetricGroups =
+                reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
+        assertThat(taskMetricGroups, hasSize(parallelism));
+
+        int subtaskWithTaskMetrics = 0;
+        for (TaskMetricGroup taskMetricGroup : taskMetricGroups) {
+            // only 2 splits are assigned, so two groups will not update 
metrics
+            if 
(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+                continue;
+            }
+
+            subtaskWithTaskMetrics++;
+            
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter())
+                    .isEqualTo(processedRecordsPerSubtask);
+            
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesOutCounter())
+                    .isEqualTo(processedRecordsPerSubtask * 
MetricWriter.RECORD_SIZE_IN_BYTES);
+        }
+        assertThat(subtaskWithTaskMetrics, equalTo(numSplits));
     }
 
     private static class MetricWriter extends TestSink.DefaultSinkWriter<Long> 
{
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
index 4fd5a2e5770..bb227552a3e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -213,6 +214,25 @@ public class SinkV2MetricsITCase extends TestLogger {
                     .isEqualTo((processedRecordsPerSubtask - 1) * 
MetricWriter.BASE_SEND_TIME);
         }
         assertThat(subtaskWithMetrics, equalTo(numSplits));
+
+        // Test that operator I/O metrics are reused by task metrics
+        List<TaskMetricGroup> taskMetricGroups =
+                reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
+
+        int subtaskWithTaskMetrics = 0;
+        for (TaskMetricGroup taskMetricGroup : taskMetricGroups) {
+            // only 2 splits are assigned, so two groups will not update 
metrics
+            if 
(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+                continue;
+            }
+
+            subtaskWithTaskMetrics++;
+            
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter())
+                    .isEqualTo(processedRecordsPerSubtask);
+            
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesOutCounter())
+                    .isEqualTo(processedRecordsPerSubtask * 
MetricWriter.RECORD_SIZE_IN_BYTES);
+        }
+        assertThat(subtaskWithTaskMetrics, equalTo(numSplits));
     }
 
     private void assertSinkCommitterMetrics(JobID jobId, Map<String, Long> 
expected) {

Reply via email to