This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLIP-410-update3
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce1d11933f95b3e66d1316e75d6697aa19ac8776
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Apr 9 18:09:03 2024 +0800

    [FLINK-34549][API] Expose MetricGroup via RuntimeContext
---
 flink-core-api/pom.xml                                           | 7 +++++++
 flink-core/pom.xml                                               | 6 ------
 .../org/apache/flink/datastream/api/context/RuntimeContext.java  | 4 ++++
 .../datastream/impl/context/DefaultNonPartitionedContext.java    | 6 ++++++
 .../flink/datastream/impl/context/DefaultPartitionedContext.java | 6 ++++++
 .../flink/datastream/impl/context/DefaultRuntimeContext.java     | 9 +++++++++
 .../impl/context/DefaultTwoOutputNonPartitionedContext.java      | 6 ++++++
 7 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/flink-core-api/pom.xml b/flink-core-api/pom.xml
index a66ca8b386a..d9301b74921 100644
--- a/flink-core-api/pom.xml
+++ b/flink-core-api/pom.xml
@@ -40,6 +40,13 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+
                <!-- ================== test dependencies ================== -->
 
                <dependency>
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 7c267f76fb0..4521e7464da 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -71,12 +71,6 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-core</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
                <!-- ArchUnit test dependencies -->
 
                <dependency>
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
index c82929113f0..2b98bc76e27 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.api.context;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.metrics.MetricGroup;
 
 /**
  * A RuntimeContext contains information about the context in which process functions are executed.
@@ -33,4 +34,7 @@ public interface RuntimeContext {
 
     /** Get the {@link TaskInfo} of this process function. */
     TaskInfo getTaskInfo();
+
+    /** Get the metric group of this process function. */
+    MetricGroup getMetricGroup();
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
index d2fc17a9463..34e6df8b38f 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
+import org.apache.flink.metrics.MetricGroup;
 
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
@@ -45,4 +46,9 @@ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return context.getMetricGroup();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
index 2d44a003c97..61fc9501213 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.datastream.api.context.PartitionedContext;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
+import org.apache.flink.metrics.MetricGroup;
 
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -64,4 +65,9 @@ public class DefaultPartitionedContext implements PartitionedContext {
     public ProcessingTimeManager getProcessingTimeManager() {
         return processingTimeManager;
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return context.getMetricGroup();
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
index d3ede8bdb48..80eacbce4d8 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.datastream.impl.context;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 /** The default implementation of {@link RuntimeContext}. */
@@ -29,6 +30,8 @@ public class DefaultRuntimeContext implements RuntimeContext {
 
     private final DefaultTaskInfo taskInfo;
 
+    private final MetricGroup metricGroup;
+
     public DefaultRuntimeContext(
             StreamingRuntimeContext operatorContext,
             int parallelism,
@@ -36,6 +39,7 @@ public class DefaultRuntimeContext implements RuntimeContext {
             String taskName) {
         this.jobInfo = new DefaultJobInfo(operatorContext);
         this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
+        this.metricGroup = operatorContext.getMetricGroup();
     }
 
     @Override
@@ -47,4 +51,9 @@ public class DefaultRuntimeContext implements RuntimeContext {
     public TaskInfo getTaskInfo() {
         return taskInfo;
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
index 3ae9d4cab2d..9b604379bbd 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
+import org.apache.flink.metrics.MetricGroup;
 
 /** The default implementation of {@link TwoOutputNonPartitionedContext}. */
 public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
@@ -47,4 +48,9 @@ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return context.getMetricGroup();
+    }
 }

Reply via email to