This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit d2564a94f2da0a2dd803d87d89822f3bf251c54a Author: Joao Boto <b...@boto.pro> AuthorDate: Mon Jan 30 16:52:28 2023 +0100 [FLINK-30790] Change H2 and some Derby tests to new implementation --- .../apache/flink/connector/jdbc/DbMetadata.java | 51 -------------------- .../flink/connector/jdbc/JdbcDataTestBase.java | 5 +- .../apache/flink/connector/jdbc/JdbcITCase.java | 15 +++--- .../flink/connector/jdbc/JdbcInputFormatTest.java | 10 +--- .../connector/jdbc/JdbcRowOutputFormatTest.java | 3 +- .../apache/flink/connector/jdbc/JdbcTestBase.java | 12 ++--- .../flink/connector/jdbc/JdbcTestFixture.java | 30 ++++-------- .../connector/jdbc/databases/DatabaseMetadata.java | 39 +++++++++++++-- .../jdbc/databases/derby/DerbyDatabase.java | 3 +- .../jdbc/databases/derby/DerbyMetadata.java | 1 - .../connector/jdbc/databases/h2/H2Metadata.java | 3 +- .../h2/xa}/H2XaConnectionWrapper.java | 2 +- .../{xa/h2 => databases/h2/xa}/H2XaDsWrapper.java | 2 +- .../h2/xa}/H2XaResourceWrapper.java | 2 +- .../{xa/h2 => databases/h2/xa}/package-info.java | 2 +- .../jdbc/databases/mysql/MySqlMetadata.java | 2 - .../jdbc/databases/oracle/OracleMetadata.java | 1 - .../jdbc/databases/postgres/PostgresMetadata.java | 1 - .../databases/sqlserver/SqlServerMetadata.java | 1 - .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 4 +- .../oracle/OracleExactlyOnceSinkE2eTest.java | 4 +- .../postgres/PostgresExactlyOnceSinkE2eTest.java | 4 +- .../connector/jdbc/internal/JdbcFullTest.java | 15 +++--- .../jdbc/internal/JdbcTableOutputFormatTest.java | 12 ++--- .../jdbc/table/JdbcAppendOnlyWriterTest.java | 16 +++---- .../jdbc/table/JdbcDynamicTableSinkITCase.java | 15 ++---- .../connector/jdbc/table/JdbcOutputFormatTest.java | 3 +- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 6 +-- .../connector/jdbc/xa/JdbcXaFacadeImplTest.java | 8 ++-- .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java | 8 ++-- .../flink/connector/jdbc/xa/JdbcXaSinkH2Test.java | 12 ++--- .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 10 ++-- .../jdbc/xa/JdbcXaSinkNoInsertionTest.java | 9 +--- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 16 +++---- .../flink/connector/jdbc/xa/h2/H2DbMetadata.java | 56 ---------------------- 35 files changed, 130 insertions(+), 253 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java deleted file mode 100644 index 55c5317..0000000 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc; - -import javax.sql.XADataSource; - -import java.io.Serializable; - -/** Describes a database: driver, schema and urls. */ -public interface DbMetadata extends Serializable { - - default String getInitUrl() { - return getUrl(); - } - - String getUrl(); - - default String getUser() { - return ""; - } - - default String getPassword() { - return ""; - } - - XADataSource buildXaDataSource(); - - String getDriverClass(); - - default JdbcConnectionOptions toConnectionOptions() { - return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() - .withDriverName(getDriverClass()) - .withUrl(getUrl()) - .build(); - } -} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java index 003cf57..8b8a5aa 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -40,11 +41,11 @@ import static org.mockito.Mockito.doReturn; public abstract class JdbcDataTestBase extends JdbcTestBase { @BeforeEach void initData() throws SQLException { - JdbcTestFixture.initData(getDbMetadata()); + JdbcTestFixture.initData(getMetadata()); } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return DERBY_EBOOKSHOP_DB; } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java index d1a830b..41a8082 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.function.FunctionWithException; @@ -73,8 +74,8 @@ public class JdbcITCase extends JdbcTestBase { String.format(INSERT_TEMPLATE, INPUT_TABLE), TEST_ENTRY_JDBC_STATEMENT_BUILDER, new JdbcConnectionOptionsBuilder() - .withUrl(getDbMetadata().getUrl()) - .withDriverName(getDbMetadata().getDriverClass()) + .withUrl(getMetadata().getUrl()) + .withDriverName(getMetadata().getDriverClass()) .build())); env.execute(); @@ -107,8 +108,8 @@ public class JdbcITCase extends JdbcTestBase { ps.setString(2, e.content); }, new JdbcConnectionOptionsBuilder() - .withUrl(getDbMetadata().getUrl()) - .withDriverName(getDbMetadata().getDriverClass()) + .withUrl(getMetadata().getUrl()) + .withDriverName(getMetadata().getDriverClass()) .build())); env.execute(); @@ -117,7 +118,7 @@ public class JdbcITCase extends JdbcTestBase { private List<String> selectWords() throws SQLException { ArrayList<String> strings = new ArrayList<>(); - try (Connection connection = DriverManager.getConnection(getDbMetadata().getUrl())) { + try (Connection connection = DriverManager.getConnection(getMetadata().getUrl())) { try (Statement st = connection.createStatement()) { try (ResultSet rs = st.executeQuery("select word from words")) { while (rs.next()) { @@ -144,7 +145,7 @@ public class JdbcITCase extends JdbcTestBase { private List<TestEntry> selectBooks() throws SQLException { List<TestEntry> result = new ArrayList<>(); - try (Connection connection = DriverManager.getConnection(getDbMetadata().getUrl())) { + try (Connection connection = DriverManager.getConnection(getMetadata().getUrl())) { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setReadOnly(true); try (Statement st = connection.createStatement()) { @@ -167,7 +168,7 @@ public class JdbcITCase extends JdbcTestBase { } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return DERBY_EBOOKSHOP_DB; } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java index 92c0e79..ca6baa2 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.Serializable; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; @@ -192,11 +191,8 @@ class JdbcInputFormatTest extends JdbcDataTestBase { .finish(); jdbcInputFormat.openInputFormat(); - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); final int defaultFetchSize = - DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl()) - .createStatement() - .getFetchSize(); + DERBY_EBOOKSHOP_DB.getConnection().createStatement().getFetchSize(); assertThat(jdbcInputFormat.getStatement().getFetchSize()).isEqualTo(defaultFetchSize); } @@ -229,9 +225,7 @@ class JdbcInputFormatTest extends JdbcDataTestBase { .finish(); jdbcInputFormat.openInputFormat(); - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - final boolean defaultAutoCommit = - DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl()).getAutoCommit(); + final boolean defaultAutoCommit = DERBY_EBOOKSHOP_DB.getConnection().getAutoCommit(); assertThat(jdbcInputFormat.getDbConn().getAutoCommit()).isEqualTo(defaultAutoCommit); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java index 3150a15..f0427c0 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java @@ -62,8 +62,7 @@ class JdbcRowOutputFormatTest extends JdbcDataTestBase { } jdbcOutputFormat = null; - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl()); + try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection(); Statement stat = conn.createStatement()) { stat.execute("DELETE FROM " + OUTPUT_TABLE); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java index 1214a08..ee6b208 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java @@ -17,6 +17,8 @@ package org.apache.flink.connector.jdbc; +import org.apache.flink.connector.jdbc.databases.DatabaseTest; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -24,18 +26,16 @@ import org.junit.jupiter.api.BeforeEach; * Base class for JDBC test using DDL from {@link JdbcTestFixture}. It uses create tables before * each test and drops afterwards. */ -public abstract class JdbcTestBase { +public abstract class JdbcTestBase implements DatabaseTest { @BeforeEach public void before() throws Exception { - JdbcTestFixture.initSchema(getDbMetadata()); + JdbcTestFixture.initSchema(getMetadata()); } @AfterEach public void after() throws Exception { - JdbcTestFixture.cleanupData(getDbMetadata().getUrl()); - JdbcTestFixture.cleanUpDatabasesStatic(getDbMetadata()); + JdbcTestFixture.cleanupData(getMetadata().getUrl()); + JdbcTestFixture.cleanUpDatabasesStatic(getMetadata()); } - - protected abstract DbMetadata getDbMetadata(); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java index d00fbad..cab8135 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java @@ -20,8 +20,8 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.connector.jdbc.databases.derby.DerbyMetadata; -import org.apache.flink.connector.jdbc.xa.h2.H2DbMetadata; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; +import org.apache.flink.connector.jdbc.databases.derby.DerbyDatabase; import org.apache.flink.table.types.logical.RowType; import java.io.OutputStream; @@ -36,7 +36,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT /** Test data and helper objects for JDBC tests. */ @SuppressWarnings("SpellCheckingInspection") -public class JdbcTestFixture { +public class JdbcTestFixture implements DerbyDatabase { public static final JdbcTestCheckpoint CP0 = new JdbcTestCheckpoint(0, 1, 2, 3); public static final JdbcTestCheckpoint CP1 = new JdbcTestCheckpoint(1, 4, 5, 6); @@ -74,10 +74,7 @@ public class JdbcTestFixture { new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010) }; - private static final String EBOOKSHOP_SCHEMA_NAME = "ebookshop"; - public static final DerbyMetadata DERBY_EBOOKSHOP_DB = - new DerbyMetadata(EBOOKSHOP_SCHEMA_NAME); - public static final H2DbMetadata H2_EBOOKSHOP_DB = new H2DbMetadata(EBOOKSHOP_SCHEMA_NAME); + public static final DatabaseMetadata DERBY_EBOOKSHOP_DB = DerbyDatabase.startDatabase(); /** TestEntry. */ public static class TestEntry implements Serializable { @@ -188,14 +185,8 @@ public class JdbcTestFixture { public void write(int b) {} }; - public static void initSchema(DbMetadata dbMetadata) - throws ClassNotFoundException, SQLException { - System.setProperty( - "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL"); - Class.forName(dbMetadata.getDriverClass()); - try (Connection conn = - DriverManager.getConnection( - dbMetadata.getInitUrl(), dbMetadata.getUser(), dbMetadata.getPassword())) { + public static void initSchema(DatabaseMetadata metadata) throws SQLException { + try (Connection conn = metadata.getConnection()) { createTable(conn, JdbcTestFixture.INPUT_TABLE); createTable(conn, OUTPUT_TABLE); createTable(conn, OUTPUT_TABLE_2); @@ -218,7 +209,7 @@ public class JdbcTestFixture { } } - static void initData(DbMetadata dbMetadata) throws SQLException { + static void initData(DatabaseMetadata dbMetadata) throws SQLException { try (Connection conn = DriverManager.getConnection(dbMetadata.getUrl())) { insertDataIntoInputTable(conn); } @@ -234,10 +225,9 @@ public class JdbcTestFixture { stat.close(); } - public static void cleanUpDatabasesStatic(DbMetadata dbMetadata) - throws ClassNotFoundException, SQLException { - Class.forName(dbMetadata.getDriverClass()); - try (Connection conn = DriverManager.getConnection(dbMetadata.getUrl()); + public static void cleanUpDatabasesStatic(DatabaseMetadata dbMetadata) throws SQLException { + + try (Connection conn = dbMetadata.getConnection(); Statement stat = conn.createStatement()) { stat.executeUpdate("DROP TABLE " + INPUT_TABLE); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java index fcfc656..2d3fbec 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java @@ -1,15 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.jdbc.databases; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.util.FlinkRuntimeException; import javax.sql.XADataSource; import java.io.Serializable; +import java.sql.Connection; +import java.sql.DriverManager; -public interface DatabaseMetadata extends Serializable, DbMetadata { +/** Describes a database: driver, schema and urls. */ +public interface DatabaseMetadata extends Serializable { - default String getUrl(){ + default String getUrl() { return getJdbcUrl(); } @@ -29,7 +49,7 @@ public interface DatabaseMetadata extends Serializable, DbMetadata { String getVersion(); - default JdbcConnectionOptions toConnectionOptions() { + default JdbcConnectionOptions getConnectionOptions() { return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName(getDriverClass()) .withUrl(getJdbcUrl()) @@ -37,4 +57,13 @@ public interface DatabaseMetadata extends Serializable, DbMetadata { .withPassword(getPassword()) .build(); } -} \ No newline at end of file + + default Connection getConnection() { + try { + Class.forName(getDriverClass()); + return DriverManager.getConnection(getUrl(), getUser(), getPassword()); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + } +} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java index f0f6e3a..d2bfae5 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyDatabase.java @@ -32,7 +32,8 @@ public interface DerbyDatabase extends DatabaseTest { "derby.stream.error.field", DerbyDatabase.class.getCanonicalName() + ".DEV_NULL"); Class.forName(metadata.getDriverClass()); - DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl())).close(); + DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl())) + .close(); } catch (Exception e) { throw new FlinkRuntimeException(e); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java index f7cc8fb..33907db 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java @@ -66,5 +66,4 @@ public class DerbyMetadata implements DatabaseMetadata { public String getVersion() { return "derby:memory"; } - } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java index 027fb53..d94e4d6 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.databases.h2; import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; -import org.apache.flink.connector.jdbc.xa.h2.H2XaDsWrapper; +import org.apache.flink.connector.jdbc.databases.h2.xa.H2XaDsWrapper; import javax.sql.XADataSource; @@ -62,5 +62,4 @@ public class H2Metadata implements DatabaseMetadata { public String getVersion() { return "h2:mem"; } - } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaConnectionWrapper.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaConnectionWrapper.java similarity index 97% rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaConnectionWrapper.java rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaConnectionWrapper.java index fd08eb6..50893db 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaConnectionWrapper.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaConnectionWrapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.xa.h2; +package org.apache.flink.connector.jdbc.databases.h2.xa; import javax.sql.ConnectionEventListener; import javax.sql.StatementEventListener; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaDsWrapper.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaDsWrapper.java similarity index 97% rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaDsWrapper.java rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaDsWrapper.java index 9d348d9..25b6c37 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaDsWrapper.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaDsWrapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.xa.h2; +package org.apache.flink.connector.jdbc.databases.h2.xa; import javax.sql.XAConnection; import javax.sql.XADataSource; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaResourceWrapper.java similarity index 99% rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaResourceWrapper.java index 0a8572f..8cc7074 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/H2XaResourceWrapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.xa.h2; +package org.apache.flink.connector.jdbc.databases.h2.xa; import org.apache.flink.util.function.ThrowingRunnable; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/package-info.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/package-info.java similarity index 94% rename from flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/package-info.java rename to flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/package-info.java index fa20661..4e4ad39 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/package-info.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/xa/package-info.java @@ -19,4 +19,4 @@ * This package holds some workarounds for the H2 XA client, plus {@link * org.apache.flink.connector.jdbc.xa.h2.H2DbMetadata}. Used only for testing. */ -package org.apache.flink.connector.jdbc.xa.h2; +package org.apache.flink.connector.jdbc.databases.h2.xa; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java index 4bb3d5a..9125232 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java @@ -84,6 +84,4 @@ public class MySqlMetadata implements DatabaseMetadata { public String getVersion() { return this.version; } - - } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java index be2c70f..8708a5e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java @@ -89,5 +89,4 @@ public class OracleMetadata implements DatabaseMetadata { public String getVersion() { return version; } - } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java index e3d6c04..5d62b06 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java @@ -84,5 +84,4 @@ public class PostgresMetadata implements DatabaseMetadata { public String getVersion() { return version; } - } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java index 5435746..0fa59f4 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java @@ -84,5 +84,4 @@ public class SqlServerMetadata implements DatabaseMetadata { public String getVersion() { return version; } - } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java index d03c928..73751a2 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java @@ -1,6 +1,6 @@ package org.apache.flink.connector.jdbc.dialect.mysql; -import org.apache.flink.connector.jdbc.DbMetadata; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata; import org.apache.flink.connector.jdbc.test.DockerImageVersions; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; @@ -44,7 +44,7 @@ public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return new MySqlMetadata(CONTAINER); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java index b3dce62..66e5b2e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java @@ -1,6 +1,6 @@ package org.apache.flink.connector.jdbc.dialect.oracle; -import org.apache.flink.connector.jdbc.DbMetadata; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.databases.oracle.OracleMetadata; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; import org.apache.flink.util.function.SerializableSupplier; @@ -26,7 +26,7 @@ public class OracleExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return new OracleMetadata(CONTAINER); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java index 5001f14..807a2a4 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java @@ -1,6 +1,6 @@ package org.apache.flink.connector.jdbc.dialect.postgres; -import org.apache.flink.connector.jdbc.DbMetadata; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.databases.postgres.PostgresMetadata; import org.apache.flink.connector.jdbc.test.DockerImageVersions; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; @@ -35,7 +35,7 @@ public class PostgresExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return new PostgresMetadata(CONTAINER); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java index e92a9ff..4966c28 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java @@ -80,7 +80,7 @@ class JdbcFullTest extends JdbcDataTestBase { JdbcOutputFormat.builder() .setOptions( JdbcConnectorOptions.builder() - .setDBUrl(getDbMetadata().getUrl()) + .setDBUrl(getMetadata().getUrl()) .setTableName(OUTPUT_TABLE) .build()) .setFieldNames(new String[] {"id", "title", "author", "price", "qty"}) @@ -114,8 +114,8 @@ class JdbcFullTest extends JdbcDataTestBase { ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = JdbcInputFormat.buildJdbcInputFormat() - .setDrivername(getDbMetadata().getDriverClass()) - .setDBUrl(getDbMetadata().getUrl()) + .setDrivername(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getUrl()) .setQuery(SELECT_ALL_BOOKS) .setRowTypeInfo(ROW_TYPE_INFO); @@ -138,8 +138,8 @@ class JdbcFullTest extends JdbcDataTestBase { // in PreparedStatement.setObject (see its javadoc for more details) JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() - .withUrl(getDbMetadata().getUrl()) - .withDriverName(getDbMetadata().getDriverClass()) + .withUrl(getMetadata().getUrl()) + .withDriverName(getMetadata().getDriverClass()) .build(); JdbcOutputFormat jdbcOutputFormat = @@ -162,7 +162,7 @@ class JdbcFullTest extends JdbcDataTestBase { source.output(jdbcOutputFormat); environment.execute(); - try (Connection dbConn = DriverManager.getConnection(getDbMetadata().getUrl()); + try (Connection dbConn = DriverManager.getConnection(getMetadata().getUrl()); PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS); ResultSet resultSet = statement.executeQuery()) { int count = 0; @@ -175,8 +175,7 @@ class JdbcFullTest extends JdbcDataTestBase { @AfterEach void clearOutputTable() throws Exception { - Class.forName(getDbMetadata().getDriverClass()); - try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl()); + try (Connection conn = getMetadata().getConnection(); Statement stat = conn.createStatement()) { stat.execute("DELETE FROM " + OUTPUT_TABLE); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java index 938dcd1..4c0cb19 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java @@ -67,7 +67,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase { void testUpsertFormatCloseBeforeOpen() throws Exception { JdbcConnectorOptions options = JdbcConnectorOptions.builder() - .setDBUrl(getDbMetadata().getUrl()) + .setDBUrl(getMetadata().getUrl()) .setTableName(OUTPUT_TABLE) .build(); JdbcDmlOptions dmlOptions = @@ -102,7 +102,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase { new TableJdbcUpsertOutputFormat( new SimpleJdbcConnectionProvider( JdbcConnectorOptions.builder() - .setDBUrl(getDbMetadata().getUrl()) + .setDBUrl(getMetadata().getUrl()) .setTableName(OUTPUT_TABLE) .build()) { @Override @@ -174,7 +174,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase { void testJdbcOutputFormat() throws Exception { JdbcConnectorOptions options = JdbcConnectorOptions.builder() - .setDBUrl(getDbMetadata().getUrl()) + .setDBUrl(getMetadata().getUrl()) .setTableName(OUTPUT_TABLE) .build(); JdbcDmlOptions dmlOptions = @@ -222,7 +222,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase { } private void check(Row[] rows) throws SQLException { - check(rows, getDbMetadata().getUrl(), OUTPUT_TABLE, fieldNames); + check(rows, getMetadata().getUrl(), OUTPUT_TABLE, fieldNames); } public static void check(Row[] rows, String url, String table, String[] fields) @@ -252,8 +252,8 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase { format.close(); } format = null; - Class.forName(getDbMetadata().getDriverClass()); - try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl()); + + try (Connection conn = getMetadata().getConnection(); Statement stat = conn.createStatement()) { stat.execute("DELETE FROM " + OUTPUT_TABLE); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java index 34ad9e1..b83d08c 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java @@ -21,8 +21,8 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestBase; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; @@ -34,7 +34,6 @@ import org.mockito.Mockito; import java.io.IOException; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.Statement; import static org.apache.flink.connector.jdbc.JdbcDataTestBase.toRow; @@ -64,11 +63,10 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase { JdbcOutputFormat.builder() .setOptions( JdbcConnectorOptions.builder() - .setDBUrl(getDbMetadata().getUrl()) + .setDBUrl(getMetadata().getUrl()) .setDialect( JdbcDialectLoader.load( - getDbMetadata() - .getUrl(), + getMetadata().getUrl(), getClass() .getClassLoader())) .setTableName(OUTPUT_TABLE) @@ -96,8 +94,7 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase { } private void alterTable() throws Exception { - Class.forName(getDbMetadata().getDriverClass()); - try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl()); + try (Connection conn = getMetadata().getConnection(); Statement stat = conn.createStatement()) { stat.execute("ALTER TABLE " + OUTPUT_TABLE + " DROP COLUMN " + fieldNames[1]); } @@ -113,15 +110,14 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase { } } format = null; - Class.forName(getDbMetadata().getDriverClass()); - try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl()); + try (Connection conn = getMetadata().getConnection(); Statement stat = conn.createStatement()) { stat.execute("DELETE FROM " + OUTPUT_TABLE); } } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return DERBY_EBOOKSHOP_DB; } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java index 82a188d..1b6354d 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java @@ -20,7 +20,6 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.jdbc.JdbcTestFixture; import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.datastream.DataStream; @@ -52,7 +51,6 @@ import org.junit.jupiter.api.Test; import java.math.BigDecimal; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; @@ -70,7 +68,7 @@ import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSin /** The ITCase for {@link JdbcDynamicTableSink}. */ class JdbcDynamicTableSinkITCase extends AbstractTestBase { - public static final String DB_URL = "jdbc:derby:memory:upsert"; + public static final String DB_URL = DERBY_EBOOKSHOP_DB.getUrl(); public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert"; public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend"; public static final String OUTPUT_TABLE3 = "dynamicSinkForBatch"; @@ -79,12 +77,8 @@ class JdbcDynamicTableSinkITCase extends AbstractTestBase { public static final String USER_TABLE = "USER_TABLE"; @BeforeAll - static void beforeAll() throws ClassNotFoundException, SQLException { - System.setProperty( - "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL"); - - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); + static void beforeAll() throws SQLException { + try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection(); Statement stat = conn.createStatement()) { stat.executeUpdate( "CREATE TABLE " @@ -132,8 +126,7 @@ class JdbcDynamicTableSinkITCase extends AbstractTestBase { @AfterAll static void afterAll() throws Exception { TestValuesTableFactory.clearAllData(); - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - try (Connection conn = DriverManager.getConnection(DB_URL); + try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection(); Statement stat = conn.createStatement()) { stat.execute("DROP TABLE " + OUTPUT_TABLE1); stat.execute("DROP TABLE " + OUTPUT_TABLE2); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java index 487d69c..35ff2e8 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java @@ -508,8 +508,7 @@ class JdbcOutputFormatTest extends JdbcDataTestBase { @AfterEach void clearOutputTable() throws Exception { - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl()); + try (Connection conn = DERBY_EBOOKSHOP_DB.getConnection(); Statement stat = conn.createStatement()) { stat.execute("DELETE FROM " + OUTPUT_TABLE); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java index 13e7875..35e4ce8 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java @@ -156,9 +156,9 @@ public abstract class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase { List<Integer> insertedIds = getInsertedIds( - getDbMetadata().getUrl(), - getDbMetadata().getUser(), - getDbMetadata().getPassword(), + getMetadata().getUrl(), + getMetadata().getUser(), + getMetadata().getPassword(), INPUT_TABLE); List<Integer> expectedIds = IntStream.range(0, elementsPerSource * PARALLELISM) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java index 3ba4aba..970ed24 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java @@ -17,9 +17,9 @@ package org.apache.flink.connector.jdbc.xa; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestBase; import org.apache.flink.connector.jdbc.JdbcTestFixture; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.junit.jupiter.api.Test; @@ -59,7 +59,7 @@ class JdbcXaFacadeImplTest extends JdbcTestBase { @Test void testRecover() throws Exception { - try (XaFacade f = XaFacadeImpl.fromXaDataSource(getDbMetadata().buildXaDataSource())) { + try (XaFacade f = XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())) { f.open(); assertThat(f.recover()).isEmpty(); f.start(XID); @@ -71,7 +71,7 @@ class JdbcXaFacadeImplTest extends JdbcTestBase { } f.endAndPrepare(XID); } - try (XaFacade f = XaFacadeImpl.fromXaDataSource(getDbMetadata().buildXaDataSource())) { + try (XaFacade f = XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())) { f.open(); Collection<Xid> recovered = f.recover(); recovered.forEach(f::rollback); @@ -99,7 +99,7 @@ class JdbcXaFacadeImplTest extends JdbcTestBase { } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return JdbcTestFixture.DERBY_EBOOKSHOP_DB; } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java index c642b1f..3d3e9c5 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java @@ -17,8 +17,8 @@ package org.apache.flink.connector.jdbc.xa; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestFixture; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.derby.jdbc.EmbeddedXADataSource; import org.junit.jupiter.api.Test; @@ -163,13 +163,11 @@ class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase { } static EmbeddedXADataSource derbyXaDs() { - EmbeddedXADataSource ds = new EmbeddedXADataSource(); - ds.setDatabaseName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDbName()); - return ds; + return (EmbeddedXADataSource) JdbcTestFixture.DERBY_EBOOKSHOP_DB.buildXaDataSource(); } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return JdbcTestFixture.DERBY_EBOOKSHOP_DB; } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java index b9abcba..38a904a 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java @@ -17,8 +17,9 @@ package org.apache.flink.connector.jdbc.xa; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestFixture; +import org.apache.flink.connector.jdbc.databases.h2.H2XaDatabase; +import org.apache.flink.connector.jdbc.databases.h2.xa.H2XaDsWrapper; import org.junit.jupiter.api.Test; @@ -28,9 +29,9 @@ import static org.assertj.core.api.Assertions.assertThat; /** * {@link JdbcXaSinkFunction} tests using H2 DB. H2 uses MVCC (so we can e.g. count records while * transaction is not yet committed). But XA support isn't full, so for some scenarios {@link - * org.apache.flink.connector.jdbc.xa.h2.H2XaDsWrapper wrapper} is used, and for some - Derby. + * H2XaDsWrapper wrapper} is used, and for some - Derby. */ -class JdbcXaSinkH2Test extends JdbcXaSinkTestBase { +class JdbcXaSinkH2Test extends JdbcXaSinkTestBase implements H2XaDatabase { @Test void testIgnoreDuplicatedNotification() throws Exception { @@ -69,9 +70,4 @@ class JdbcXaSinkH2Test extends JdbcXaSinkTestBase { sinkHelper.snapshotState(2); assertThat(xaHelper.countInDb()).isEqualTo(0); } - - @Override - protected DbMetadata getDbMetadata() { - return JdbcTestFixture.H2_EBOOKSHOP_DB; - } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java index 04e3576..dbbf1fa 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java @@ -20,10 +20,10 @@ package org.apache.flink.connector.jdbc.xa; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestBase; import org.apache.flink.connector.jdbc.JdbcTestFixture; import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -91,10 +91,10 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase { try (JdbcXaFacadeTestHelper h = new JdbcXaFacadeTestHelper( JdbcXaSinkDerbyTest.derbyXaDs(), - getDbMetadata().getUrl(), + getMetadata().getUrl(), JdbcTestFixture.INPUT_TABLE, - getDbMetadata().getUser(), - getDbMetadata().getPassword())) { + getMetadata().getUser(), + getMetadata().getPassword())) { h.assertDbContentsEquals(CP0); } } @@ -105,7 +105,7 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase { } @Override - protected DbMetadata getDbMetadata() { + public DatabaseMetadata getMetadata() { return JdbcTestFixture.DERBY_EBOOKSHOP_DB; } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java index f1e4751..a5f1f6b 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java @@ -17,8 +17,8 @@ package org.apache.flink.connector.jdbc.xa; -import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestFixture; +import org.apache.flink.connector.jdbc.databases.h2.H2XaDatabase; import org.junit.jupiter.api.Test; @@ -26,7 +26,7 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; import static org.assertj.core.api.Assertions.assertThat; /** Tests that data is not inserted ahead of time. */ -class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase { +class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase implements H2XaDatabase { @Test void testNoInsertAfterInvoke() throws Exception { @@ -61,9 +61,4 @@ class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase { } assertThat(xaHelper.countInDb()).isEqualTo(0); } - - @Override - protected DbMetadata getDbMetadata() { - return JdbcTestFixture.H2_EBOOKSHOP_DB; - } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java index 3a688aa..fb0968d 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java @@ -83,14 +83,14 @@ abstract class JdbcXaSinkTestBase extends JdbcTestBase { @BeforeEach void initHelpers() throws Exception { - xaDataSource = getDbMetadata().buildXaDataSource(); + xaDataSource = getMetadata().buildXaDataSource(); xaHelper = new JdbcXaFacadeTestHelper( - getDbMetadata().buildXaDataSource(), - getDbMetadata().getUrl(), + getMetadata().buildXaDataSource(), + getMetadata().getUrl(), INPUT_TABLE, - getDbMetadata().getUser(), - getDbMetadata().getPassword()); + getMetadata().getUser(), + getMetadata().getPassword()); sinkHelper = buildSinkHelper(createStateHandler()); } @@ -109,10 +109,10 @@ abstract class JdbcXaSinkTestBase extends JdbcTestBase { try (JdbcXaFacadeTestHelper xa = new JdbcXaFacadeTestHelper( xaDataSource, - getDbMetadata().getUrl(), + getMetadata().getUrl(), INPUT_TABLE, - getDbMetadata().getUser(), - getDbMetadata().getPassword())) { + getMetadata().getUser(), + getMetadata().getPassword())) { xa.cancelAllTx(); } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2DbMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2DbMetadata.java deleted file mode 100644 index 36c7fc8..0000000 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2DbMetadata.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc.xa.h2; - -import org.apache.flink.connector.jdbc.DbMetadata; - -import javax.sql.XADataSource; - -/** H2DbMetadata. */ -public class H2DbMetadata implements DbMetadata { - - private final String schema; - - public H2DbMetadata(String schema) { - this.schema = schema; - } - - @Override - public XADataSource buildXaDataSource() { - final org.h2.jdbcx.JdbcDataSource ds = new org.h2.jdbcx.JdbcDataSource(); - ds.setUrl(getUrl()); - return new H2XaDsWrapper(ds); - } - - @Override - public String getDriverClass() { - return "org.h2.Driver"; - } - - @Override - public String getUrl() { - return String.format("jdbc:h2:mem:%s;INIT=SET SCHEMA %s", schema, schema); - } - - @Override - public String getInitUrl() { - return String.format( - "jdbc:h2:mem:%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s", - schema, schema, schema); - } -}