This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new ae487b1ac16 [FLINK-31557][Metrics] Add separate executor for view updater
ae487b1ac16 is described below

commit ae487b1ac16f710194a6b90bd61475a84919f0d1
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Thu Mar 30 21:56:40 2023 +0800

    [FLINK-31557][Metrics] Add separate executor for view updater
---
 .../content.zh/docs/deployment/metric_reporters.md |  1 +
 docs/content/docs/deployment/metric_reporters.md   |  3 +-
 .../apache/flink/metrics/reporter/Scheduled.java   |  3 +-
 .../flink/runtime/metrics/MetricRegistryImpl.java  | 37 +++++++++++++++++-----
 4 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/docs/content.zh/docs/deployment/metric_reporters.md 
b/docs/content.zh/docs/deployment/metric_reporters.md
index 2c4cdb9b033..cbbbcf2d14a 100644
--- a/docs/content.zh/docs/deployment/metric_reporters.md
+++ b/docs/content.zh/docs/deployment/metric_reporters.md
@@ -59,6 +59,7 @@ metrics.reporter.my_other_reporter.port: 10000
 **注意**:Flink 在启动时必须能访问到发送器所属的 jar 包,发送器会被加载为 [plugins]({{< ref 
"docs/deployment/filesystems/plugins" >}}),Flink 
自带的发送器(文档中已经列出的发送器)无需做其他配置,开箱即用。
 
 你可以实现 `org.apache.flink.metrics.reporter.MetricReporter` 接口来自定义发送器,并实现 
`Scheduled` 接口让发送器周期性地将运行时指标发送出去。
+需要注意 `report()` 方法不应该阻塞太长的时间,所有用时很长的操作应该异步执行。
 另外也可以实现 `MetricReporterFactory` 接口,让发送器作为插件被 Flink 导入。
 
 <a name="identifiers-vs-tags"></a>
diff --git a/docs/content/docs/deployment/metric_reporters.md 
b/docs/content/docs/deployment/metric_reporters.md
index e8b3ccb2e20..7409630fb90 100644
--- a/docs/content/docs/deployment/metric_reporters.md
+++ b/docs/content/docs/deployment/metric_reporters.md
@@ -59,7 +59,8 @@ metrics.reporter.my_other_reporter.port: 10000
  All reporters documented on this page are available by default.
 
 You can write your own `Reporter` by implementing the 
`org.apache.flink.metrics.reporter.MetricReporter` interface.
-If the Reporter should send out reports regularly you have to implement the 
`Scheduled` interface as well.
+If the Reporter should send out reports regularly you have to implement the 
`Scheduled` interface as well. 
+Note that the `report()` method must not block for a significant amount of 
time; any reporter needing more time should instead run the operation 
asynchronously.
 By additionally implementing a `MetricReporterFactory` your reporter can also 
be loaded as a plugin.
 
 ## Identifiers vs. tags
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
index a4e5edbffad..2dcf6fcd673 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
@@ -26,7 +26,8 @@ public interface Scheduled {
 
     /**
      * Report the current measurements. This method is called periodically by 
the metrics registry
-     * that uses the reporter.
+     * that uses the reporter. This method must not block for a significant 
amount of time; any
+     * reporter needing more time should instead run the operation 
asynchronously.
      */
     void report();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index ac0e4df3cf9..d05eec313f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -74,7 +74,8 @@ public class MetricRegistryImpl implements MetricRegistry, 
AutoCloseableAsync {
     private final Object lock = new Object();
 
     private final List<ReporterAndSettings> reporters;
-    private final ScheduledExecutorService executor;
+    private final ScheduledExecutorService reporterScheduledExecutor;
+    private final ScheduledExecutorService viewUpdaterScheduledExecutor;
 
     private final ScopeFormats scopeFormats;
     private final char globalDelimiter;
@@ -102,7 +103,9 @@ public class MetricRegistryImpl implements MetricRegistry, 
AutoCloseableAsync {
                 config,
                 reporterConfigurations,
                 Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory("Flink-MetricRegistry")));
+                        new ExecutorThreadFactory("Flink-Metric-Reporter")),
+                Executors.newSingleThreadScheduledExecutor(
+                        new 
ExecutorThreadFactory("Flink-Metric-View-Updater")));
     }
 
     @VisibleForTesting
@@ -110,6 +113,14 @@ public class MetricRegistryImpl implements MetricRegistry, 
AutoCloseableAsync {
             MetricRegistryConfiguration config,
             Collection<ReporterSetup> reporterConfigurations,
             ScheduledExecutorService scheduledExecutor) {
+        this(config, reporterConfigurations, scheduledExecutor, 
scheduledExecutor);
+    }
+
+    MetricRegistryImpl(
+            MetricRegistryConfiguration config,
+            Collection<ReporterSetup> reporterConfigurations,
+            ScheduledExecutorService reporterScheduledExecutor,
+            ScheduledExecutorService viewUpdaterScheduledExecutor) {
         this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
         this.scopeFormats = config.getScopeFormats();
         this.globalDelimiter = config.getDelimiter();
@@ -119,7 +130,8 @@ public class MetricRegistryImpl implements MetricRegistry, 
AutoCloseableAsync {
         // second, instantiate any custom configured reporters
         this.reporters = new ArrayList<>(4);
 
-        this.executor = scheduledExecutor;
+        this.reporterScheduledExecutor = reporterScheduledExecutor;
+        this.viewUpdaterScheduledExecutor = viewUpdaterScheduledExecutor;
 
         this.queryService = null;
         this.metricQueryServiceRpcService = null;
@@ -145,7 +157,7 @@ public class MetricRegistryImpl implements MetricRegistry, 
AutoCloseableAsync {
                                 namedReporter,
                                 className);
 
-                        executor.scheduleWithFixedDelay(
+                        reporterScheduledExecutor.scheduleWithFixedDelay(
                                 new 
MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance),
                                 period.toMillis(),
                                 period.toMillis(),
@@ -347,11 +359,20 @@ public class MetricRegistryImpl implements 
MetricRegistry, AutoCloseableAsync {
                                             throwable)));
                 }
 
-                final CompletableFuture<Void> executorShutdownFuture =
+                final CompletableFuture<Void> reporterExecutorShutdownFuture =
+                        ExecutorUtils.nonBlockingShutdown(
+                                gracePeriod.toMilliseconds(),
+                                TimeUnit.MILLISECONDS,
+                                reporterScheduledExecutor);
+                terminationFutures.add(reporterExecutorShutdownFuture);
+
+                final CompletableFuture<Void> 
viewUpdaterExecutorShutdownFuture =
                         ExecutorUtils.nonBlockingShutdown(
-                                gracePeriod.toMilliseconds(), 
TimeUnit.MILLISECONDS, executor);
+                                gracePeriod.toMilliseconds(),
+                                TimeUnit.MILLISECONDS,
+                                viewUpdaterScheduledExecutor);
 
-                terminationFutures.add(executorShutdownFuture);
+                terminationFutures.add(viewUpdaterExecutorShutdownFuture);
 
                 FutureUtils.completeAll(terminationFutures)
                         .whenComplete(
@@ -403,7 +424,7 @@ public class MetricRegistryImpl implements MetricRegistry, 
AutoCloseableAsync {
                 try {
                     if (metric instanceof View) {
                         if (viewUpdater == null) {
-                            viewUpdater = new ViewUpdater(executor);
+                            viewUpdater = new 
ViewUpdater(viewUpdaterScheduledExecutor);
                         }
                         viewUpdater.notifyOfAddedView((View) metric);
                     }

Reply via email to