This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 97237d0d86ef4610174ba8a2579341822ed8d21e Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Wed Nov 24 12:11:21 2021 +0100 [FLINK-24687][table-common] Fix the Table Factory loading mechanism to tolerate NoClassDefFoundError. Added a test and converted FactoryUtil to use assertj. Signed-off-by: slinkydeveloper <francescogu...@gmail.com> --- .../apache/flink/testutils/ClassLoaderUtils.java | 48 +++- .../apache/flink/table/factories/FactoryUtil.java | 33 ++- .../flink/table/factories/ServiceLoaderUtil.java | 100 ++++++++ .../flink/table/factories/FactoryUtilTest.java | 275 +++++++++++++-------- 4 files changed, 340 insertions(+), 116 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java index 8207b2d..25f5ea1 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java @@ -34,21 +34,25 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; /** Utilities to create class loaders. */ public class ClassLoaderUtils { + public static URLClassLoader compileAndLoadJava(File root, String filename, String source) throws IOException { return withRoot(root).addClass(filename.replaceAll("\\.java", ""), source).build(); } - private static URLClassLoader createClassLoader(File root) throws MalformedURLException { - return new URLClassLoader( - new URL[] {root.toURI().toURL()}, Thread.currentThread().getContextClassLoader()); + private static URLClassLoader createClassLoader(File root, ClassLoader parent) + throws MalformedURLException { + return new URLClassLoader(new URL[] {root.toURI().toURL()}, parent); } private static void writeAndCompile(File root, String filename, String source) @@ -76,7 +80,14 @@ public class ClassLoaderUtils { private static int compileClass(File sourceFile) { JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - return compiler.run(null, null, null, "-proc:none", sourceFile.getPath()); + return compiler.run( + null, + null, + null, + "-proc:none", + "-classpath", + sourceFile.getParent() + ":" + System.getProperty("java.class.path"), + sourceFile.getPath()); } public static URL[] getClasspathURLs() { @@ -96,16 +107,23 @@ public class ClassLoaderUtils { } } + /** + * Builder for a {@link ClassLoader} where you can add resources and compile java source code. + */ public static class ClassLoaderBuilder { private final File root; private final Map<String, String> classes; private final Map<String, String> resources; + private final Map<String, List<String>> services; + private ClassLoader parent; private ClassLoaderBuilder(File root) { this.root = root; - this.classes = new HashMap<>(); - this.resources = new HashMap<>(); + this.classes = new LinkedHashMap<>(); + this.resources = new LinkedHashMap<>(); + this.services = new HashMap<>(); + this.parent = Thread.currentThread().getContextClassLoader(); } public ClassLoaderBuilder addResource(String targetPath, String resource) { @@ -119,6 +137,11 @@ public class ClassLoaderUtils { return this; } + public ClassLoaderBuilder addService(String serviceClass, String implClass) { + services.computeIfAbsent(serviceClass, k -> new ArrayList<>()).add(implClass); + return this; + } + public ClassLoaderBuilder addClass(String className, String source) { String oldValue = classes.putIfAbsent(className, source); @@ -130,22 +153,33 @@ public class ClassLoaderUtils { return this; } + public ClassLoaderBuilder withParentClassLoader(ClassLoader classLoader) { + this.parent = classLoader; + return this; + } + public URLClassLoader build() throws IOException { for (Map.Entry<String, String> classInfo : classes.entrySet()) { writeAndCompile(root, createFileName(classInfo.getKey()), classInfo.getValue()); } + services.forEach( + (serviceClass, serviceImpls) -> + resources.putIfAbsent( + "META-INF/services/" + serviceClass, + String.join("\n", serviceImpls))); for (Map.Entry<String, String> resource : resources.entrySet()) { writeSourceFile(root, resource.getKey(), resource.getValue()); } - return createClassLoader(root); + return createClassLoader(root, parent); } private String createFileName(String className) { return className + ".java"; } } + // ------------------------------------------------------------------------ // Testing of objects not in the application class loader // ------------------------------------------------------------------------ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 4cb382f..5430b3a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -56,8 +56,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.ServiceConfigurationError; -import java.util.ServiceLoader; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -689,15 +687,28 @@ public final class FactoryUtil { } } - private static List<Factory> discoverFactories(ClassLoader classLoader) { - try { - final List<Factory> result = new LinkedList<>(); - ServiceLoader.load(Factory.class, classLoader).iterator().forEachRemaining(result::add); - return result; - } catch (ServiceConfigurationError e) { - LOG.error("Could not load service provider for factories.", e); - throw new TableException("Could not load service provider for factories.", e); - } + static List<Factory> discoverFactories(ClassLoader classLoader) { + final List<Factory> result = new LinkedList<>(); + ServiceLoaderUtil.load(Factory.class, classLoader) + .forEachRemaining( + loadResult -> { + if (loadResult.hasFailed()) { + if (loadResult.getError() instanceof NoClassDefFoundError) { + LOG.debug( + "NoClassDefFoundError when loading a " + + Factory.class + + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.", + loadResult.getError()); + // After logging, we just ignore this failure + return; + } + throw new TableException( + "Unexpected error when trying to load service provider for factories.", + loadResult.getError()); + } + result.add(loadResult.getService()); + }); + return result; } private static String stringifyOption(String key, String value) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java new file mode 100644 index 0000000..313ae5c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.ServiceLoader; + +/** This class contains utilities to deal with {@link ServiceLoader}. */ +class ServiceLoaderUtil { + + /** + * This method behaves similarly to {@link ServiceLoader#load(Class, ClassLoader)} and it also + * wraps the returned {@link Iterator} to iterate safely through the loaded services, eventually + * catching load failures like {@link NoClassDefFoundError}. + */ + static <T> Iterator<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) { + return new SafeIterator<>(ServiceLoader.load(clazz, classLoader).iterator()); + } + + static class LoadResult<T> { + private final T service; + private final Throwable error; + + private LoadResult(T service, Throwable error) { + this.service = service; + this.error = error; + } + + private LoadResult(T service) { + this(service, null); + } + + private LoadResult(Throwable error) { + this(null, error); + } + + public boolean hasFailed() { + return error != null; + } + + public Throwable getError() { + return error; + } + + public T getService() { + return service; + } + } + + /** + * This iterator wraps {@link Iterator#hasNext()} and {@link Iterator#next()} in try-catch, and + * returns {@link LoadResult} to handle such failures. + */ + private static class SafeIterator<T> implements Iterator<LoadResult<T>> { + + private final Iterator<T> iterator; + + public SafeIterator(Iterator<T> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + try { + return iterator.hasNext(); + } catch (Throwable t) { + return true; + } + } + + @Override + public LoadResult<T> next() { + try { + if (iterator.hasNext()) { + return new LoadResult<>(iterator.next()); + } + } catch (Throwable t) { + return new LoadResult<>(t); + } + throw new NoSuchElementException(); + } + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java index d0ab5e2..c33d24f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -31,11 +31,16 @@ import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSour import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock; import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock; import org.apache.flink.table.factories.utils.FactoryMocks; +import org.apache.flink.testutils.ClassLoaderUtils; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.assertj.core.api.AbstractThrowableAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -43,42 +48,38 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link FactoryUtil}. */ public class FactoryUtilTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - @Test public void testMissingConnector() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> options.remove("connector"), "Table options do not contain an option key 'connector' for discovering a connector."); - testError(options -> options.remove("connector")); } @Test public void testInvalidConnector() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> options.put("connector", "FAIL"), "Could not find any factory for identifier 'FAIL' that implements '" + DynamicTableFactory.class.getName() + "' in the classpath.\n\n" + "Available factory identifiers are:\n\n" + "conflicting\nsink-only\nsource-only\ntest\ntest-connector"); - testError(options -> options.put("connector", "FAIL")); } @Test public void testConflictingConnector() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> options.put("connector", TestConflictingDynamicTableFactory1.IDENTIFIER), "Multiple factories for identifier 'conflicting' that implement '" + DynamicTableFactory.class.getName() + "' found in the classpath.\n" @@ -88,62 +89,66 @@ public class FactoryUtilTest { + TestConflictingDynamicTableFactory1.class.getName() + "\n" + TestConflictingDynamicTableFactory2.class.getName()); - testError( - options -> - options.put("connector", TestConflictingDynamicTableFactory1.IDENTIFIER)); } @Test public void testMissingConnectorOption() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> options.remove("target"), "One or more required options are missing.\n\n" + "Missing required options are:\n\n" + "target"); - testError(options -> options.remove("target")); } @Test public void testInvalidConnectorOption() { - expectError("Invalid value for option 'buffer-size'."); - testError(options -> options.put("buffer-size", "FAIL")); + assertCreateTableSourceWithOptionModifier( + options -> options.put("buffer-size", "FAIL"), + "Invalid value for option 'buffer-size'."); } @Test public void testMissingFormat() { - expectError("Could not find required scan format 'value.format'."); - testError(options -> options.remove("value.format")); + assertCreateTableSourceWithOptionModifier( + options -> options.remove("value.format"), + "Could not find required scan format 'value.format'."); } @Test public void testInvalidFormat() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> options.put("value.format", "FAIL"), "Could not find any factory for identifier 'FAIL' that implements '" + DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" + "Available factory identifiers are:\n\n" + "test-format"); - testError(options -> options.put("value.format", "FAIL")); } @Test public void testMissingFormatOption() { - expectError("Error creating scan format 'test-format' in option space 'key.test-format.'."); - expectError( + assertCreateTableSourceWithOptionModifier( + options -> options.remove("key.test-format.delimiter"), "One or more required options are missing.\n\n" + "Missing required options are:\n\n" - + "delimiter"); - testError(options -> options.remove("key.test-format.delimiter")); + + "delimiter", + "Error creating scan format 'test-format' in option space 'key.test-format.'."); } @Test public void testInvalidFormatOption() { - expectError("Invalid value for option 'fail-on-missing'."); - testError(options -> options.put("key.test-format.fail-on-missing", "FAIL")); + assertCreateTableSourceWithOptionModifier( + options -> options.put("key.test-format.fail-on-missing", "FAIL"), + "Invalid value for option 'fail-on-missing'."); } @Test public void testSecretOption() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> { + options.remove("target"); + options.put("password", "123"); + }, "Table options are:\n" + "\n" + "'buffer-size'='1000'\n" @@ -155,16 +160,15 @@ public class FactoryUtilTest { + "'value.format'='test-format'\n" + "'value.test-format.delimiter'='|'\n" + "'value.test-format.fail-on-missing'='true'"); - testError( - options -> { - options.remove("target"); - options.put("password", "123"); - }); } @Test public void testUnconsumedOption() { - expectError( + assertCreateTableSourceWithOptionModifier( + options -> { + options.put("this-is-not-consumed", "42"); + options.put("this-is-also-not-consumed", "true"); + }, "Unsupported options found for 'test-connector'.\n\n" + "Unsupported options:\n\n" + "this-is-also-not-consumed\n" @@ -192,11 +196,6 @@ public class FactoryUtilTest { + "value.test-format.fail-on-missing\n" + "value.test-format.fallback-fail-on-missing\n" + "value.test-format.readable-metadata"); - testError( - options -> { - options.put("this-is-not-consumed", "42"); - options.put("this-is-also-not-consumed", "true"); - }); } @Test @@ -208,7 +207,7 @@ public class FactoryUtilTest { "MyTarget", new DecodingFormatMock(",", false), new DecodingFormatMock("|", true)); - assertEquals(expectedSource, actualSource); + assertThat(actualSource).isEqualTo(expectedSource); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); final DynamicTableSink expectedSink = new DynamicTableSinkMock( @@ -216,7 +215,7 @@ public class FactoryUtilTest { 1000L, new EncodingFormatMock(","), new EncodingFormatMock("|")); - assertEquals(expectedSink, actualSink); + assertThat(actualSink).isEqualTo(expectedSink); } @Test @@ -232,7 +231,7 @@ public class FactoryUtilTest { "MyTarget", new DecodingFormatMock(",", false), new DecodingFormatMock("|", true)); - assertEquals(expectedSource, actualSource); + assertThat(actualSource).isEqualTo(expectedSource); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); final DynamicTableSink expectedSink = @@ -241,7 +240,7 @@ public class FactoryUtilTest { 1000L, new EncodingFormatMock(","), new EncodingFormatMock("|")); - assertEquals(expectedSink, actualSink); + assertThat(actualSink).isEqualTo(expectedSink); } @Test @@ -252,11 +251,11 @@ public class FactoryUtilTest { final DynamicTableSource actualSource = createTableSource(SCHEMA, options); final DynamicTableSource expectedSource = new DynamicTableSourceMock("MyTarget", null, new DecodingFormatMock("|", true)); - assertEquals(expectedSource, actualSource); + assertThat(actualSource).isEqualTo(expectedSource); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); final DynamicTableSink expectedSink = new DynamicTableSinkMock("MyTarget", 1000L, null, new EncodingFormatMock("|")); - assertEquals(expectedSink, actualSink); + assertThat(actualSink).isEqualTo(expectedSink); } @Test @@ -274,7 +273,7 @@ public class FactoryUtilTest { "MyTarget", new DecodingFormatMock(",", false), new DecodingFormatMock(";", true)); - assertEquals(expectedSource, actualSource); + assertThat(actualSource).isEqualTo(expectedSource); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); final DynamicTableSink expectedSink = new DynamicTableSinkMock( @@ -282,28 +281,29 @@ public class FactoryUtilTest { 1000L, new EncodingFormatMock(","), new EncodingFormatMock(";")); - assertEquals(expectedSink, actualSink); + assertThat(actualSink).isEqualTo(expectedSink); } @Test public void testConnectorErrorHint() { - try { - createTableSource(SCHEMA, Collections.singletonMap("connector", "sink-only")); - fail(); - } catch (Exception e) { - String errorMsg = - "Connector 'sink-only' can only be used as a sink. It cannot be used as a source."; - assertThat(e, containsCause(new ValidationException(errorMsg))); - } - - try { - createTableSink(SCHEMA, Collections.singletonMap("connector", "source-only")); - fail(); - } catch (Exception e) { - String errorMsg = - "Connector 'source-only' can only be used as a source. It cannot be used as a sink."; - assertThat(e, containsCause(new ValidationException(errorMsg))); - } + assertThatThrownBy( + () -> + createTableSource( + SCHEMA, Collections.singletonMap("connector", "sink-only"))) + .satisfies( + anyCauseMatches( + ValidationException.class, + "Connector 'sink-only' can only be used as a sink. It cannot be used as a source.")); + + assertThatThrownBy( + () -> + createTableSink( + SCHEMA, + Collections.singletonMap("connector", "source-only"))) + .satisfies( + anyCauseMatches( + ValidationException.class, + "Connector 'source-only' can only be used as a source. It cannot be used as a sink.")); } @Test @@ -331,13 +331,12 @@ public class FactoryUtilTest { options, null, Thread.currentThread().getContextClassLoader()); - assertTrue(catalog instanceof TestCatalogFactory.TestCatalog); + assertThat(catalog).isInstanceOf(TestCatalogFactory.TestCatalog.class); final TestCatalogFactory.TestCatalog testCatalog = (TestCatalogFactory.TestCatalog) catalog; - assertEquals(testCatalog.getName(), "my-catalog"); - assertEquals( - testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()), - "my-database"); + assertThat("my-catalog").isEqualTo(testCatalog.getName()); + assertThat("my-database") + .isEqualTo(testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key())); } @Test @@ -363,8 +362,11 @@ public class FactoryUtilTest { null, Thread.currentThread().getContextClassLoader())); - expectError("Unsupported options found for 'test-catalog'"); - helper2.validate(); + assertThatThrownBy(helper2::validate) + .satisfies( + anyCauseMatches( + ValidationException.class, + "Unsupported options found for 'test-catalog'")); } @Test @@ -400,17 +402,6 @@ public class FactoryUtilTest { @Test public void testInvalidFactoryHelperWithMapOption() { - expectError( - "Unsupported options found for 'test-factory-with-map'.\n\n" - + "Unsupported options:\n\n" - + "unknown\n\n" - + "Supported options:\n\n" - + "connector\n" - + "properties\n" - + "properties.prop-1\n" - + "properties.prop-2\n" - + "property-version"); - final Map<String, String> options = new HashMap<>(); options.put("properties.prop-1", "value-1"); options.put("properties.prop-2", "value-2"); @@ -419,22 +410,110 @@ public class FactoryUtilTest { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper( new TestFactoryWithMap(), FactoryMocks.createTableContext(SCHEMA, options)); - helper.validate(); + + assertThatThrownBy(helper::validate) + .satisfies( + anyCauseMatches( + ValidationException.class, + "Unsupported options found for 'test-factory-with-map'.\n\n" + + "Unsupported options:\n\n" + + "unknown\n\n" + + "Supported options:\n\n" + + "connector\n" + + "properties\n" + + "properties.prop-1\n" + + "properties.prop-2\n" + + "property-version")); + } + + @Test + public void testDiscoverFactoryBadClass(@TempDir Path tempDir) throws IOException { + // Let's prepare the classloader with a factory interface and 2 classes, one implements our + // sub-interface of SerializationFormatFactory and the other implements only + // SerializationFormatFactory. + final String subInterfaceName = "MyFancySerializationSchemaFormat"; + final String subInterfaceImplementationName = "MyFancySerializationSchemaFormatImpl"; + final String serializationSchemaImplementationName = "AnotherSerializationSchema"; + + final URLClassLoader classLoaderIncludingTheInterface = + ClassLoaderUtils.withRoot(tempDir.toFile()) + .addClass( + subInterfaceName, + "public interface " + + subInterfaceName + + " extends " + + SerializationFormatFactory.class.getName() + + " {}") + .addClass( + subInterfaceImplementationName, + "import org.apache.flink.api.common.serialization.SerializationSchema;" + + "import org.apache.flink.configuration.ConfigOption;" + + "import org.apache.flink.configuration.ReadableConfig;" + + "import org.apache.flink.table.connector.format.EncodingFormat;" + + "import org.apache.flink.table.data.RowData;" + + "import org.apache.flink.table.factories.DynamicTableFactory;" + + "import org.apache.flink.table.factories.SerializationFormatFactory;" + + "import java.util.Set;" + + "public class " + + subInterfaceImplementationName + + " implements " + + subInterfaceName + + " {" + + "@Override public String factoryIdentifier() { return null; }" + + "@Override public Set<ConfigOption<?>> requiredOptions() { return null; }" + + "@Override public Set<ConfigOption<?>> optionalOptions() { return null; }" + + "@Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }" + + "}") + .addClass( + serializationSchemaImplementationName, + "import org.apache.flink.api.common.serialization.SerializationSchema;" + + "import org.apache.flink.configuration.ConfigOption;" + + "import org.apache.flink.configuration.ReadableConfig;" + + "import org.apache.flink.table.connector.format.EncodingFormat;" + + "import org.apache.flink.table.data.RowData;" + + "import org.apache.flink.table.factories.DynamicTableFactory;" + + "import org.apache.flink.table.factories.SerializationFormatFactory;" + + "import java.util.Set;" + + "public class " + + serializationSchemaImplementationName + + " implements " + + SerializationFormatFactory.class.getName() + + " {" + + "@Override public String factoryIdentifier() { return null; }" + + "@Override public Set<ConfigOption<?>> requiredOptions() { return null; }" + + "@Override public Set<ConfigOption<?>> optionalOptions() { return null; }" + + "@Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }" + + "}") + .addService(Factory.class.getName(), subInterfaceImplementationName) + .addService(Factory.class.getName(), serializationSchemaImplementationName) + .build(); + + // Delete the sub interface now, so it can't be loaded + Files.delete(tempDir.resolve(subInterfaceName + ".class")); + + assertThat(FactoryUtil.discoverFactories(classLoaderIncludingTheInterface)) + .map(f -> f.getClass().getName()) + .doesNotContain(subInterfaceImplementationName) + .contains(serializationSchemaImplementationName); } // -------------------------------------------------------------------------------------------- // Helper methods // -------------------------------------------------------------------------------------------- - private void expectError(String message) { - thrown.expect(ValidationException.class); - thrown.expect(containsMessage(message)); - } - - private static void testError(Consumer<Map<String, String>> optionModifier) { - final Map<String, String> options = createAllOptions(); - optionModifier.accept(options); - createTableSource(SCHEMA, options); + private static void assertCreateTableSourceWithOptionModifier( + Consumer<Map<String, String>> optionModifier, String... messages) { + AbstractThrowableAssert<?, ? extends Throwable> assertion = + assertThatThrownBy( + () -> { + final Map<String, String> options = createAllOptions(); + optionModifier.accept(options); + createTableSource(SCHEMA, options); + }); + + for (String message : messages) { + assertion.satisfies(anyCauseMatches(ValidationException.class, message)); + } } private static Map<String, String> createAllOptions() {