This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new ae487b1ac16 [FLINK-31557][Metrics] Add separate executor for view updater ae487b1ac16 is described below commit ae487b1ac16f710194a6b90bd61475a84919f0d1 Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Thu Mar 30 21:56:40 2023 +0800 [FLINK-31557][Metrics] Add separate executor for view updater --- .../content.zh/docs/deployment/metric_reporters.md | 1 + docs/content/docs/deployment/metric_reporters.md | 3 +- .../apache/flink/metrics/reporter/Scheduled.java | 3 +- .../flink/runtime/metrics/MetricRegistryImpl.java | 37 +++++++++++++++++----- 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/docs/content.zh/docs/deployment/metric_reporters.md b/docs/content.zh/docs/deployment/metric_reporters.md index 2c4cdb9b033..cbbbcf2d14a 100644 --- a/docs/content.zh/docs/deployment/metric_reporters.md +++ b/docs/content.zh/docs/deployment/metric_reporters.md @@ -59,6 +59,7 @@ metrics.reporter.my_other_reporter.port: 10000 **注意**:Flink 在启动时必须能访问到发送器所属的 jar 包,发送器会被加载为 [plugins]({{< ref "docs/deployment/filesystems/plugins" >}}),Flink 自带的发送器(文档中已经列出的发送器)无需做其他配置,开箱即用。 你可以实现 `org.apache.flink.metrics.reporter.MetricReporter` 接口来自定义发送器,并实现 `Scheduled` 接口让发送器周期性地将运行时指标发送出去。 +需要注意 `report()` 方法不应该阻塞太长的时间,所有用时很长的操作应该异步执行。 另外也可以实现 `MetricReporterFactory` 接口,让发送器作为插件被 Flink 导入。 <a name="identifiers-vs-tags"></a> diff --git a/docs/content/docs/deployment/metric_reporters.md b/docs/content/docs/deployment/metric_reporters.md index e8b3ccb2e20..7409630fb90 100644 --- a/docs/content/docs/deployment/metric_reporters.md +++ b/docs/content/docs/deployment/metric_reporters.md @@ -59,7 +59,8 @@ metrics.reporter.my_other_reporter.port: 10000 All reporters documented on this page are available by default. You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface. -If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well. +If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well. +Be careful that `report()` method must not block for a significant amount of time, and any reporter needing more time should instead run the operation asynchronously. By additionally implementing a `MetricReporterFactory` your reporter can also be loaded as a plugin. ## Identifiers vs. tags diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java index a4e5edbffad..2dcf6fcd673 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java @@ -26,7 +26,8 @@ public interface Scheduled { /** * Report the current measurements. This method is called periodically by the metrics registry - * that uses the reporter. + * that uses the reporter. This method must not block for a significant amount of time, any + * reporter needing more time should instead run the operation asynchronously. */ void report(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index ac0e4df3cf9..d05eec313f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -74,7 +74,8 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { private final Object lock = new Object(); private final List<ReporterAndSettings> reporters; - private final ScheduledExecutorService executor; + private final ScheduledExecutorService reporterScheduledExecutor; + private final ScheduledExecutorService viewUpdaterScheduledExecutor; private final ScopeFormats scopeFormats; private final char globalDelimiter; @@ -102,7 +103,9 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { config, reporterConfigurations, Executors.newSingleThreadScheduledExecutor( - new ExecutorThreadFactory("Flink-MetricRegistry"))); + new ExecutorThreadFactory("Flink-Metric-Reporter")), + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-Metric-View-Updater"))); } @VisibleForTesting @@ -110,6 +113,14 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations, ScheduledExecutorService scheduledExecutor) { + this(config, reporterConfigurations, scheduledExecutor, scheduledExecutor); + } + + MetricRegistryImpl( + MetricRegistryConfiguration config, + Collection<ReporterSetup> reporterConfigurations, + ScheduledExecutorService reporterScheduledExecutor, + ScheduledExecutorService viewUpdaterScheduledExecutor) { this.maximumFramesize = config.getQueryServiceMessageSizeLimit(); this.scopeFormats = config.getScopeFormats(); this.globalDelimiter = config.getDelimiter(); @@ -119,7 +130,8 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { // second, instantiate any custom configured reporters this.reporters = new ArrayList<>(4); - this.executor = scheduledExecutor; + this.reporterScheduledExecutor = reporterScheduledExecutor; + this.viewUpdaterScheduledExecutor = viewUpdaterScheduledExecutor; this.queryService = null; this.metricQueryServiceRpcService = null; @@ -145,7 +157,7 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { namedReporter, className); - executor.scheduleWithFixedDelay( + reporterScheduledExecutor.scheduleWithFixedDelay( new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period.toMillis(), period.toMillis(), @@ -347,11 +359,20 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { throwable))); } - final CompletableFuture<Void> executorShutdownFuture = + final CompletableFuture<Void> reporterExecutorShutdownFuture = + ExecutorUtils.nonBlockingShutdown( + gracePeriod.toMilliseconds(), + TimeUnit.MILLISECONDS, + reporterScheduledExecutor); + terminationFutures.add(reporterExecutorShutdownFuture); + + final CompletableFuture<Void> viewUpdaterExecutorShutdownFuture = ExecutorUtils.nonBlockingShutdown( - gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS, executor); + gracePeriod.toMilliseconds(), + TimeUnit.MILLISECONDS, + viewUpdaterScheduledExecutor); - terminationFutures.add(executorShutdownFuture); + terminationFutures.add(viewUpdaterExecutorShutdownFuture); FutureUtils.completeAll(terminationFutures) .whenComplete( @@ -403,7 +424,7 @@ public class MetricRegistryImpl implements MetricRegistry, AutoCloseableAsync { try { if (metric instanceof View) { if (viewUpdater == null) { - viewUpdater = new ViewUpdater(executor); + viewUpdater = new ViewUpdater(viewUpdaterScheduledExecutor); } viewUpdater.notifyOfAddedView((View) metric); }