This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff93e2aaa0e958a974839a317db09a3a4bb7197e
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Apr 9 18:09:03 2024 +0800

    [FLINK-34549][API] Expose MetricGroup via RuntimeContext
---
 flink-core-api/pom.xml                                   |  7 +++++++
 flink-core/pom.xml                                       |  6 ------
 .../flink/datastream/api/context/RuntimeContext.java     |  4 ++++
 .../impl/context/DefaultNonPartitionedContext.java       |  6 ++++++
 .../impl/context/DefaultPartitionedContext.java          |  6 ++++++
 .../datastream/impl/context/DefaultRuntimeContext.java   | 16 +++++++++++++++-
 .../context/DefaultTwoOutputNonPartitionedContext.java   |  6 ++++++
 .../flink/datastream/impl/operators/ProcessOperator.java |  3 ++-
 .../impl/operators/TwoInputBroadcastProcessOperator.java |  3 ++-
 .../operators/TwoInputNonBroadcastProcessOperator.java   |  3 ++-
 .../impl/operators/TwoOutputProcessOperator.java         |  3 ++-
 11 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/flink-core-api/pom.xml b/flink-core-api/pom.xml
index a66ca8b386a..d9301b74921 100644
--- a/flink-core-api/pom.xml
+++ b/flink-core-api/pom.xml
@@ -40,6 +40,13 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+
                <!-- ================== test dependencies ================== -->
 
                <dependency>
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 7c267f76fb0..4521e7464da 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -71,12 +71,6 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-core</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
                <!-- ArchUnit test dependencies -->
 
                <dependency>
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
index c82929113f0..2b98bc76e27 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.api.context;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.metrics.MetricGroup;
 
 /**
  * A RuntimeContext contains information about the context in which process 
functions are executed.
@@ -33,4 +34,7 @@ public interface RuntimeContext {
 
     /** Get the {@link TaskInfo} of this process function. */
     TaskInfo getTaskInfo();
+
+    /** Get the metric group of this process function. */
+    MetricGroup getMetricGroup();
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
index d2fc17a9463..34e6df8b38f 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
+import org.apache.flink.metrics.MetricGroup;
 
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext<OUT> implements 
NonPartitionedContext<OUT> {
@@ -45,4 +46,9 @@ public class DefaultNonPartitionedContext<OUT> implements 
NonPartitionedContext<
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return context.getMetricGroup();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
index 2d44a003c97..61fc9501213 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.datastream.api.context.PartitionedContext;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
+import org.apache.flink.metrics.MetricGroup;
 
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -64,4 +65,9 @@ public class DefaultPartitionedContext implements 
PartitionedContext {
     public ProcessingTimeManager getProcessingTimeManager() {
         return processingTimeManager;
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return context.getMetricGroup();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
index 2e1820022d7..54d76d7a8d3 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.datastream.impl.context;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.JobType;
 
 /** The default implementation of {@link RuntimeContext}. */
@@ -29,10 +30,18 @@ public class DefaultRuntimeContext implements 
RuntimeContext {
 
     private final DefaultTaskInfo taskInfo;
 
+    private final MetricGroup metricGroup;
+
     public DefaultRuntimeContext(
-            String jobName, JobType jobType, int parallelism, int 
maxParallelism, String taskName) {
+            String jobName,
+            JobType jobType,
+            int parallelism,
+            int maxParallelism,
+            String taskName,
+            MetricGroup metricGroup) {
         this.jobInfo = new DefaultJobInfo(jobName, jobType);
         this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, 
taskName);
+        this.metricGroup = metricGroup;
     }
 
     @Override
@@ -44,4 +53,9 @@ public class DefaultRuntimeContext implements RuntimeContext {
     public TaskInfo getTaskInfo() {
         return taskInfo;
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
index 3ae9d4cab2d..9b604379bbd 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import 
org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
+import org.apache.flink.metrics.MetricGroup;
 
 /** The default implementation of {@link TwoOutputNonPartitionedContext}. */
 public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
@@ -47,4 +48,9 @@ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return context.getMetricGroup();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 91f9821fea2..be0fa04dd50 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -64,7 +64,8 @@ public class ProcessOperator<IN, OUT>
                         operatorContext.getJobType(),
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
-                        taskInfo.getTaskName());
+                        taskInfo.getTaskName(),
+                        operatorContext.getMetricGroup());
         partitionedContext =
                 new DefaultPartitionedContext(
                         context, this::currentKey, this::setCurrentKey, 
getProcessingTimeManager());
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index b8cba8fd17d..98a76edd5e0 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -68,7 +68,8 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
                         operatorContext.getJobType(),
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
-                        taskInfo.getTaskName());
+                        taskInfo.getTaskName(),
+                        operatorContext.getMetricGroup());
         this.partitionedContext =
                 new DefaultPartitionedContext(
                         context, this::currentKey, this::setCurrentKey, 
getProcessingTimeManager());
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 19dd7859f40..982edcaa4dc 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -68,7 +68,8 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
                         operatorContext.getJobType(),
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
-                        taskInfo.getTaskName());
+                        taskInfo.getTaskName(),
+                        operatorContext.getMetricGroup());
         this.partitionedContext =
                 new DefaultPartitionedContext(
                         context, this::currentKey, this::setCurrentKey, 
getProcessingTimeManager());
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index d12fc79c957..4f58db50ad6 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -79,7 +79,8 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
                         operatorContext.getJobType(),
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
-                        taskInfo.getTaskName());
+                        taskInfo.getTaskName(),
+                        operatorContext.getMetricGroup());
         this.partitionedContext =
                 new DefaultPartitionedContext(
                         context, this::currentKey, this::setCurrentKey, 
getProcessingTimeManager());

Reply via email to