This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 97237d0d86ef4610174ba8a2579341822ed8d21e
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Nov 24 12:11:21 2021 +0100

    [FLINK-24687][table-common] Fix the Table Factory loading mechanism to 
tolerate NoClassDefFoundError. Added a test and converted FactoryUtilTest to use 
AssertJ.
    
    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../apache/flink/testutils/ClassLoaderUtils.java   |  48 +++-
 .../apache/flink/table/factories/FactoryUtil.java  |  33 ++-
 .../flink/table/factories/ServiceLoaderUtil.java   | 100 ++++++++
 .../flink/table/factories/FactoryUtilTest.java     | 275 +++++++++++++--------
 4 files changed, 340 insertions(+), 116 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
index 8207b2d..25f5ea1 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -34,21 +34,25 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 /** Utilities to create class loaders. */
 public class ClassLoaderUtils {
+
     public static URLClassLoader compileAndLoadJava(File root, String 
filename, String source)
             throws IOException {
         return withRoot(root).addClass(filename.replaceAll("\\.java", ""), 
source).build();
     }
 
-    private static URLClassLoader createClassLoader(File root) throws 
MalformedURLException {
-        return new URLClassLoader(
-                new URL[] {root.toURI().toURL()}, 
Thread.currentThread().getContextClassLoader());
+    private static URLClassLoader createClassLoader(File root, ClassLoader 
parent)
+            throws MalformedURLException {
+        return new URLClassLoader(new URL[] {root.toURI().toURL()}, parent);
     }
 
     private static void writeAndCompile(File root, String filename, String 
source)
@@ -76,7 +80,14 @@ public class ClassLoaderUtils {
 
     private static int compileClass(File sourceFile) {
         JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
-        return compiler.run(null, null, null, "-proc:none", 
sourceFile.getPath());
+        return compiler.run(
+                null,
+                null,
+                null,
+                "-proc:none",
+                "-classpath",
+                sourceFile.getParent() + ":" + 
System.getProperty("java.class.path"),
+                sourceFile.getPath());
     }
 
     public static URL[] getClasspathURLs() {
@@ -96,16 +107,23 @@ public class ClassLoaderUtils {
         }
     }
 
+    /**
+     * Builder for a {@link ClassLoader} where you can add resources and 
compile java source code.
+     */
     public static class ClassLoaderBuilder {
 
         private final File root;
         private final Map<String, String> classes;
         private final Map<String, String> resources;
+        private final Map<String, List<String>> services;
+        private ClassLoader parent;
 
         private ClassLoaderBuilder(File root) {
             this.root = root;
-            this.classes = new HashMap<>();
-            this.resources = new HashMap<>();
+            this.classes = new LinkedHashMap<>();
+            this.resources = new LinkedHashMap<>();
+            this.services = new HashMap<>();
+            this.parent = Thread.currentThread().getContextClassLoader();
         }
 
         public ClassLoaderBuilder addResource(String targetPath, String 
resource) {
@@ -119,6 +137,11 @@ public class ClassLoaderUtils {
             return this;
         }
 
+        public ClassLoaderBuilder addService(String serviceClass, String 
implClass) {
+            services.computeIfAbsent(serviceClass, k -> new 
ArrayList<>()).add(implClass);
+            return this;
+        }
+
         public ClassLoaderBuilder addClass(String className, String source) {
             String oldValue = classes.putIfAbsent(className, source);
 
@@ -130,22 +153,33 @@ public class ClassLoaderUtils {
             return this;
         }
 
+        public ClassLoaderBuilder withParentClassLoader(ClassLoader 
classLoader) {
+            this.parent = classLoader;
+            return this;
+        }
+
         public URLClassLoader build() throws IOException {
             for (Map.Entry<String, String> classInfo : classes.entrySet()) {
                 writeAndCompile(root, createFileName(classInfo.getKey()), 
classInfo.getValue());
             }
 
+            services.forEach(
+                    (serviceClass, serviceImpls) ->
+                            resources.putIfAbsent(
+                                    "META-INF/services/" + serviceClass,
+                                    String.join("\n", serviceImpls)));
             for (Map.Entry<String, String> resource : resources.entrySet()) {
                 writeSourceFile(root, resource.getKey(), resource.getValue());
             }
 
-            return createClassLoader(root);
+            return createClassLoader(root, parent);
         }
 
         private String createFileName(String className) {
             return className + ".java";
         }
     }
+
     // ------------------------------------------------------------------------
     //  Testing of objects not in the application class loader
     // ------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 4cb382f..5430b3a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -56,8 +56,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.ServiceConfigurationError;
-import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -689,15 +687,28 @@ public final class FactoryUtil {
         }
     }
 
-    private static List<Factory> discoverFactories(ClassLoader classLoader) {
-        try {
-            final List<Factory> result = new LinkedList<>();
-            ServiceLoader.load(Factory.class, 
classLoader).iterator().forEachRemaining(result::add);
-            return result;
-        } catch (ServiceConfigurationError e) {
-            LOG.error("Could not load service provider for factories.", e);
-            throw new TableException("Could not load service provider for 
factories.", e);
-        }
+    static List<Factory> discoverFactories(ClassLoader classLoader) {
+        final List<Factory> result = new LinkedList<>();
+        ServiceLoaderUtil.load(Factory.class, classLoader)
+                .forEachRemaining(
+                        loadResult -> {
+                            if (loadResult.hasFailed()) {
+                                if (loadResult.getError() instanceof 
NoClassDefFoundError) {
+                                    LOG.debug(
+                                            "NoClassDefFoundError when loading 
a "
+                                                    + Factory.class
+                                                    + ". This is expected when 
trying to load a format dependency but no flink-connector-files is loaded.",
+                                            loadResult.getError());
+                                    // After logging, we just ignore this 
failure
+                                    return;
+                                }
+                                throw new TableException(
+                                        "Unexpected error when trying to load 
service provider for factories.",
+                                        loadResult.getError());
+                            }
+                            result.add(loadResult.getService());
+                        });
+        return result;
     }
 
     private static String stringifyOption(String key, String value) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
new file mode 100644
index 0000000..313ae5c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.ServiceLoader;
+
+/** This class contains utilities to deal with {@link ServiceLoader}. */
+class ServiceLoaderUtil {
+
+    /**
+     * This method behaves similarly to {@link ServiceLoader#load(Class, 
ClassLoader)} and it also
+     * wraps the returned {@link Iterator} to iterate safely through the 
loaded services, eventually
+     * catching load failures like {@link NoClassDefFoundError}.
+     */
+    static <T> Iterator<LoadResult<T>> load(Class<T> clazz, ClassLoader 
classLoader) {
+        return new SafeIterator<>(ServiceLoader.load(clazz, 
classLoader).iterator());
+    }
+
+    static class LoadResult<T> {
+        private final T service;
+        private final Throwable error;
+
+        private LoadResult(T service, Throwable error) {
+            this.service = service;
+            this.error = error;
+        }
+
+        private LoadResult(T service) {
+            this(service, null);
+        }
+
+        private LoadResult(Throwable error) {
+            this(null, error);
+        }
+
+        public boolean hasFailed() {
+            return error != null;
+        }
+
+        public Throwable getError() {
+            return error;
+        }
+
+        public T getService() {
+            return service;
+        }
+    }
+
+    /**
+     * This iterator wraps {@link Iterator#hasNext()} and {@link 
Iterator#next()} in try-catch, and
+     * returns {@link LoadResult} to handle such failures.
+     */
+    private static class SafeIterator<T> implements Iterator<LoadResult<T>> {
+
+        private final Iterator<T> iterator;
+
+        public SafeIterator(Iterator<T> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            try {
+                return iterator.hasNext();
+            } catch (Throwable t) {
+                return true;
+            }
+        }
+
+        @Override
+        public LoadResult<T> next() {
+            try {
+                if (iterator.hasNext()) {
+                    return new LoadResult<>(iterator.next());
+                }
+            } catch (Throwable t) {
+                return new LoadResult<>(t);
+            }
+            throw new NoSuchElementException();
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index d0ab5e2..c33d24f 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -31,11 +31,16 @@ import 
org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSour
 import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock;
 import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock;
 import org.apache.flink.table.factories.utils.FactoryMocks;
+import org.apache.flink.testutils.ClassLoaderUtils;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -43,42 +48,38 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link FactoryUtil}. */
 public class FactoryUtilTest {
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     @Test
     public void testMissingConnector() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.remove("connector"),
                 "Table options do not contain an option key 'connector' for 
discovering a connector.");
-        testError(options -> options.remove("connector"));
     }
 
     @Test
     public void testInvalidConnector() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.put("connector", "FAIL"),
                 "Could not find any factory for identifier 'FAIL' that 
implements '"
                         + DynamicTableFactory.class.getName()
                         + "' in the classpath.\n\n"
                         + "Available factory identifiers are:\n\n"
                         + 
"conflicting\nsink-only\nsource-only\ntest\ntest-connector");
-        testError(options -> options.put("connector", "FAIL"));
     }
 
     @Test
     public void testConflictingConnector() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.put("connector", 
TestConflictingDynamicTableFactory1.IDENTIFIER),
                 "Multiple factories for identifier 'conflicting' that 
implement '"
                         + DynamicTableFactory.class.getName()
                         + "' found in the classpath.\n"
@@ -88,62 +89,66 @@ public class FactoryUtilTest {
                         + TestConflictingDynamicTableFactory1.class.getName()
                         + "\n"
                         + TestConflictingDynamicTableFactory2.class.getName());
-        testError(
-                options ->
-                        options.put("connector", 
TestConflictingDynamicTableFactory1.IDENTIFIER));
     }
 
     @Test
     public void testMissingConnectorOption() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.remove("target"),
                 "One or more required options are missing.\n\n"
                         + "Missing required options are:\n\n"
                         + "target");
-        testError(options -> options.remove("target"));
     }
 
     @Test
     public void testInvalidConnectorOption() {
-        expectError("Invalid value for option 'buffer-size'.");
-        testError(options -> options.put("buffer-size", "FAIL"));
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.put("buffer-size", "FAIL"),
+                "Invalid value for option 'buffer-size'.");
     }
 
     @Test
     public void testMissingFormat() {
-        expectError("Could not find required scan format 'value.format'.");
-        testError(options -> options.remove("value.format"));
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.remove("value.format"),
+                "Could not find required scan format 'value.format'.");
     }
 
     @Test
     public void testInvalidFormat() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.put("value.format", "FAIL"),
                 "Could not find any factory for identifier 'FAIL' that 
implements '"
                         + DeserializationFormatFactory.class.getName()
                         + "' in the classpath.\n\n"
                         + "Available factory identifiers are:\n\n"
                         + "test-format");
-        testError(options -> options.put("value.format", "FAIL"));
     }
 
     @Test
     public void testMissingFormatOption() {
-        expectError("Error creating scan format 'test-format' in option space 
'key.test-format.'.");
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.remove("key.test-format.delimiter"),
                 "One or more required options are missing.\n\n"
                         + "Missing required options are:\n\n"
-                        + "delimiter");
-        testError(options -> options.remove("key.test-format.delimiter"));
+                        + "delimiter",
+                "Error creating scan format 'test-format' in option space 
'key.test-format.'.");
     }
 
     @Test
     public void testInvalidFormatOption() {
-        expectError("Invalid value for option 'fail-on-missing'.");
-        testError(options -> options.put("key.test-format.fail-on-missing", 
"FAIL"));
+        assertCreateTableSourceWithOptionModifier(
+                options -> options.put("key.test-format.fail-on-missing", 
"FAIL"),
+                "Invalid value for option 'fail-on-missing'.");
     }
 
     @Test
     public void testSecretOption() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> {
+                    options.remove("target");
+                    options.put("password", "123");
+                },
                 "Table options are:\n"
                         + "\n"
                         + "'buffer-size'='1000'\n"
@@ -155,16 +160,15 @@ public class FactoryUtilTest {
                         + "'value.format'='test-format'\n"
                         + "'value.test-format.delimiter'='|'\n"
                         + "'value.test-format.fail-on-missing'='true'");
-        testError(
-                options -> {
-                    options.remove("target");
-                    options.put("password", "123");
-                });
     }
 
     @Test
     public void testUnconsumedOption() {
-        expectError(
+        assertCreateTableSourceWithOptionModifier(
+                options -> {
+                    options.put("this-is-not-consumed", "42");
+                    options.put("this-is-also-not-consumed", "true");
+                },
                 "Unsupported options found for 'test-connector'.\n\n"
                         + "Unsupported options:\n\n"
                         + "this-is-also-not-consumed\n"
@@ -192,11 +196,6 @@ public class FactoryUtilTest {
                         + "value.test-format.fail-on-missing\n"
                         + "value.test-format.fallback-fail-on-missing\n"
                         + "value.test-format.readable-metadata");
-        testError(
-                options -> {
-                    options.put("this-is-not-consumed", "42");
-                    options.put("this-is-also-not-consumed", "true");
-                });
     }
 
     @Test
@@ -208,7 +207,7 @@ public class FactoryUtilTest {
                         "MyTarget",
                         new DecodingFormatMock(",", false),
                         new DecodingFormatMock("|", true));
-        assertEquals(expectedSource, actualSource);
+        assertThat(actualSource).isEqualTo(expectedSource);
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
         final DynamicTableSink expectedSink =
                 new DynamicTableSinkMock(
@@ -216,7 +215,7 @@ public class FactoryUtilTest {
                         1000L,
                         new EncodingFormatMock(","),
                         new EncodingFormatMock("|"));
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
     }
 
     @Test
@@ -232,7 +231,7 @@ public class FactoryUtilTest {
                         "MyTarget",
                         new DecodingFormatMock(",", false),
                         new DecodingFormatMock("|", true));
-        assertEquals(expectedSource, actualSource);
+        assertThat(actualSource).isEqualTo(expectedSource);
 
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
         final DynamicTableSink expectedSink =
@@ -241,7 +240,7 @@ public class FactoryUtilTest {
                         1000L,
                         new EncodingFormatMock(","),
                         new EncodingFormatMock("|"));
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
     }
 
     @Test
@@ -252,11 +251,11 @@ public class FactoryUtilTest {
         final DynamicTableSource actualSource = createTableSource(SCHEMA, 
options);
         final DynamicTableSource expectedSource =
                 new DynamicTableSourceMock("MyTarget", null, new 
DecodingFormatMock("|", true));
-        assertEquals(expectedSource, actualSource);
+        assertThat(actualSource).isEqualTo(expectedSource);
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
         final DynamicTableSink expectedSink =
                 new DynamicTableSinkMock("MyTarget", 1000L, null, new 
EncodingFormatMock("|"));
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
     }
 
     @Test
@@ -274,7 +273,7 @@ public class FactoryUtilTest {
                         "MyTarget",
                         new DecodingFormatMock(",", false),
                         new DecodingFormatMock(";", true));
-        assertEquals(expectedSource, actualSource);
+        assertThat(actualSource).isEqualTo(expectedSource);
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
         final DynamicTableSink expectedSink =
                 new DynamicTableSinkMock(
@@ -282,28 +281,29 @@ public class FactoryUtilTest {
                         1000L,
                         new EncodingFormatMock(","),
                         new EncodingFormatMock(";"));
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
     }
 
     @Test
     public void testConnectorErrorHint() {
-        try {
-            createTableSource(SCHEMA, Collections.singletonMap("connector", 
"sink-only"));
-            fail();
-        } catch (Exception e) {
-            String errorMsg =
-                    "Connector 'sink-only' can only be used as a sink. It 
cannot be used as a source.";
-            assertThat(e, containsCause(new ValidationException(errorMsg)));
-        }
-
-        try {
-            createTableSink(SCHEMA, Collections.singletonMap("connector", 
"source-only"));
-            fail();
-        } catch (Exception e) {
-            String errorMsg =
-                    "Connector 'source-only' can only be used as a source. It 
cannot be used as a sink.";
-            assertThat(e, containsCause(new ValidationException(errorMsg)));
-        }
+        assertThatThrownBy(
+                        () ->
+                                createTableSource(
+                                        SCHEMA, 
Collections.singletonMap("connector", "sink-only")))
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Connector 'sink-only' can only be used as a 
sink. It cannot be used as a source."));
+
+        assertThatThrownBy(
+                        () ->
+                                createTableSink(
+                                        SCHEMA,
+                                        Collections.singletonMap("connector", 
"source-only")))
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Connector 'source-only' can only be used as a 
source. It cannot be used as a sink."));
     }
 
     @Test
@@ -331,13 +331,12 @@ public class FactoryUtilTest {
                         options,
                         null,
                         Thread.currentThread().getContextClassLoader());
-        assertTrue(catalog instanceof TestCatalogFactory.TestCatalog);
+        assertThat(catalog).isInstanceOf(TestCatalogFactory.TestCatalog.class);
 
         final TestCatalogFactory.TestCatalog testCatalog = 
(TestCatalogFactory.TestCatalog) catalog;
-        assertEquals(testCatalog.getName(), "my-catalog");
-        assertEquals(
-                
testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()),
-                "my-database");
+        assertThat("my-catalog").isEqualTo(testCatalog.getName());
+        assertThat("my-database")
+                
.isEqualTo(testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()));
     }
 
     @Test
@@ -363,8 +362,11 @@ public class FactoryUtilTest {
                                 null,
                                 
Thread.currentThread().getContextClassLoader()));
 
-        expectError("Unsupported options found for 'test-catalog'");
-        helper2.validate();
+        assertThatThrownBy(helper2::validate)
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Unsupported options found for 
'test-catalog'"));
     }
 
     @Test
@@ -400,17 +402,6 @@ public class FactoryUtilTest {
 
     @Test
     public void testInvalidFactoryHelperWithMapOption() {
-        expectError(
-                "Unsupported options found for 'test-factory-with-map'.\n\n"
-                        + "Unsupported options:\n\n"
-                        + "unknown\n\n"
-                        + "Supported options:\n\n"
-                        + "connector\n"
-                        + "properties\n"
-                        + "properties.prop-1\n"
-                        + "properties.prop-2\n"
-                        + "property-version");
-
         final Map<String, String> options = new HashMap<>();
         options.put("properties.prop-1", "value-1");
         options.put("properties.prop-2", "value-2");
@@ -419,22 +410,110 @@ public class FactoryUtilTest {
         final FactoryUtil.TableFactoryHelper helper =
                 FactoryUtil.createTableFactoryHelper(
                         new TestFactoryWithMap(), 
FactoryMocks.createTableContext(SCHEMA, options));
-        helper.validate();
+
+        assertThatThrownBy(helper::validate)
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Unsupported options found for 
'test-factory-with-map'.\n\n"
+                                        + "Unsupported options:\n\n"
+                                        + "unknown\n\n"
+                                        + "Supported options:\n\n"
+                                        + "connector\n"
+                                        + "properties\n"
+                                        + "properties.prop-1\n"
+                                        + "properties.prop-2\n"
+                                        + "property-version"));
+    }
+
+    /**
+     * Verifies that factory discovery skips a factory whose class can no longer be loaded, while
+     * still returning the other factories registered as services in the same class loader.
+     */
+    @Test
+    public void testDiscoverFactoryBadClass(@TempDir Path tempDir) throws IOException {
+        // Prepare a class loader with a sub-interface of SerializationFormatFactory and two
+        // factories: one implements our sub-interface, the other implements only
+        // SerializationFormatFactory.
+        final String subInterfaceName = "MyFancySerializationSchemaFormat";
+        final String subInterfaceImplementationName = "MyFancySerializationSchemaFormatImpl";
+        final String serializationSchemaImplementationName = "AnotherSerializationSchema";
+
+        // URLClassLoader is Closeable: release it as soon as the assertions have run.
+        try (URLClassLoader classLoaderIncludingTheInterface =
+                ClassLoaderUtils.withRoot(tempDir.toFile())
+                        .addClass(
+                                subInterfaceName,
+                                "public interface "
+                                        + subInterfaceName
+                                        + " extends "
+                                        + SerializationFormatFactory.class.getName()
+                                        + " {}")
+                        .addClass(
+                                subInterfaceImplementationName,
+                                serializationFormatFactorySource(
+                                        subInterfaceImplementationName, subInterfaceName))
+                        .addClass(
+                                serializationSchemaImplementationName,
+                                serializationFormatFactorySource(
+                                        serializationSchemaImplementationName,
+                                        SerializationFormatFactory.class.getName()))
+                        .addService(Factory.class.getName(), subInterfaceImplementationName)
+                        .addService(Factory.class.getName(), serializationSchemaImplementationName)
+                        .build()) {
+            // Delete the sub-interface class file now, so it can't be loaded anymore.
+            Files.delete(tempDir.resolve(subInterfaceName + ".class"));
+
+            assertThat(FactoryUtil.discoverFactories(classLoaderIncludingTheInterface))
+                    .map(f -> f.getClass().getName())
+                    .doesNotContain(subInterfaceImplementationName)
+                    .contains(serializationSchemaImplementationName);
+        }
+    }
+
+    /**
+     * Builds the Java source of a public class named {@code className} that implements {@code
+     * interfaceName} and overrides the factory methods with bodies that return {@code null}.
+     */
+    private static String serializationFormatFactorySource(
+            String className, String interfaceName) {
+        return "import org.apache.flink.api.common.serialization.SerializationSchema;"
+                + "import org.apache.flink.configuration.ConfigOption;"
+                + "import org.apache.flink.configuration.ReadableConfig;"
+                + "import org.apache.flink.table.connector.format.EncodingFormat;"
+                + "import org.apache.flink.table.data.RowData;"
+                + "import org.apache.flink.table.factories.DynamicTableFactory;"
+                + "import org.apache.flink.table.factories.SerializationFormatFactory;"
+                + "import java.util.Set;"
+                + "public class "
+                + className
+                + " implements "
+                + interfaceName
+                + " {"
+                + "@Override public String factoryIdentifier() { return null; }"
+                + "@Override public Set<ConfigOption<?>> requiredOptions() { return null; }"
+                + "@Override public Set<ConfigOption<?>> optionalOptions() { return null; }"
+                + "@Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }"
+                + "}";
     }
 
     // --------------------------------------------------------------------------------------------
     // Helper methods
     // --------------------------------------------------------------------------------------------
 
-    private void expectError(String message) {
-        thrown.expect(ValidationException.class);
-        thrown.expect(containsMessage(message));
-    }
-
-    private static void testError(Consumer<Map<String, String>> 
optionModifier) {
-        final Map<String, String> options = createAllOptions();
-        optionModifier.accept(options);
-        createTableSource(SCHEMA, options);
+    /**
+     * Applies {@code optionModifier} to a fresh copy of the default options, asserts that creating
+     * a table source from them throws, and checks the thrown error against every entry of {@code
+     * messages} via {@code anyCauseMatches(ValidationException.class, ...)}.
+     */
+    private static void assertCreateTableSourceWithOptionModifier(
+            Consumer<Map<String, String>> optionModifier, String... messages) {
+        final AbstractThrowableAssert<?, ? extends Throwable> failure =
+                assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions = createAllOptions();
+                            optionModifier.accept(modifiedOptions);
+                            createTableSource(SCHEMA, modifiedOptions);
+                        });
+
+        // Each expected message is verified independently against the same thrown error.
+        for (final String expectedMessage : messages) {
+            failure.satisfies(anyCauseMatches(ValidationException.class, expectedMessage));
+        }
     }
 
     private static Map<String, String> createAllOptions() {

Reply via email to