This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff93e2aaa0e958a974839a317db09a3a4bb7197e Author: Weijie Guo <res...@163.com> AuthorDate: Tue Apr 9 18:09:03 2024 +0800 [FLINK-34549][API] Expose MetricGroup via RuntimeContext --- flink-core-api/pom.xml | 7 +++++++ flink-core/pom.xml | 6 ------ .../flink/datastream/api/context/RuntimeContext.java | 4 ++++ .../impl/context/DefaultNonPartitionedContext.java | 6 ++++++ .../impl/context/DefaultPartitionedContext.java | 6 ++++++ .../datastream/impl/context/DefaultRuntimeContext.java | 16 +++++++++++++++- .../context/DefaultTwoOutputNonPartitionedContext.java | 6 ++++++ .../flink/datastream/impl/operators/ProcessOperator.java | 3 ++- .../impl/operators/TwoInputBroadcastProcessOperator.java | 3 ++- .../operators/TwoInputNonBroadcastProcessOperator.java | 3 ++- .../impl/operators/TwoOutputProcessOperator.java | 3 ++- 11 files changed, 52 insertions(+), 11 deletions(-) diff --git a/flink-core-api/pom.xml b/flink-core-api/pom.xml index a66ca8b386a..d9301b74921 100644 --- a/flink-core-api/pom.xml +++ b/flink-core-api/pom.xml @@ -40,6 +40,13 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- ================== test dependencies ================== --> <dependency> diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 7c267f76fb0..4521e7464da 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -71,12 +71,6 @@ under the License. 
<version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-core</artifactId> - <version>${project.version}</version> - </dependency> - <!-- ArchUnit test dependencies --> <dependency> diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java index c82929113f0..2b98bc76e27 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java @@ -19,6 +19,7 @@ package org.apache.flink.datastream.api.context; import org.apache.flink.annotation.Experimental; +import org.apache.flink.metrics.MetricGroup; /** * A RuntimeContext contains information about the context in which process functions are executed. @@ -33,4 +34,7 @@ public interface RuntimeContext { /** Get the {@link TaskInfo} of this process function. */ TaskInfo getTaskInfo(); + + /** Get the metric group of this process function. 
*/ + MetricGroup getMetricGroup(); } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java index d2fc17a9463..34e6df8b38f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java @@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.NonPartitionedContext; import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.datastream.api.function.ApplyPartitionFunction; +import org.apache.flink.metrics.MetricGroup; /** The default implementation of {@link NonPartitionedContext}. */ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> { @@ -45,4 +46,9 @@ public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext< public TaskInfo getTaskInfo() { return context.getTaskInfo(); } + + @Override + public MetricGroup getMetricGroup() { + return context.getMetricGroup(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java index 2d44a003c97..61fc9501213 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java @@ -23,6 +23,7 @@ import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.RuntimeContext; import 
org.apache.flink.datastream.api.context.TaskInfo; +import org.apache.flink.metrics.MetricGroup; import java.util.function.Consumer; import java.util.function.Supplier; @@ -64,4 +65,9 @@ public class DefaultPartitionedContext implements PartitionedContext { public ProcessingTimeManager getProcessingTimeManager() { return processingTimeManager; } + + @Override + public MetricGroup getMetricGroup() { + return context.getMetricGroup(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java index 2e1820022d7..54d76d7a8d3 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultRuntimeContext.java @@ -21,6 +21,7 @@ package org.apache.flink.datastream.impl.context; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.RuntimeContext; import org.apache.flink.datastream.api.context.TaskInfo; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.JobType; /** The default implementation of {@link RuntimeContext}. 
*/ @@ -29,10 +30,18 @@ public class DefaultRuntimeContext implements RuntimeContext { private final DefaultTaskInfo taskInfo; + private final MetricGroup metricGroup; + public DefaultRuntimeContext( - String jobName, JobType jobType, int parallelism, int maxParallelism, String taskName) { + String jobName, + JobType jobType, + int parallelism, + int maxParallelism, + String taskName, + MetricGroup metricGroup) { this.jobInfo = new DefaultJobInfo(jobName, jobType); this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName); + this.metricGroup = metricGroup; } @Override @@ -44,4 +53,9 @@ public class DefaultRuntimeContext implements RuntimeContext { public TaskInfo getTaskInfo() { return taskInfo; } + + @Override + public MetricGroup getMetricGroup() { + return metricGroup; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java index 3ae9d4cab2d..9b604379bbd 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java @@ -22,6 +22,7 @@ import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction; +import org.apache.flink.metrics.MetricGroup; /** The default implementation of {@link TwoOutputNonPartitionedContext}. 
*/ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> @@ -47,4 +48,9 @@ public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> public TaskInfo getTaskInfo() { return context.getTaskInfo(); } + + @Override + public MetricGroup getMetricGroup() { + return context.getMetricGroup(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 91f9821fea2..be0fa04dd50 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -64,7 +64,8 @@ public class ProcessOperator<IN, OUT> operatorContext.getJobType(), taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), - taskInfo.getTaskName()); + taskInfo.getTaskName(), + operatorContext.getMetricGroup()); partitionedContext = new DefaultPartitionedContext( context, this::currentKey, this::setCurrentKey, getProcessingTimeManager()); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index b8cba8fd17d..98a76edd5e0 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -68,7 +68,8 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT> operatorContext.getJobType(), taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), - taskInfo.getTaskName()); + taskInfo.getTaskName(), + operatorContext.getMetricGroup()); this.partitionedContext = new DefaultPartitionedContext( context, 
this::currentKey, this::setCurrentKey, getProcessingTimeManager()); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index 19dd7859f40..982edcaa4dc 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -68,7 +68,8 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT> operatorContext.getJobType(), taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), - taskInfo.getTaskName()); + taskInfo.getTaskName(), + operatorContext.getMetricGroup()); this.partitionedContext = new DefaultPartitionedContext( context, this::currentKey, this::setCurrentKey, getProcessingTimeManager()); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index d12fc79c957..4f58db50ad6 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -79,7 +79,8 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE> operatorContext.getJobType(), taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), - taskInfo.getTaskName()); + taskInfo.getTaskName(), + operatorContext.getMetricGroup()); this.partitionedContext = new DefaultPartitionedContext( context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());