zentol commented on a change in pull request #11195: [FLINK-16222][runtime] Use 
plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386479537
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##########
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
        }
 
        private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-               LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+               LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
                reporter.open(metricConfig);
 
                return new ReporterSetup(reporterName, metricConfig, reporter);
        }
 
-       public static List<ReporterSetup> fromConfiguration(final Configuration 
configuration) {
+       public static List<ReporterSetup> fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+               LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
                String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-               Set<String> includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
-                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
-                       .collect(Collectors.toSet());
 
-               // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
-               Set<String> namedReporters = new TreeSet<>(String::compareTo);
-               // scan entire configuration for "metric.reporter" keys and 
parse individual reporter configurations
-               for (String key : configuration.keySet()) {
-                       if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
-                               Matcher matcher = 
reporterClassPattern.matcher(key);
-                               if (matcher.matches()) {
-                                       String reporterName = matcher.group(1);
-                                       if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
-                                               if 
(namedReporters.contains(reporterName)) {
-                                                       LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
-                                               } else {
-                                                       
namedReporters.add(reporterName);
-                                               }
-                                       } else {
-                                               LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
-                                       }
-                               }
-                       }
-               }
+               Set<String> namedReporters = 
findEnabledReportersInConfiguration(configuration,
+                       includedReportersString);
 
                if (namedReporters.isEmpty()) {
                        return Collections.emptyList();
                }
 
-               List<Tuple2<String, Configuration>> reporterConfigurations = 
new ArrayList<>(namedReporters.size());
+               final Map<String, MetricReporterFactory> reporterFactories = 
loadAvailableReporterFactories(pluginManager);
+               LOG.debug("Loaded Reporter Factories: {}", reporterFactories);
+               final List<Tuple2<String, Configuration>> 
reporterConfigurations = loadReporterConfigurations(configuration, 
namedReporters);
+               LOG.debug("Loaded Reporter Configurations: {}", 
reporterConfigurations);
 
-               for (String namedReporter: namedReporters) {
-                       DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
-                               configuration,
-                               ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+               List<ReporterSetup> reporterSetups = 
setupReporters(reporterFactories, reporterConfigurations);
+               LOG.debug("All initialized Reporters:");
+               reporterSetups.forEach(i -> LOG.debug("{} - {}", i.getName(), 
i.getConfiguration()));
 
 Review comment:
   Same as above: logging the reporter configurations here may leak sensitive information.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to