This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLIP-410-update3 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ce1d11933f95b3e66d1316e75d6697aa19ac8776 Author: Weijie Guo <res...@163.com> AuthorDate: Tue Apr 9 18:09:03 2024 +0800 [FLINK-34549][API] Expose MetricGroup via RuntimeContext --- flink-core-api/pom.xml | 7 +++++++ flink-core/pom.xml | 6 ------ .../org/apache/flink/datastream/api/context/RuntimeContext.java | 4 ++++ .../datastream/impl/context/DefaultNonPartitionedContext.java | 6 ++++++ .../flink/datastream/impl/context/DefaultPartitionedContext.java | 6 ++++++ .../flink/datastream/impl/context/DefaultRuntimeContext.java | 9 +++++++++ .../impl/context/DefaultTwoOutputNonPartitionedContext.java | 6 ++++++ 7 files changed, 38 insertions(+), 6 deletions(-) diff --git a/flink-core-api/pom.xml b/flink-core-api/pom.xml index a66ca8b386a..d9301b74921 100644 --- a/flink-core-api/pom.xml +++ b/flink-core-api/pom.xml @@ -40,6 +40,13 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- ================== test dependencies ================== --> <dependency> diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 7c267f76fb0..4521e7464da 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -71,12 +71,6 @@ under the License. <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-core</artifactId> - <version>${project.version}</version> - </dependency> - <!-- ArchUnit test dependencies --> <dependency> diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java index c82929113f0..2b98bc76e27 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java @@ -19,6 +19,7 @@ package org.apache.flink.datastream.api.context; import org.apache.flink.annotation.Experimental; +import org.apache.flink.metrics.MetricGroup; /** * A RuntimeContext contains information about the context in which process functions are executed. @@ -33,4 +34,7 @@ public interface RuntimeContext { /** Get the {@link TaskInfo} of this process function. */ TaskInfo getTaskInfo(); + + /** Get the metric group of this process function. */ + MetricGroup getMetricGroup(); } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java index d2fc17a9463..34e6df8b38f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java @@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.NonPartitionedContext; import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.datastream.api.function.ApplyPartitionFunction; +import org.apache.flink.metrics.MetricGroup; /** The default implementation of {@link NonPartitionedContext}. */ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> { @@ -45,4 +46,9 @@ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext< public TaskInfo getTaskInfo() { return context.getTaskInfo(); } + + @Override + public MetricGroup getMetricGroup() { + return context.getMetricGroup(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java index 2d44a003c97..61fc9501213 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java @@ -23,6 +23,7 @@ import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.RuntimeContext; import org.apache.flink.datastream.api.context.TaskInfo; +import org.apache.flink.metrics.MetricGroup; import java.util.function.Consumer; import java.util.function.Supplier; @@ -64,4 +65,9 @@ public class DefaultPartitionedContext implements PartitionedContext { public ProcessingTimeManager getProcessingTimeManager() { return processingTimeManager; } + + @Override + public MetricGroup getMetricGroup() { + return context.getMetricGroup(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java index d3ede8bdb48..80eacbce4d8 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java @@ -21,6 +21,7 @@ package org.apache.flink.datastream.impl.context; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.RuntimeContext; import org.apache.flink.datastream.api.context.TaskInfo; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; /** The default implementation of {@link RuntimeContext}. */ @@ -29,6 +30,8 @@ public class DefaultRuntimeContext implements RuntimeContext { private final DefaultTaskInfo taskInfo; + private final MetricGroup metricGroup; + public DefaultRuntimeContext( StreamingRuntimeContext operatorContext, int parallelism, @@ -36,6 +39,7 @@ public class DefaultRuntimeContext implements RuntimeContext { String taskName) { this.jobInfo = new DefaultJobInfo(operatorContext); this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName); + this.metricGroup = operatorContext.getMetricGroup(); } @Override @@ -47,4 +51,9 @@ public class DefaultRuntimeContext implements RuntimeContext { public TaskInfo getTaskInfo() { return taskInfo; } + + @Override + public MetricGroup getMetricGroup() { + return metricGroup; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java index 3ae9d4cab2d..9b604379bbd 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java @@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction; +import org.apache.flink.metrics.MetricGroup; /** The default implementation of {@link TwoOutputNonPartitionedContext}. */ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> @@ -47,4 +48,9 @@ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> public TaskInfo getTaskInfo() { return context.getTaskInfo(); } + + @Override + public MetricGroup getMetricGroup() { + return context.getMetricGroup(); + } }