This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit d2564a94f2da0a2dd803d87d89822f3bf251c54a
Author: Joao Boto <b...@boto.pro>
AuthorDate: Mon Jan 30 16:52:28 2023 +0100

    [FLINK-30790] Change H2 and some Derby tests to new implementation
---
 .../apache/flink/connector/jdbc/DbMetadata.java    | 51 --------------------
 .../flink/connector/jdbc/JdbcDataTestBase.java     |  5 +-
 .../apache/flink/connector/jdbc/JdbcITCase.java    | 15 +++---
 .../flink/connector/jdbc/JdbcInputFormatTest.java  | 10 +---
 .../connector/jdbc/JdbcRowOutputFormatTest.java    |  3 +-
 .../apache/flink/connector/jdbc/JdbcTestBase.java  | 12 ++---
 .../flink/connector/jdbc/JdbcTestFixture.java      | 30 ++++--------
 .../connector/jdbc/databases/DatabaseMetadata.java | 39 +++++++++++++--
 .../jdbc/databases/derby/DerbyDatabase.java        |  3 +-
 .../jdbc/databases/derby/DerbyMetadata.java        |  1 -
 .../connector/jdbc/databases/h2/H2Metadata.java    |  3 +-
 .../h2/xa}/H2XaConnectionWrapper.java              |  2 +-
 .../{xa/h2 => databases/h2/xa}/H2XaDsWrapper.java  |  2 +-
 .../h2/xa}/H2XaResourceWrapper.java                |  2 +-
 .../{xa/h2 => databases/h2/xa}/package-info.java   |  2 +-
 .../jdbc/databases/mysql/MySqlMetadata.java        |  2 -
 .../jdbc/databases/oracle/OracleMetadata.java      |  1 -
 .../jdbc/databases/postgres/PostgresMetadata.java  |  1 -
 .../databases/sqlserver/SqlServerMetadata.java     |  1 -
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java |  4 +-
 .../oracle/OracleExactlyOnceSinkE2eTest.java       |  4 +-
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   |  4 +-
 .../connector/jdbc/internal/JdbcFullTest.java      | 15 +++---
 .../jdbc/internal/JdbcTableOutputFormatTest.java   | 12 ++---
 .../jdbc/table/JdbcAppendOnlyWriterTest.java       | 16 +++----
 .../jdbc/table/JdbcDynamicTableSinkITCase.java     | 15 ++----
 .../connector/jdbc/table/JdbcOutputFormatTest.java |  3 +-
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java        |  6 +--
 .../connector/jdbc/xa/JdbcXaFacadeImplTest.java    |  8 ++--
 .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java     |  8 ++--
 .../flink/connector/jdbc/xa/JdbcXaSinkH2Test.java  | 12 ++---
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 10 ++--
 .../jdbc/xa/JdbcXaSinkNoInsertionTest.java         |  9 +---
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java      | 16 +++----
 .../flink/connector/jdbc/xa/h2/H2DbMetadata.java   | 56 ----------------------
 35 files changed, 130 insertions(+), 253 deletions(-)

diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
deleted file mode 100644
index 55c5317..0000000
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc;
-
-import javax.sql.XADataSource;
-
-import java.io.Serializable;
-
-/** Describes a database: driver, schema and urls. */
-public interface DbMetadata extends Serializable {
-
-    default String getInitUrl() {
-        return getUrl();
-    }
-
-    String getUrl();
-
-    default String getUser() {
-        return "";
-    }
-
-    default String getPassword() {
-        return "";
-    }
-
-    XADataSource buildXaDataSource();
-
-    String getDriverClass();
-
-    default JdbcConnectionOptions toConnectionOptions() {
-        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                .withDriverName(getDriverClass())
-                .withUrl(getUrl())
-                .build();
-    }
-}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
index 003cf57..8b8a5aa 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -40,11 +41,11 @@ import static org.mockito.Mockito.doReturn;
 public abstract class JdbcDataTestBase extends JdbcTestBase {
     @BeforeEach
     void initData() throws SQLException {
-        JdbcTestFixture.initData(getDbMetadata());
+        JdbcTestFixture.initData(getMetadata());
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return DERBY_EBOOKSHOP_DB;
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
index d1a830b..41a8082 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -73,8 +74,8 @@ public class JdbcITCase extends JdbcTestBase {
                                 String.format(INSERT_TEMPLATE, INPUT_TABLE),
                                 TEST_ENTRY_JDBC_STATEMENT_BUILDER,
                                 new JdbcConnectionOptionsBuilder()
-                                        .withUrl(getDbMetadata().getUrl())
-                                        .withDriverName(getDbMetadata().getDriverClass())
+                                        .withUrl(getMetadata().getUrl())
+                                        .withDriverName(getMetadata().getDriverClass())
                                         .build()));
         env.execute();
 
@@ -107,8 +108,8 @@ public class JdbcITCase extends JdbcTestBase {
                                     ps.setString(2, e.content);
                                 },
                                 new JdbcConnectionOptionsBuilder()
-                                        .withUrl(getDbMetadata().getUrl())
-                                        .withDriverName(getDbMetadata().getDriverClass())
+                                        .withUrl(getMetadata().getUrl())
+                                        .withDriverName(getMetadata().getDriverClass())
                                         .build()));
         env.execute();
 
@@ -117,7 +118,7 @@ public class JdbcITCase extends JdbcTestBase {
 
     private List<String> selectWords() throws SQLException {
         ArrayList<String> strings = new ArrayList<>();
-        try (Connection connection = DriverManager.getConnection(getDbMetadata().getUrl())) {
+        try (Connection connection = DriverManager.getConnection(getMetadata().getUrl())) {
             try (Statement st = connection.createStatement()) {
                 try (ResultSet rs = st.executeQuery("select word from words")) {
                     while (rs.next()) {
@@ -144,7 +145,7 @@ public class JdbcITCase extends JdbcTestBase {
 
     private List<TestEntry> selectBooks() throws SQLException {
         List<TestEntry> result = new ArrayList<>();
-        try (Connection connection = DriverManager.getConnection(getDbMetadata().getUrl())) {
+        try (Connection connection = DriverManager.getConnection(getMetadata().getUrl())) {
             connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
             connection.setReadOnly(true);
             try (Statement st = connection.createStatement()) {
@@ -167,7 +168,7 @@ public class JdbcITCase extends JdbcTestBase {
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return DERBY_EBOOKSHOP_DB;
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index 92c0e79..ca6baa2 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
@@ -192,11 +191,8 @@ class JdbcInputFormatTest extends JdbcDataTestBase {
                         .finish();
         jdbcInputFormat.openInputFormat();
 
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
         final int defaultFetchSize =
-                DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl())
-                        .createStatement()
-                        .getFetchSize();
+                DERBY_EBOOKSHOP_DB.getConnection().createStatement().getFetchSize();
 
         assertThat(jdbcInputFormat.getStatement().getFetchSize()).isEqualTo(defaultFetchSize);
     }
@@ -229,9 +225,7 @@ class JdbcInputFormatTest extends JdbcDataTestBase {
                         .finish();
         jdbcInputFormat.openInputFormat();
 
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
-        final boolean defaultAutoCommit =
-                DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl()).getAutoCommit();
+        final boolean defaultAutoCommit = DERBY_EBOOKSHOP_DB.getConnection().getAutoCommit();
 
         assertThat(jdbcInputFormat.getDbConn().getAutoCommit()).isEqualTo(defaultAutoCommit);
     }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
index 3150a15..f0427c0 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
@@ -62,8 +62,7 @@ class JdbcRowOutputFormatTest extends JdbcDataTestBase {
         }
         jdbcOutputFormat = null;
 
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
+        try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("DELETE FROM " + OUTPUT_TABLE);
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
index 1214a08..ee6b208 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.connector.jdbc;
 
+import org.apache.flink.connector.jdbc.databases.DatabaseTest;
+
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
@@ -24,18 +26,16 @@ import org.junit.jupiter.api.BeforeEach;
  * Base class for JDBC test using DDL from {@link JdbcTestFixture}. It uses create tables before
  * each test and drops afterwards.
  */
-public abstract class JdbcTestBase {
+public abstract class JdbcTestBase implements DatabaseTest {
 
     @BeforeEach
     public void before() throws Exception {
-        JdbcTestFixture.initSchema(getDbMetadata());
+        JdbcTestFixture.initSchema(getMetadata());
     }
 
     @AfterEach
     public void after() throws Exception {
-        JdbcTestFixture.cleanupData(getDbMetadata().getUrl());
-        JdbcTestFixture.cleanUpDatabasesStatic(getDbMetadata());
+        JdbcTestFixture.cleanupData(getMetadata().getUrl());
+        JdbcTestFixture.cleanUpDatabasesStatic(getMetadata());
     }
-
-    protected abstract DbMetadata getDbMetadata();
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
index d00fbad..cab8135 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.connector.jdbc.databases.derby.DerbyMetadata;
-import org.apache.flink.connector.jdbc.xa.h2.H2DbMetadata;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.databases.derby.DerbyDatabase;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.OutputStream;
@@ -36,7 +36,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
 
 /** Test data and helper objects for JDBC tests. */
 @SuppressWarnings("SpellCheckingInspection")
-public class JdbcTestFixture {
+public class JdbcTestFixture implements DerbyDatabase {
     public static final JdbcTestCheckpoint CP0 = new JdbcTestCheckpoint(0, 1, 2, 3);
     public static final JdbcTestCheckpoint CP1 = new JdbcTestCheckpoint(1, 4, 5, 6);
 
@@ -74,10 +74,7 @@ public class JdbcTestFixture {
         new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
     };
 
-    private static final String EBOOKSHOP_SCHEMA_NAME = "ebookshop";
-    public static final DerbyMetadata DERBY_EBOOKSHOP_DB =
-            new DerbyMetadata(EBOOKSHOP_SCHEMA_NAME);
-    public static final H2DbMetadata H2_EBOOKSHOP_DB = new H2DbMetadata(EBOOKSHOP_SCHEMA_NAME);
+    public static final DatabaseMetadata DERBY_EBOOKSHOP_DB = DerbyDatabase.startDatabase();
 
     /** TestEntry. */
     public static class TestEntry implements Serializable {
@@ -188,14 +185,8 @@ public class JdbcTestFixture {
                 public void write(int b) {}
             };
 
-    public static void initSchema(DbMetadata dbMetadata)
-            throws ClassNotFoundException, SQLException {
-        System.setProperty(
-                "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
-        Class.forName(dbMetadata.getDriverClass());
-        try (Connection conn =
-                DriverManager.getConnection(
-                        dbMetadata.getInitUrl(), dbMetadata.getUser(), dbMetadata.getPassword())) {
+    public static void initSchema(DatabaseMetadata metadata) throws SQLException {
+        try (Connection conn = metadata.getConnection()) {
             createTable(conn, JdbcTestFixture.INPUT_TABLE);
             createTable(conn, OUTPUT_TABLE);
             createTable(conn, OUTPUT_TABLE_2);
@@ -218,7 +209,7 @@ public class JdbcTestFixture {
         }
     }
 
-    static void initData(DbMetadata dbMetadata) throws SQLException {
+    static void initData(DatabaseMetadata dbMetadata) throws SQLException {
         try (Connection conn = DriverManager.getConnection(dbMetadata.getUrl())) {
             insertDataIntoInputTable(conn);
         }
@@ -234,10 +225,9 @@ public class JdbcTestFixture {
         stat.close();
     }
 
-    public static void cleanUpDatabasesStatic(DbMetadata dbMetadata)
-            throws ClassNotFoundException, SQLException {
-        Class.forName(dbMetadata.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(dbMetadata.getUrl());
+    public static void cleanUpDatabasesStatic(DatabaseMetadata dbMetadata) throws SQLException {
+
+        try (Connection conn = dbMetadata.getConnection();
                 Statement stat = conn.createStatement()) {
 
             stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
index fcfc656..2d3fbec 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
@@ -1,15 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.connector.jdbc.databases;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.sql.XADataSource;
 
 import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
 
-public interface DatabaseMetadata extends Serializable, DbMetadata {
+/** Describes a database: driver, schema and urls. */
+public interface DatabaseMetadata extends Serializable {
 
-    default String getUrl(){
+    default String getUrl() {
         return getJdbcUrl();
     }
 
@@ -29,7 +49,7 @@ public interface DatabaseMetadata extends Serializable, DbMetadata {
 
     String getVersion();
 
-    default JdbcConnectionOptions toConnectionOptions() {
+    default JdbcConnectionOptions getConnectionOptions() {
         return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                 .withDriverName(getDriverClass())
                 .withUrl(getJdbcUrl())
@@ -37,4 +57,13 @@ public interface DatabaseMetadata extends Serializable, DbMetadata {
                 .withPassword(getPassword())
                 .build();
     }
-}
\ No newline at end of file
+
+    default Connection getConnection() {
+        try {
+            Class.forName(getDriverClass());
+            return DriverManager.getConnection(getUrl(), getUser(), getPassword());
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java
index f0f6e3a..d2bfae5 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java
@@ -32,7 +32,8 @@ public interface DerbyDatabase extends DatabaseTest {
                     "derby.stream.error.field",
                     DerbyDatabase.class.getCanonicalName() + ".DEV_NULL");
             Class.forName(metadata.getDriverClass());
-            DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl())).close();
+            DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl()))
+                    .close();
         } catch (Exception e) {
             throw new FlinkRuntimeException(e);
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
index f7cc8fb..33907db 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
@@ -66,5 +66,4 @@ public class DerbyMetadata implements DatabaseMetadata {
     public String getVersion() {
         return "derby:memory";
     }
-
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
index 027fb53..d94e4d6 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
@@ -18,7 +18,7 @@
 package org.apache.flink.connector.jdbc.databases.h2;
 
 import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
-import org.apache.flink.connector.jdbc.xa.h2.H2XaDsWrapper;
+import org.apache.flink.connector.jdbc.databases.h2.xa.H2XaDsWrapper;
 
 import javax.sql.XADataSource;
 
@@ -62,5 +62,4 @@ public class H2Metadata implements DatabaseMetadata {
     public String getVersion() {
         return "h2:mem";
     }
-
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaConnectionWrapper.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaConnectionWrapper.java
similarity index 97%
rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaConnectionWrapper.java
rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaConnectionWrapper.java
index fd08eb6..50893db 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaConnectionWrapper.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaConnectionWrapper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.xa.h2;
+package org.apache.flink.connector.jdbc.databases.h2.xa;
 
 import javax.sql.ConnectionEventListener;
 import javax.sql.StatementEventListener;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaDsWrapper.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaDsWrapper.java
similarity index 97%
rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaDsWrapper.java
rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaDsWrapper.java
index 9d348d9..25b6c37 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaDsWrapper.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaDsWrapper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.xa.h2;
+package org.apache.flink.connector.jdbc.databases.h2.xa;
 
 import javax.sql.XAConnection;
 import javax.sql.XADataSource;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaResourceWrapper.java
similarity index 99%
rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java
rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaResourceWrapper.java
index 0a8572f..8cc7074 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaResourceWrapper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.xa.h2;
+package org.apache.flink.connector.jdbc.databases.h2.xa;
 
 import org.apache.flink.util.function.ThrowingRunnable;
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/package-info.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/package-info.java
similarity index 94%
rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/package-info.java
rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/package-info.java
index fa20661..4e4ad39 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/package-info.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/package-info.java
@@ -19,4 +19,4 @@
  * This package holds some workarounds for the H2 XA client, plus {@link
  * org.apache.flink.connector.jdbc.xa.h2.H2DbMetadata}. Used only for testing.
  */
-package org.apache.flink.connector.jdbc.xa.h2;
+package org.apache.flink.connector.jdbc.databases.h2.xa;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
index 4bb3d5a..9125232 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
@@ -84,6 +84,4 @@ public class MySqlMetadata implements DatabaseMetadata {
     public String getVersion() {
         return this.version;
     }
-
-
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java
index be2c70f..8708a5e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java
@@ -89,5 +89,4 @@ public class OracleMetadata implements DatabaseMetadata {
     public String getVersion() {
         return version;
     }
-
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java
index e3d6c04..5d62b06 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java
@@ -84,5 +84,4 @@ public class PostgresMetadata implements DatabaseMetadata {
     public String getVersion() {
         return version;
     }
-
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java
index 5435746..0fa59f4 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java
@@ -84,5 +84,4 @@ public class SqlServerMetadata implements DatabaseMetadata {
     public String getVersion() {
         return version;
     }
-
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
index d03c928..73751a2 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
@@ -1,6 +1,6 @@
 package org.apache.flink.connector.jdbc.dialect.mysql;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata;
 import org.apache.flink.connector.jdbc.test.DockerImageVersions;
 import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
@@ -44,7 +44,7 @@ public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return new MySqlMetadata(CONTAINER);
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java
index b3dce62..66e5b2e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java
@@ -1,6 +1,6 @@
 package org.apache.flink.connector.jdbc.dialect.oracle;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.databases.oracle.OracleMetadata;
 import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
 import org.apache.flink.util.function.SerializableSupplier;
@@ -26,7 +26,7 @@ public class OracleExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return new OracleMetadata(CONTAINER);
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java
index 5001f14..807a2a4 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java
@@ -1,6 +1,6 @@
 package org.apache.flink.connector.jdbc.dialect.postgres;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.databases.postgres.PostgresMetadata;
 import org.apache.flink.connector.jdbc.test.DockerImageVersions;
 import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
@@ -35,7 +35,7 @@ public class PostgresExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return new PostgresMetadata(CONTAINER);
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index e92a9ff..4966c28 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -80,7 +80,7 @@ class JdbcFullTest extends JdbcDataTestBase {
                     JdbcOutputFormat.builder()
                             .setOptions(
                                     JdbcConnectorOptions.builder()
-                                            .setDBUrl(getDbMetadata().getUrl())
+                                            .setDBUrl(getMetadata().getUrl())
                                             .setTableName(OUTPUT_TABLE)
                                             .build())
                             .setFieldNames(new String[] {"id", "title", "author", "price", "qty"})
@@ -114,8 +114,8 @@ class JdbcFullTest extends JdbcDataTestBase {
         ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
         JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
                 JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(getDbMetadata().getDriverClass())
-                        .setDBUrl(getDbMetadata().getUrl())
+                        .setDrivername(getMetadata().getDriverClass())
+                        .setDBUrl(getMetadata().getUrl())
                         .setQuery(SELECT_ALL_BOOKS)
                         .setRowTypeInfo(ROW_TYPE_INFO);
 
@@ -138,8 +138,8 @@ class JdbcFullTest extends JdbcDataTestBase {
         // in PreparedStatement.setObject (see its javadoc for more details)
         JdbcConnectionOptions connectionOptions =
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                        .withUrl(getDbMetadata().getUrl())
-                        .withDriverName(getDbMetadata().getDriverClass())
+                        .withUrl(getMetadata().getUrl())
+                        .withDriverName(getMetadata().getDriverClass())
                         .build();
 
         JdbcOutputFormat jdbcOutputFormat =
@@ -162,7 +162,7 @@ class JdbcFullTest extends JdbcDataTestBase {
         source.output(jdbcOutputFormat);
         environment.execute();
 
-        try (Connection dbConn = DriverManager.getConnection(getDbMetadata().getUrl());
+        try (Connection dbConn = DriverManager.getConnection(getMetadata().getUrl());
                 PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS);
                 ResultSet resultSet = statement.executeQuery()) {
             int count = 0;
@@ -175,8 +175,7 @@ class JdbcFullTest extends JdbcDataTestBase {
 
     @AfterEach
     void clearOutputTable() throws Exception {
-        Class.forName(getDbMetadata().getDriverClass());
-        try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+        try (Connection conn = getMetadata().getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("DELETE FROM " + OUTPUT_TABLE);
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 938dcd1..4c0cb19 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -67,7 +67,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
     void testUpsertFormatCloseBeforeOpen() throws Exception {
         JdbcConnectorOptions options =
                 JdbcConnectorOptions.builder()
-                        .setDBUrl(getDbMetadata().getUrl())
+                        .setDBUrl(getMetadata().getUrl())
                         .setTableName(OUTPUT_TABLE)
                         .build();
         JdbcDmlOptions dmlOptions =
@@ -102,7 +102,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
                 new TableJdbcUpsertOutputFormat(
                         new SimpleJdbcConnectionProvider(
                                 JdbcConnectorOptions.builder()
-                                        .setDBUrl(getDbMetadata().getUrl())
+                                        .setDBUrl(getMetadata().getUrl())
                                         .setTableName(OUTPUT_TABLE)
                                         .build()) {
                             @Override
@@ -174,7 +174,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
     void testJdbcOutputFormat() throws Exception {
         JdbcConnectorOptions options =
                 JdbcConnectorOptions.builder()
-                        .setDBUrl(getDbMetadata().getUrl())
+                        .setDBUrl(getMetadata().getUrl())
                         .setTableName(OUTPUT_TABLE)
                         .build();
         JdbcDmlOptions dmlOptions =
@@ -222,7 +222,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
     }
 
     private void check(Row[] rows) throws SQLException {
-        check(rows, getDbMetadata().getUrl(), OUTPUT_TABLE, fieldNames);
+        check(rows, getMetadata().getUrl(), OUTPUT_TABLE, fieldNames);
     }
 
     public static void check(Row[] rows, String url, String table, String[] fields)
@@ -252,8 +252,8 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
             format.close();
         }
         format = null;
-        Class.forName(getDbMetadata().getDriverClass());
-        try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+
+        try (Connection conn = getMetadata().getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("DELETE FROM " + OUTPUT_TABLE);
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
index 34ad9e1..b83d08c 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.connector.jdbc.table;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
@@ -34,7 +34,6 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.Statement;
 
 import static org.apache.flink.connector.jdbc.JdbcDataTestBase.toRow;
@@ -64,11 +63,10 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase {
                                     JdbcOutputFormat.builder()
                                             .setOptions(
                                                     
JdbcConnectorOptions.builder()
-                                                            
.setDBUrl(getDbMetadata().getUrl())
+                                                            
.setDBUrl(getMetadata().getUrl())
                                                             .setDialect(
                                                                     
JdbcDialectLoader.load(
-                                                                            
getDbMetadata()
-                                                                               
     .getUrl(),
+                                                                            
getMetadata().getUrl(),
                                                                             
getClass()
                                                                                
     .getClassLoader()))
                                                             
.setTableName(OUTPUT_TABLE)
@@ -96,8 +94,7 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase {
     }
 
     private void alterTable() throws Exception {
-        Class.forName(getDbMetadata().getDriverClass());
-        try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+        try (Connection conn = getMetadata().getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("ALTER  TABLE " + OUTPUT_TABLE + " DROP COLUMN " + fieldNames[1]);
         }
@@ -113,15 +110,14 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase {
             }
         }
         format = null;
-        Class.forName(getDbMetadata().getDriverClass());
-        try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+        try (Connection conn = getMetadata().getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("DELETE FROM " + OUTPUT_TABLE);
         }
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return DERBY_EBOOKSHOP_DB;
     }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
index 82a188d..1b6354d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -52,7 +51,6 @@ import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
@@ -70,7 +68,7 @@ import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSin
 /** The ITCase for {@link JdbcDynamicTableSink}. */
 class JdbcDynamicTableSinkITCase extends AbstractTestBase {
 
-    public static final String DB_URL = "jdbc:derby:memory:upsert";
+    public static final String DB_URL = DERBY_EBOOKSHOP_DB.getUrl();
     public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
     public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend";
     public static final String OUTPUT_TABLE3 = "dynamicSinkForBatch";
@@ -79,12 +77,8 @@ class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     public static final String USER_TABLE = "USER_TABLE";
 
     @BeforeAll
-    static void beforeAll() throws ClassNotFoundException, SQLException {
-        System.setProperty(
-                "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
-
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+    static void beforeAll() throws SQLException {
+        try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection();
                 Statement stat = conn.createStatement()) {
             stat.executeUpdate(
                     "CREATE TABLE "
@@ -132,8 +126,7 @@ class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     @AfterAll
     static void afterAll() throws Exception {
         TestValuesTableFactory.clearAllData();
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(DB_URL);
+        try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("DROP TABLE " + OUTPUT_TABLE1);
             stat.execute("DROP TABLE " + OUTPUT_TABLE2);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
index 487d69c..35ff2e8 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
@@ -508,8 +508,7 @@ class JdbcOutputFormatTest extends JdbcDataTestBase {
 
     @AfterEach
     void clearOutputTable() throws Exception {
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
+        try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute("DELETE FROM " + OUTPUT_TABLE);
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index 13e7875..35e4ce8 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -156,9 +156,9 @@ public abstract class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
 
         List<Integer> insertedIds =
                 getInsertedIds(
-                        getDbMetadata().getUrl(),
-                        getDbMetadata().getUser(),
-                        getDbMetadata().getPassword(),
+                        getMetadata().getUrl(),
+                        getMetadata().getUser(),
+                        getMetadata().getPassword(),
                         INPUT_TABLE);
         List<Integer> expectedIds =
                 IntStream.range(0, elementsPerSource * PARALLELISM)
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
index 3ba4aba..970ed24 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 
 import org.junit.jupiter.api.Test;
 
@@ -59,7 +59,7 @@ class JdbcXaFacadeImplTest extends JdbcTestBase {
 
     @Test
     void testRecover() throws Exception {
-        try (XaFacade f = XaFacadeImpl.fromXaDataSource(getDbMetadata().buildXaDataSource())) {
+        try (XaFacade f = XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())) {
             f.open();
             assertThat(f.recover()).isEmpty();
             f.start(XID);
@@ -71,7 +71,7 @@ class JdbcXaFacadeImplTest extends JdbcTestBase {
             }
             f.endAndPrepare(XID);
         }
-        try (XaFacade f = XaFacadeImpl.fromXaDataSource(getDbMetadata().buildXaDataSource())) {
+        try (XaFacade f = XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())) {
             f.open();
             Collection<Xid> recovered = f.recover();
             recovered.forEach(f::rollback);
@@ -99,7 +99,7 @@ class JdbcXaFacadeImplTest extends JdbcTestBase {
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return JdbcTestFixture.DERBY_EBOOKSHOP_DB;
     }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
index c642b1f..3d3e9c5 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 
 import org.apache.derby.jdbc.EmbeddedXADataSource;
 import org.junit.jupiter.api.Test;
@@ -163,13 +163,11 @@ class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     static EmbeddedXADataSource derbyXaDs() {
-        EmbeddedXADataSource ds = new EmbeddedXADataSource();
-        ds.setDatabaseName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDbName());
-        return ds;
+        return (EmbeddedXADataSource) JdbcTestFixture.DERBY_EBOOKSHOP_DB.buildXaDataSource();
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return JdbcTestFixture.DERBY_EBOOKSHOP_DB;
     }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
index b9abcba..38a904a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
@@ -17,8 +17,9 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.databases.h2.H2XaDatabase;
+import org.apache.flink.connector.jdbc.databases.h2.xa.H2XaDsWrapper;
 
 import org.junit.jupiter.api.Test;
 
@@ -28,9 +29,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * {@link JdbcXaSinkFunction} tests using H2 DB. H2 uses MVCC (so we can e.g. count records while
  * transaction is not yet committed). But XA support isn't full, so for some scenarios {@link
- * org.apache.flink.connector.jdbc.xa.h2.H2XaDsWrapper wrapper} is used, and for some - Derby.
+ * H2XaDsWrapper wrapper} is used, and for some - Derby.
  */
-class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
+class JdbcXaSinkH2Test extends JdbcXaSinkTestBase implements H2XaDatabase {
 
     @Test
     void testIgnoreDuplicatedNotification() throws Exception {
@@ -69,9 +70,4 @@ class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
         sinkHelper.snapshotState(2);
         assertThat(xaHelper.countInDb()).isEqualTo(0);
     }
-
-    @Override
-    protected DbMetadata getDbMetadata() {
-        return JdbcTestFixture.H2_EBOOKSHOP_DB;
-    }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index 04e3576..dbbf1fa 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.connector.jdbc.xa;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -91,10 +91,10 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
         try (JdbcXaFacadeTestHelper h =
                 new JdbcXaFacadeTestHelper(
                         JdbcXaSinkDerbyTest.derbyXaDs(),
-                        getDbMetadata().getUrl(),
+                        getMetadata().getUrl(),
                         JdbcTestFixture.INPUT_TABLE,
-                        getDbMetadata().getUser(),
-                        getDbMetadata().getPassword())) {
+                        getMetadata().getUser(),
+                        getMetadata().getPassword())) {
             h.assertDbContentsEquals(CP0);
         }
     }
@@ -105,7 +105,7 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
     }
 
     @Override
-    protected DbMetadata getDbMetadata() {
+    public DatabaseMetadata getMetadata() {
         return JdbcTestFixture.DERBY_EBOOKSHOP_DB;
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
index f1e4751..a5f1f6b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
-import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.databases.h2.H2XaDatabase;
 
 import org.junit.jupiter.api.Test;
 
@@ -26,7 +26,7 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests that data is not inserted ahead of time. */
-class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
+class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase implements H2XaDatabase {
 
     @Test
     void testNoInsertAfterInvoke() throws Exception {
@@ -61,9 +61,4 @@ class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
         }
         assertThat(xaHelper.countInDb()).isEqualTo(0);
     }
-
-    @Override
-    protected DbMetadata getDbMetadata() {
-        return JdbcTestFixture.H2_EBOOKSHOP_DB;
-    }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index 3a688aa..fb0968d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -83,14 +83,14 @@ abstract class JdbcXaSinkTestBase extends JdbcTestBase {
 
     @BeforeEach
     void initHelpers() throws Exception {
-        xaDataSource = getDbMetadata().buildXaDataSource();
+        xaDataSource = getMetadata().buildXaDataSource();
         xaHelper =
                 new JdbcXaFacadeTestHelper(
-                        getDbMetadata().buildXaDataSource(),
-                        getDbMetadata().getUrl(),
+                        getMetadata().buildXaDataSource(),
+                        getMetadata().getUrl(),
                         INPUT_TABLE,
-                        getDbMetadata().getUser(),
-                        getDbMetadata().getPassword());
+                        getMetadata().getUser(),
+                        getMetadata().getPassword());
         sinkHelper = buildSinkHelper(createStateHandler());
     }
 
@@ -109,10 +109,10 @@ abstract class JdbcXaSinkTestBase extends JdbcTestBase {
         try (JdbcXaFacadeTestHelper xa =
                 new JdbcXaFacadeTestHelper(
                         xaDataSource,
-                        getDbMetadata().getUrl(),
+                        getMetadata().getUrl(),
                         INPUT_TABLE,
-                        getDbMetadata().getUser(),
-                        getDbMetadata().getPassword())) {
+                        getMetadata().getUser(),
+                        getMetadata().getPassword())) {
             xa.cancelAllTx();
         }
     }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2DbMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2DbMetadata.java
deleted file mode 100644
index 36c7fc8..0000000
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2DbMetadata.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc.xa.h2;
-
-import org.apache.flink.connector.jdbc.DbMetadata;
-
-import javax.sql.XADataSource;
-
-/** H2DbMetadata. */
-public class H2DbMetadata implements DbMetadata {
-
-    private final String schema;
-
-    public H2DbMetadata(String schema) {
-        this.schema = schema;
-    }
-
-    @Override
-    public XADataSource buildXaDataSource() {
-        final org.h2.jdbcx.JdbcDataSource ds = new org.h2.jdbcx.JdbcDataSource();
-        ds.setUrl(getUrl());
-        return new H2XaDsWrapper(ds);
-    }
-
-    @Override
-    public String getDriverClass() {
-        return "org.h2.Driver";
-    }
-
-    @Override
-    public String getUrl() {
-        return String.format("jdbc:h2:mem:%s;INIT=SET SCHEMA %s", schema, schema);
-    }
-
-    @Override
-    public String getInitUrl() {
-        return String.format(
-                "jdbc:h2:mem:%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s",
-                schema, schema, schema);
-                schema, schema, schema);
-    }
-}

Reply via email to