zentol commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386479537
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ########## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List<ReporterSetup> fromConfiguration(final Configuration configuration) { + public static List<ReporterSetup> fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); - Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString) - .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ - .collect(Collectors.toSet()); - // use a TreeSet to make the reporter order deterministic, which is useful for testing - Set<String> namedReporters = new TreeSet<>(String::compareTo); - // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations - for (String key : configuration.keySet()) { - if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { - Matcher matcher = reporterClassPattern.matcher(key); - if (matcher.matches()) { - String reporterName = matcher.group(1); - if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { - if (namedReporters.contains(reporterName)) { - LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); - } else { - namedReporters.add(reporterName); - } - } else { - LOG.info("Excluding reporter {}, not configured in 
reporter list ({}).", reporterName, includedReportersString); - } - } - } - } + Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, + includedReportersString); if (namedReporters.isEmpty()) { return Collections.emptyList(); } - List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size()); + final Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager); + LOG.debug("Loaded Reporter Factories: {}", reporterFactories); + final List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters); + LOG.debug("Loaded Reporter Configurations: {}", reporterConfigurations); - for (String namedReporter: namedReporters) { - DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( - configuration, - ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + List<ReporterSetup> reporterSetups = setupReporters(reporterFactories, reporterConfigurations); + LOG.debug("All initialized Reporters:"); + reporterSetups.forEach(i -> LOG.debug("{} - {}", i.getName(), i.getConfiguration())); Review comment: Same as above about leaking sensitive information: this statement logs each reporter's configuration via `i.getConfiguration()`, which may contain sensitive values. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services