This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 67df120318d7b33b95bc85d69824e6f9f327ed64
Author: fengli <ldliu...@163.com>
AuthorDate: Mon Aug 29 20:18:20 2022 +0800

    [FLINK-29074][Connectors/JDBC] Fix ClassNotFoundException when the JDBC connector is loaded via ADD JAR syntax
    
    This closes #20707
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 23 ++++++++++----
 .../flink/connector/jdbc/catalog/JdbcCatalog.java  |  5 +--
 .../connector/jdbc/catalog/JdbcCatalogUtils.java   |  9 ++++--
 .../flink/connector/jdbc/catalog/MySqlCatalog.java | 37 ++++++++++++++--------
 .../connector/jdbc/catalog/PostgresCatalog.java    |  3 +-
 .../jdbc/catalog/factory/JdbcCatalogFactory.java   |  1 +
 .../connector/jdbc/dialect/JdbcDialectLoader.java  |  6 ++--
 .../internal/options/JdbcConnectorOptions.java     | 19 ++++++++++-
 .../jdbc/table/JdbcDynamicTableFactory.java        | 29 ++++++++++-------
 .../jdbc/table/JdbcRowDataLookupFunction.java      |  4 +--
 .../jdbc/catalog/MySqlCatalogTestBase.java         |  1 +
 .../jdbc/catalog/PostgresCatalogTestBase.java      |  1 +
 .../catalog/factory/JdbcCatalogFactoryTest.java    |  1 +
 .../oracle/OraclePreparedStatementTest.java        |  4 ++-
 .../FieldNamedPreparedStatementImplTest.java       |  3 +-
 .../jdbc/table/JdbcAppendOnlyWriterTest.java       |  5 +++
 16 files changed, 103 insertions(+), 48 deletions(-)

diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 8241085..482ce79 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -51,6 +51,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.commons.compress.utils.Lists;
 import org.slf4j.Logger;
@@ -79,18 +80,21 @@ import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAM
 import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Abstract catalog for any JDBC catalogs. */
 public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
 
+    protected final ClassLoader userClassLoader;
     protected final String username;
     protected final String pwd;
     protected final String baseUrl;
     protected final String defaultUrl;
 
     public AbstractJdbcCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
@@ -98,12 +102,14 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog {
             String baseUrl) {
         super(catalogName, defaultDatabase);
 
+        checkNotNull(userClassLoader);
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
 
         JdbcCatalogUtils.validateJdbcUrl(baseUrl);
 
+        this.userClassLoader = userClassLoader;
         this.username = username;
         this.pwd = pwd;
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
@@ -112,14 +118,17 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog {
 
     @Override
     public void open() throws CatalogException {
-        // test connection, fail early if we cannot connect to database
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-        } catch (SQLException e) {
-            throw new ValidationException(
-                    String.format("Failed connecting to %s via JDBC.", 
defaultUrl), e);
+        // load the driver with userClassLoader explicitly; see FLINK-15635 for details
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            // test connection, fail early if we cannot connect to database
+            try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            } catch (SQLException e) {
+                throw new ValidationException(
+                        String.format("Failed connecting to %s via JDBC.", 
defaultUrl), e);
+            }
+            LOG.info("Catalog {} established connection to {}", getName(), 
defaultUrl);
         }
-
-        LOG.info("Catalog {} established connection to {}", getName(), 
defaultUrl);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
index 1592ef4..0a93171 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
@@ -41,16 +41,17 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
     private final AbstractJdbcCatalog internal;
 
     public JdbcCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, 
baseUrl);
 
         internal =
                 JdbcCatalogUtils.createCatalog(
-                        catalogName, defaultDatabase, username, pwd, baseUrl);
+                        userClassLoader, catalogName, defaultDatabase, 
username, pwd, baseUrl);
     }
 
     // ------ databases -----
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index fa1af79..28bea80 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -39,17 +39,20 @@ public class JdbcCatalogUtils {
 
     /** Create catalog instance from given information. */
     public static AbstractJdbcCatalog createCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl);
+        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader);
 
         if (dialect instanceof PostgresDialect) {
-            return new PostgresCatalog(catalogName, defaultDatabase, username, 
pwd, baseUrl);
+            return new PostgresCatalog(
+                    userClassLoader, catalogName, defaultDatabase, username, 
pwd, baseUrl);
         } else if (dialect instanceof MySqlDialect) {
-            return new MySqlCatalog(catalogName, defaultDatabase, username, 
pwd, baseUrl);
+            return new MySqlCatalog(
+                    userClassLoader, catalogName, defaultDatabase, username, 
pwd, baseUrl);
         } else {
             throw new UnsupportedOperationException(
                     String.format("Catalog for '%s' is not supported yet.", 
dialect));
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
index de3dfe2..d88b228 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -60,12 +61,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
             };
 
     public MySqlCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, 
baseUrl);
 
         String driverVersion =
                 Preconditions.checkNotNull(getDriverVersion(), "Driver version 
must not be null.");
@@ -118,23 +120,30 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
     }
 
     private String getDatabaseVersion() {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-            return conn.getMetaData().getDatabaseProductVersion();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed in getting MySQL version by %s.", 
defaultUrl), e);
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+                return conn.getMetaData().getDatabaseProductVersion();
+            } catch (Exception e) {
+                throw new CatalogException(
+                        String.format("Failed in getting MySQL version by 
%s.", defaultUrl), e);
+            }
         }
     }
 
     private String getDriverVersion() {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-            String driverVersion = conn.getMetaData().getDriverVersion();
-            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
-            Matcher matcher = regexp.matcher(driverVersion);
-            return matcher.find() ? matcher.group(0) : null;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed in getting MySQL driver version by 
%s.", defaultUrl), e);
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+                String driverVersion = conn.getMetaData().getDriverVersion();
+                Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+                Matcher matcher = regexp.matcher(driverVersion);
+                return matcher.find() ? matcher.group(0) : null;
+            } catch (Exception e) {
+                throw new CatalogException(
+                        String.format("Failed in getting MySQL driver version 
by %s.", defaultUrl),
+                        e);
+            }
         }
     }
 
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index 5d767e5..de41c01 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -71,12 +71,13 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
     private final JdbcDialectTypeMapper dialectTypeMapper;
 
     protected PostgresCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, 
baseUrl);
         this.dialectTypeMapper = new PostgresTypeMapper();
     }
 
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
index 2fb74f2..9677744 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
@@ -70,6 +70,7 @@ public class JdbcCatalogFactory implements CatalogFactory {
         helper.validate();
 
         return new JdbcCatalog(
+                context.getClassLoader(),
                 context.getName(),
                 helper.getOptions().get(DEFAULT_DATABASE),
                 helper.getOptions().get(USERNAME),
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
index fa9253c..6dfad9b 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
@@ -41,13 +41,13 @@ public final class JdbcDialectLoader {
      * Loads the unique JDBC Dialect that can handle the given database url.
      *
      * @param url A database URL.
+     * @param classLoader the classloader used to load the factory
      * @throws IllegalStateException if the loader cannot find exactly one 
dialect that can
      *     unambiguously process the given database URL.
      * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url) {
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
+    public static JdbcDialect load(String url, ClassLoader classLoader) {
+        List<JdbcDialectFactory> foundFactories = 
discoverFactories(classLoader);
 
         if (foundFactories.isEmpty()) {
             throw new IllegalStateException(
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
index 89d22bd..0b2562a 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
@@ -105,6 +105,7 @@ public class JdbcConnectorOptions extends 
JdbcConnectionOptions {
 
     /** Builder of {@link JdbcConnectorOptions}. */
     public static class Builder {
+        private ClassLoader classLoader;
         private String dbURL;
         private String tableName;
         private String driverName;
@@ -114,6 +115,19 @@ public class JdbcConnectorOptions extends 
JdbcConnectionOptions {
         private Integer parallelism;
         private int connectionCheckTimeoutSeconds = 60;
 
+        /**
+         * optional, specifies the classloader the planner uses to load classes from user jars.
+         *
+         * <p>By default, this is configured using {@code
+         * Thread.currentThread().getContextClassLoader()}.
+         *
+         * <p>Modify the {@link ClassLoader} only if you know what you're 
doing.
+         */
+        public Builder setClassLoader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
         /** required, table name. */
         public Builder setTableName(String tableName) {
             this.tableName = tableName;
@@ -171,7 +185,10 @@ public class JdbcConnectorOptions extends 
JdbcConnectionOptions {
             checkNotNull(dbURL, "No dbURL supplied.");
             checkNotNull(tableName, "No tableName supplied.");
             if (this.dialect == null) {
-                this.dialect = JdbcDialectLoader.load(dbURL);
+                if (classLoader == null) {
+                    classLoader = 
Thread.currentThread().getContextClassLoader();
+                }
+                this.dialect = JdbcDialectLoader.load(dbURL, classLoader);
             }
             if (this.driverName == null) {
                 Optional<String> optional = dialect.defaultDriverName();
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 5760686..bff5a49 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -87,9 +87,10 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         final ReadableConfig config = helper.getOptions();
 
         helper.validate();
-        validateConfigOptions(config);
-        validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), 
config.get(URL));
-        JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), 
context.getClassLoader());
+        JdbcConnectorOptions jdbcOptions = getJdbcOptions(config, 
context.getClassLoader());
 
         return new JdbcDynamicTableSink(
                 jdbcOptions,
@@ -108,28 +109,32 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         final ReadableConfig config = helper.getOptions();
 
         helper.validate();
-        validateConfigOptions(config);
-        validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), 
config.get(URL));
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), 
context.getClassLoader());
         return new JdbcDynamicTableSource(
-                getJdbcOptions(helper.getOptions()),
+                getJdbcOptions(helper.getOptions(), context.getClassLoader()),
                 getJdbcReadOptions(helper.getOptions()),
                 helper.getOptions().get(LookupOptions.MAX_RETRIES),
                 getLookupCache(config),
                 context.getPhysicalRowDataType());
     }
 
-    private static void validateDataTypeWithJdbcDialect(DataType dataType, 
String url) {
-        final JdbcDialect dialect = JdbcDialectLoader.load(url);
+    private static void validateDataTypeWithJdbcDialect(
+            DataType dataType, String url, ClassLoader classLoader) {
+        final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
         dialect.validate((RowType) dataType.getLogicalType());
     }
 
-    private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) 
{
+    private JdbcConnectorOptions getJdbcOptions(
+            ReadableConfig readableConfig, ClassLoader classLoader) {
         final String url = readableConfig.get(URL);
         final JdbcConnectorOptions.Builder builder =
                 JdbcConnectorOptions.builder()
+                        .setClassLoader(classLoader)
                         .setDBUrl(url)
                         .setTableName(readableConfig.get(TABLE_NAME))
-                        .setDialect(JdbcDialectLoader.load(url))
+                        .setDialect(JdbcDialectLoader.load(url, classLoader))
                         
.setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
                         .setConnectionCheckTimeoutSeconds(
                                 (int) 
readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
@@ -260,9 +265,9 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                 .collect(Collectors.toSet());
     }
 
-    private void validateConfigOptions(ReadableConfig config) {
+    private void validateConfigOptions(ReadableConfig config, ClassLoader 
classLoader) {
         String jdbcUrl = config.get(URL);
-        JdbcDialectLoader.load(jdbcUrl);
+        JdbcDialectLoader.load(jdbcUrl, classLoader);
 
         checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
 
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
index 6f84965..cb344b2 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
@@ -96,8 +95,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction 
{
         this.query =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
-        String dbURL = options.getDbURL();
-        JdbcDialect jdbcDialect = JdbcDialectLoader.load(dbURL);
+        JdbcDialect jdbcDialect = options.getDialect();
         this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
         this.lookupKeyRowConverter =
                 jdbcDialect.getRowConverter(
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index c06d0d6..d92c89e 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -129,6 +129,7 @@ public class MySqlCatalogTestBase {
             CATALOGS.put(
                     dockerImageName,
                     new MySqlCatalog(
+                            Thread.currentThread().getContextClassLoader(),
                             TEST_CATALOG_NAME,
                             TEST_DB,
                             TEST_USERNAME,
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 4b94ae5..73b56a3 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -82,6 +82,7 @@ public class PostgresCatalogTestBase {
 
         catalog =
                 new PostgresCatalog(
+                        Thread.currentThread().getContextClassLoader(),
                         TEST_CATALOG_NAME,
                         PostgresCatalog.DEFAULT_DATABASE,
                         TEST_USERNAME,
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
index 2da3112..1527a07 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
@@ -71,6 +71,7 @@ public class JdbcCatalogFactoryTest {
 
         catalog =
                 new JdbcCatalog(
+                        Thread.currentThread().getContextClassLoader(),
                         TEST_CATALOG_NAME,
                         PostgresCatalog.DEFAULT_DATABASE,
                         TEST_USERNAME,
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
index cc76114..b4827b6 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
@@ -35,7 +35,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link OraclePreparedStatementTest}. */
 public class OraclePreparedStatementTest {
 
-    private final JdbcDialect dialect = 
JdbcDialectLoader.load("jdbc:oracle://localhost:3306/test");
+    private final JdbcDialect dialect =
+            JdbcDialectLoader.load(
+                    "jdbc:oracle://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
index 6013df6..013c711 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
@@ -34,7 +34,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link FieldNamedPreparedStatementImpl}. */
 public class FieldNamedPreparedStatementImplTest {
 
-    private final JdbcDialect dialect = 
JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test");
+    private final JdbcDialect dialect =
+            JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
index 2617832..66b0cc4 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 
@@ -61,6 +62,10 @@ public class JdbcAppendOnlyWriterTest extends JdbcTestBase {
                         .setOptions(
                                 JdbcConnectorOptions.builder()
                                         .setDBUrl(getDbMetadata().getUrl())
+                                        .setDialect(
+                                                JdbcDialectLoader.load(
+                                                        
getDbMetadata().getUrl(),
+                                                        
getClass().getClassLoader()))
                                         .setTableName(OUTPUT_TABLE)
                                         .build())
                         .setFieldNames(fieldNames)

Reply via email to