This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 67df120318d7b33b95bc85d69824e6f9f327ed64 Author: fengli <ldliu...@163.com> AuthorDate: Mon Aug 29 20:18:20 2022 +0800 [FLINK-29074][Connectors/JDBC] Fix ClassNotFound exception when using jdbc connector by add jar syntax This closes #20707 --- .../jdbc/catalog/AbstractJdbcCatalog.java | 23 ++++++++++---- .../flink/connector/jdbc/catalog/JdbcCatalog.java | 5 +-- .../connector/jdbc/catalog/JdbcCatalogUtils.java | 9 ++++-- .../flink/connector/jdbc/catalog/MySqlCatalog.java | 37 ++++++++++++++-------- .../connector/jdbc/catalog/PostgresCatalog.java | 3 +- .../jdbc/catalog/factory/JdbcCatalogFactory.java | 1 + .../connector/jdbc/dialect/JdbcDialectLoader.java | 6 ++-- .../internal/options/JdbcConnectorOptions.java | 19 ++++++++++- .../jdbc/table/JdbcDynamicTableFactory.java | 29 ++++++++++------- .../jdbc/table/JdbcRowDataLookupFunction.java | 4 +-- .../jdbc/catalog/MySqlCatalogTestBase.java | 1 + .../jdbc/catalog/PostgresCatalogTestBase.java | 1 + .../catalog/factory/JdbcCatalogFactoryTest.java | 1 + .../oracle/OraclePreparedStatementTest.java | 4 ++- .../FieldNamedPreparedStatementImplTest.java | 3 +- .../jdbc/table/JdbcAppendOnlyWriterTest.java | 5 +++ 16 files changed, 103 insertions(+), 48 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java index 8241085..482ce79 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java @@ -51,6 +51,7 @@ import org.apache.flink.table.factories.Factory; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.commons.compress.utils.Lists; import org.slf4j.Logger; @@ -79,18 +80,21 @@ import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAM import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** Abstract catalog for any JDBC catalogs. */ public abstract class AbstractJdbcCatalog extends AbstractCatalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); + protected final ClassLoader userClassLoader; protected final String username; protected final String pwd; protected final String baseUrl; protected final String defaultUrl; public AbstractJdbcCatalog( + ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, @@ -98,12 +102,14 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog { String baseUrl) { super(catalogName, defaultDatabase); + checkNotNull(userClassLoader); checkArgument(!StringUtils.isNullOrWhitespaceOnly(username)); checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); JdbcCatalogUtils.validateJdbcUrl(baseUrl); + this.userClassLoader = userClassLoader; this.username = username; this.pwd = pwd; this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; @@ -112,14 +118,17 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog { @Override public void open() throws CatalogException { - // test connection, fail early if we cannot connect to database - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { - } catch (SQLException e) { - throw new ValidationException( - String.format("Failed connecting to %s via JDBC.", defaultUrl), e); + // load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(userClassLoader)) { + // test connection, fail early if we cannot connect to database + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + } catch (SQLException e) { + throw new ValidationException( + String.format("Failed connecting to %s via JDBC.", defaultUrl), e); + } + LOG.info("Catalog {} established connection to {}", getName(), defaultUrl); } - - LOG.info("Catalog {} established connection to {}", getName(), defaultUrl); } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java index 1592ef4..0a93171 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java @@ -41,16 +41,17 @@ public class JdbcCatalog extends AbstractJdbcCatalog { private final AbstractJdbcCatalog internal; public JdbcCatalog( + ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { - super(catalogName, defaultDatabase, username, pwd, baseUrl); + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); internal = JdbcCatalogUtils.createCatalog( - catalogName, defaultDatabase, username, pwd, baseUrl); + userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); } // ------ databases ----- diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java index fa1af79..28bea80 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java @@ -39,17 +39,20 @@ public class JdbcCatalogUtils { /** Create catalog instance from given information. */ public static AbstractJdbcCatalog createCatalog( + ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { - JdbcDialect dialect = JdbcDialectLoader.load(baseUrl); + JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader); if (dialect instanceof PostgresDialect) { - return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl); + return new PostgresCatalog( + userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); } else if (dialect instanceof MySqlDialect) { - return new MySqlCatalog(catalogName, defaultDatabase, username, pwd, baseUrl); + return new MySqlCatalog( + userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); } else { throw new UnsupportedOperationException( String.format("Catalog for '%s' is not supported yet.", dialect)); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java index de3dfe2..d88b228 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -60,12 +61,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog { }; public MySqlCatalog( + ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { - super(catalogName, defaultDatabase, username, pwd, baseUrl); + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); String driverVersion = Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null."); @@ -118,23 +120,30 @@ public class MySqlCatalog extends AbstractJdbcCatalog { } private String getDatabaseVersion() { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { - return conn.getMetaData().getDatabaseProductVersion(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed in getting MySQL version by %s.", defaultUrl), e); + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(userClassLoader)) { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + return conn.getMetaData().getDatabaseProductVersion(); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed in getting MySQL version by %s.", defaultUrl), e); + } } } private String getDriverVersion() { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { - String driverVersion = conn.getMetaData().getDriverVersion(); - Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+"); - Matcher matcher = regexp.matcher(driverVersion); - return matcher.find() ? matcher.group(0) : null; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed in getting MySQL driver version by %s.", defaultUrl), e); + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(userClassLoader)) { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + String driverVersion = conn.getMetaData().getDriverVersion(); + Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+"); + Matcher matcher = regexp.matcher(driverVersion); + return matcher.find() ? matcher.group(0) : null; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed in getting MySQL driver version by %s.", defaultUrl), + e); + } } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java index 5d767e5..de41c01 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java @@ -71,12 +71,13 @@ public class PostgresCatalog extends AbstractJdbcCatalog { private final JdbcDialectTypeMapper dialectTypeMapper; protected PostgresCatalog( + ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { - super(catalogName, defaultDatabase, username, pwd, baseUrl); + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); this.dialectTypeMapper = new PostgresTypeMapper(); } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java index 2fb74f2..9677744 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java @@ -70,6 +70,7 @@ public class JdbcCatalogFactory implements CatalogFactory { helper.validate(); return new JdbcCatalog( + context.getClassLoader(), context.getName(), helper.getOptions().get(DEFAULT_DATABASE), helper.getOptions().get(USERNAME), diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java index fa9253c..6dfad9b 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java @@ -41,13 +41,13 @@ public final class JdbcDialectLoader { * Loads the unique JDBC Dialect that can handle the given database url. * * @param url A database URL. + * @param classLoader the classloader used to load the factory * @throws IllegalStateException if the loader cannot find exactly one dialect that can * unambiguously process the given database URL. * @return The loaded dialect. */ - public static JdbcDialect load(String url) { - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - List<JdbcDialectFactory> foundFactories = discoverFactories(cl); + public static JdbcDialect load(String url, ClassLoader classLoader) { + List<JdbcDialectFactory> foundFactories = discoverFactories(classLoader); if (foundFactories.isEmpty()) { throw new IllegalStateException( diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java index 89d22bd..0b2562a 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java @@ -105,6 +105,7 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions { /** Builder of {@link JdbcConnectorOptions}. */ public static class Builder { + private ClassLoader classLoader; private String dbURL; private String tableName; private String driverName; @@ -114,6 +115,19 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions { private Integer parallelism; private int connectionCheckTimeoutSeconds = 60; + /** + * optional, specifies the classloader to use in the planner for load the class in user jar. + * + * <p>By default, this is configured using {@code + * Thread.currentThread().getContextClassLoader()}. + * + * <p>Modify the {@link ClassLoader} only if you know what you're doing. + */ + public Builder setClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + /** required, table name. */ public Builder setTableName(String tableName) { this.tableName = tableName; @@ -171,7 +185,10 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions { checkNotNull(dbURL, "No dbURL supplied."); checkNotNull(tableName, "No tableName supplied."); if (this.dialect == null) { - this.dialect = JdbcDialectLoader.load(dbURL); + if (classLoader == null) { + classLoader = Thread.currentThread().getContextClassLoader(); + } + this.dialect = JdbcDialectLoader.load(dbURL, classLoader); } if (this.driverName == null) { Optional<String> optional = dialect.defaultDriverName(); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java index 5760686..bff5a49 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -87,9 +87,10 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam final ReadableConfig config = helper.getOptions(); helper.validate(); - validateConfigOptions(config); - validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), config.get(URL)); - JdbcConnectorOptions jdbcOptions = getJdbcOptions(config); + validateConfigOptions(config, context.getClassLoader()); + validateDataTypeWithJdbcDialect( + context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader()); + JdbcConnectorOptions jdbcOptions = getJdbcOptions(config, context.getClassLoader()); return new JdbcDynamicTableSink( jdbcOptions, @@ -108,28 +109,32 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam final ReadableConfig config = helper.getOptions(); helper.validate(); - validateConfigOptions(config); - validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), config.get(URL)); + validateConfigOptions(config, context.getClassLoader()); + validateDataTypeWithJdbcDialect( + context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader()); return new JdbcDynamicTableSource( - getJdbcOptions(helper.getOptions()), + getJdbcOptions(helper.getOptions(), context.getClassLoader()), getJdbcReadOptions(helper.getOptions()), helper.getOptions().get(LookupOptions.MAX_RETRIES), getLookupCache(config), context.getPhysicalRowDataType()); } - private static void validateDataTypeWithJdbcDialect(DataType dataType, String url) { - final JdbcDialect dialect = JdbcDialectLoader.load(url); + private static void validateDataTypeWithJdbcDialect( + DataType dataType, String url, ClassLoader classLoader) { + final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader); dialect.validate((RowType) dataType.getLogicalType()); } - private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) { + private JdbcConnectorOptions getJdbcOptions( + ReadableConfig readableConfig, ClassLoader classLoader) { final String url = readableConfig.get(URL); final JdbcConnectorOptions.Builder builder = JdbcConnectorOptions.builder() + .setClassLoader(classLoader) .setDBUrl(url) .setTableName(readableConfig.get(TABLE_NAME)) - .setDialect(JdbcDialectLoader.load(url)) + .setDialect(JdbcDialectLoader.load(url, classLoader)) .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null)) .setConnectionCheckTimeoutSeconds( (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds()); @@ -260,9 +265,9 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam .collect(Collectors.toSet()); } - private void validateConfigOptions(ReadableConfig config) { + private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) { String jdbcUrl = config.get(URL); - JdbcDialectLoader.load(jdbcUrl); + JdbcDialectLoader.load(jdbcUrl, classLoader); checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD}); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java index 6f84965..cb344b2 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; @@ -96,8 +95,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction { this.query = options.getDialect() .getSelectFromStatement(options.getTableName(), fieldNames, keyNames); - String dbURL = options.getDbURL(); - JdbcDialect jdbcDialect = JdbcDialectLoader.load(dbURL); + JdbcDialect jdbcDialect = options.getDialect(); this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType); this.lookupKeyRowConverter = jdbcDialect.getRowConverter( diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java index c06d0d6..d92c89e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java @@ -129,6 +129,7 @@ public class MySqlCatalogTestBase { CATALOGS.put( dockerImageName, new MySqlCatalog( + Thread.currentThread().getContextClassLoader(), TEST_CATALOG_NAME, TEST_DB, TEST_USERNAME, diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java index 4b94ae5..73b56a3 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java @@ -82,6 +82,7 @@ public class PostgresCatalogTestBase { catalog = new PostgresCatalog( + Thread.currentThread().getContextClassLoader(), TEST_CATALOG_NAME, PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java index 2da3112..1527a07 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java @@ -71,6 +71,7 @@ public class JdbcCatalogFactoryTest { catalog = new JdbcCatalog( + Thread.currentThread().getContextClassLoader(), TEST_CATALOG_NAME, PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java index cc76114..b4827b6 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java @@ -35,7 +35,9 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link OraclePreparedStatementTest}. */ public class OraclePreparedStatementTest { - private final JdbcDialect dialect = JdbcDialectLoader.load("jdbc:oracle://localhost:3306/test"); + private final JdbcDialect dialect = + JdbcDialectLoader.load( + "jdbc:oracle://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; private final String[] keyFields = new String[] {"id", "__field_3__"}; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java index 6013df6..013c711 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java @@ -34,7 +34,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link FieldNamedPreparedStatementImpl}. */ public class FieldNamedPreparedStatementImplTest { - private final JdbcDialect dialect = JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test"); + private final JdbcDialect dialect = + JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; private final String[] keyFields = new String[] {"id", "__field_3__"}; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java index 2617832..66b0cc4 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestBase; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; @@ -61,6 +62,10 @@ public class JdbcAppendOnlyWriterTest extends JdbcTestBase { .setOptions( JdbcConnectorOptions.builder() .setDBUrl(getDbMetadata().getUrl()) + .setDialect( + JdbcDialectLoader.load( + getDbMetadata().getUrl(), + getClass().getClassLoader())) .setTableName(OUTPUT_TABLE) .build()) .setFieldNames(fieldNames)