This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b51ee30c3d1 [FLINK-33022][runtime] Log an error when enrichers defined as part of the configuration can not be found/loaded
b51ee30c3d1 is described below

commit b51ee30c3d1a212947398d880a676f07f46f36be
Author: Panagiotis Garefalakis <pga...@apache.org>
AuthorDate: Sat Sep 9 13:54:43 2023 -0700

    [FLINK-33022][runtime] Log an error when enrichers defined as part of the configuration can not be found/loaded
---
 .../flink/runtime/failure/FailureEnricherUtils.java       | 14 ++++++++++----
 .../flink/runtime/failure/FailureEnricherUtilsTest.java   | 15 ++++++++++++++-
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index d9b4c2278df..0e19573d8ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -75,9 +75,9 @@ public class FailureEnricherUtils {
     @VisibleForTesting
     static Collection<FailureEnricher> getFailureEnrichers(
             final Configuration configuration, final PluginManager 
pluginManager) {
-        Set<String> includedEnrichers = 
getIncludedFailureEnrichers(configuration);
+        final Set<String> enrichersToLoad = 
getIncludedFailureEnrichers(configuration);
         //  When empty, NO enrichers will be started.
-        if (includedEnrichers.isEmpty()) {
+        if (enrichersToLoad.isEmpty()) {
             return Collections.emptySet();
         }
         final Iterator<FailureEnricherFactory> factoryIterator =
@@ -87,7 +87,7 @@ public class FailureEnricherUtils {
             final FailureEnricherFactory failureEnricherFactory = 
factoryIterator.next();
             final FailureEnricher failureEnricher =
                     
failureEnricherFactory.createFailureEnricher(configuration);
-            if 
(includedEnrichers.contains(failureEnricher.getClass().getName())) {
+            if (enrichersToLoad.remove(failureEnricher.getClass().getName())) {
                 failureEnrichers.add(failureEnricher);
                 LOG.info(
                         "Found failure enricher {} at {}.",
@@ -102,10 +102,16 @@ public class FailureEnricherUtils {
                 LOG.debug(
                         "Excluding failure enricher {}, not configured in 
enricher list ({}).",
                         failureEnricherFactory.getClass().getName(),
-                        includedEnrichers);
+                        enrichersToLoad);
             }
         }
 
+        if (!enrichersToLoad.isEmpty()) {
+            LOG.error(
+                    "The following failure enrichers were configured but not 
found on the classpath: {}.",
+                    enrichersToLoad);
+        }
+
         return filterInvalidEnrichers(failureEnrichers);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
index d5b6ef334d3..7d1cb80e401 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -108,7 +108,20 @@ class FailureEnricherUtilsTest {
                 FailureEnricherUtils.getFailureEnrichers(configuration, 
createPluginManager());
         assertThat(enrichers).hasSize(1);
         // verify that the failure enricher was created and returned
-        
assertThat(enrichers.iterator().next()).isInstanceOf(TestEnricher.class);
+        assertThat(enrichers)
+                .satisfiesExactly(
+                        enricher -> 
assertThat(enricher).isInstanceOf(TestEnricher.class));
+
+        // Valid plus Invalid Name combination
+        configuration.set(
+                JobManagerOptions.FAILURE_ENRICHERS_LIST,
+                FailureEnricherUtilsTest.class.getName() + "," + 
TestEnricher.class.getName());
+        final Collection<FailureEnricher> validInvalidEnrichers =
+                FailureEnricherUtils.getFailureEnrichers(configuration, 
createPluginManager());
+        assertThat(validInvalidEnrichers).hasSize(1);
+        assertThat(validInvalidEnrichers)
+                .satisfiesExactly(
+                        enricher -> 
assertThat(enricher).isInstanceOf(TestEnricher.class));
     }
 
     @Test

Reply via email to