This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit a4e200fb839b8ad7b82ab7aea7f62415d14d4a4e Author: Joao Boto <b...@boto.pro> AuthorDate: Mon Jan 30 17:22:05 2023 +0100 [FLINK-30790] Change Oracle tests to use new OracleDatabase --- .../connector/jdbc/databases/DatabaseMetadata.java | 2 + .../jdbc/databases/derby/DerbyMetadata.java | 5 ++ .../connector/jdbc/databases/h2/H2Metadata.java | 5 ++ .../jdbc/databases/mysql/MySqlMetadata.java | 5 ++ .../jdbc/databases/oracle/OracleDatabase.java | 9 +- .../jdbc/databases/oracle/OracleMetadata.java | 5 ++ .../jdbc/databases/postgres/PostgresMetadata.java | 5 ++ .../databases/sqlserver/SqlServerMetadata.java | 5 ++ .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 5 -- .../jdbc/dialect/oracle/OracleContainer.java | 99 ---------------------- .../oracle/OracleExactlyOnceSinkE2eTest.java | 25 ++---- .../jdbc/dialect/oracle/OracleTableSinkITCase.java | 66 +++++++++------ .../dialect/oracle/OracleTableSourceITCase.java | 33 +++++--- .../postgres/PostgresExactlyOnceSinkE2eTest.java | 5 -- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 6 +- 15 files changed, 109 insertions(+), 171 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java index 2d3fbec..3eeed1d 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java @@ -39,6 +39,8 @@ public interface DatabaseMetadata extends Serializable { String getJdbcUrl(); + String getJdbcUrlWithCredentials(); + String getUsername(); String getPassword(); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java index 33907db..960a56c 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java @@ -40,6 +40,11 @@ public class DerbyMetadata implements DatabaseMetadata { return String.format("jdbc:derby:%s", dbName); } + @Override + public String getJdbcUrlWithCredentials() { + return getJdbcUrl(); + } + @Override public String getUsername() { return ""; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java index d94e4d6..95db4c6 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java @@ -36,6 +36,11 @@ public class H2Metadata implements DatabaseMetadata { return String.format("jdbc:h2:mem:%s", schema); } + @Override + public String getJdbcUrlWithCredentials() { + return getJdbcUrl(); + } + @Override public String getUsername() { return ""; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java index 9125232..58956ad 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java @@ -52,6 +52,11 @@ public class MySqlMetadata implements DatabaseMetadata { return this.url; } + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s?user=%s&password=%s", this.url, this.username, this.password); + } + @Override public String getUsername() { return this.username; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java index d800e38..70e503e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java @@ -19,9 +19,10 @@ package org.apache.flink.connector.jdbc.databases.oracle; import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.databases.DatabaseTest; -import org.apache.flink.connector.jdbc.dialect.oracle.OracleContainer; +import org.apache.flink.connector.jdbc.test.DockerImageVersions; import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.OracleContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -29,7 +30,11 @@ import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers public interface OracleDatabase extends DatabaseTest { - @Container JdbcDatabaseContainer<?> CONTAINER = new OracleContainer(); + @Container + JdbcDatabaseContainer<?> CONTAINER = + new OracleContainer(DockerImageVersions.ORACLE) + .withStartupTimeoutSeconds(240) + .withConnectTimeoutSeconds(120); @Override default DatabaseMetadata getMetadata() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java index 8708a5e..144e165 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleMetadata.java @@ -54,6 +54,11 @@ public class OracleMetadata implements DatabaseMetadata { return this.url; } + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s?user=%s&password=%s", this.url, this.username, this.password); + } + @Override public String getUsername() { return this.username; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java index 5d62b06..562984e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/PostgresMetadata.java @@ -52,6 +52,11 @@ public class PostgresMetadata implements DatabaseMetadata { return this.url; } + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s&user=%s&password=%s", this.url, this.username, this.password); + } + @Override public String getUsername() { return this.username; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java index 0fa59f4..edda8e4 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/SqlServerMetadata.java @@ -52,6 +52,11 @@ public class SqlServerMetadata implements DatabaseMetadata { return this.url; } + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s;username=%s;password=%s", getUrl(), getUser(), getPassword()); + } + @Override public String getUsername() { return this.username; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java index 73751a2..7100c2b 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java @@ -38,11 +38,6 @@ public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { .withLockWaitTimeout( (CHECKPOINT_TIMEOUT_MS + TASK_CANCELLATION_TIMEOUT_MS) * 2); - @Override - protected String getDockerVersion() { - return CONTAINER.getDockerImageName(); - } - @Override public DatabaseMetadata getMetadata() { return new MySqlMetadata(CONTAINER); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleContainer.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleContainer.java deleted file mode 100644 index b5c63f7..0000000 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleContainer.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc.dialect.oracle; - -import org.testcontainers.containers.JdbcDatabaseContainer; -import org.testcontainers.utility.DockerImageName; - -/** {@link OracleContainer}. */ -public class OracleContainer extends JdbcDatabaseContainer<OracleContainer> { - private static final String DEFAULT_TAG = "18.4.0-slim"; - private static final String IMAGE = "gvenzl/oracle-xe"; - private static final DockerImageName ORACLE_IMAGE = - DockerImageName.parse(IMAGE).withTag(DEFAULT_TAG); - - private static final int ORACLE_PORT = 1521; - private static final int APEX_HTTP_PORT = 8080; - - private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; - private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120; - - private String username = "system"; - private String password = "oracle"; - - public OracleContainer() { - super(ORACLE_IMAGE); - preconfigure(); - } - - private void preconfigure() { - withStartupTimeoutSeconds(DEFAULT_STARTUP_TIMEOUT_SECONDS); - withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS); - withEnv("ORACLE_PASSWORD", password); - addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT); - } - - @Override - public String getDriverClassName() { - return "oracle.jdbc.OracleDriver"; - } - - @Override - public String getJdbcUrl() { - return "jdbc:oracle:thin:" - + getUsername() - + "/" - + getPassword() - + "@" - + getHost() - + ":" - + getOraclePort() - + ":" - + getSid(); - } - - @Override - public String getUsername() { - return username; - } - - @Override - public String getPassword() { - return password; - } - - @SuppressWarnings("SameReturnValue") - public String getSid() { - return "xe"; - } - - public Integer getOraclePort() { - return getMappedPort(ORACLE_PORT); - } - - @SuppressWarnings("unused") - public Integer getWebPort() { - return getMappedPort(APEX_HTTP_PORT); - } - - @Override - public String getTestQueryString() { - return "SELECT 1 FROM DUAL"; - } -} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java index 66e5b2e..7f9318c 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java @@ -1,34 +1,21 @@ package org.apache.flink.connector.jdbc.dialect.oracle; -import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; -import org.apache.flink.connector.jdbc.databases.oracle.OracleMetadata; +import org.apache.flink.connector.jdbc.databases.oracle.OracleDatabase; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; import org.apache.flink.util.function.SerializableSupplier; import oracle.jdbc.xa.client.OracleXADataSource; -import org.testcontainers.containers.JdbcDatabaseContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import javax.sql.XADataSource; import java.sql.SQLException; /** A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. */ -@Testcontainers -public class OracleExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { - - @Container private static final JdbcDatabaseContainer<?> CONTAINER = new OracleContainer(); - - @Override - protected String getDockerVersion() { - return CONTAINER.getDockerImageName(); - } - - @Override - public DatabaseMetadata getMetadata() { - return new OracleMetadata(CONTAINER); - } +@DisabledOnOs(OS.MAC) +public class OracleExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest + implements OracleDatabase { @Override public SerializableSupplier<XADataSource> getDataSourceSupplier() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java index 8c9da92..a6e1574 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.dialect.oracle; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.databases.oracle.OracleDatabase; import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.datastream.DataStream; @@ -48,6 +49,8 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import java.math.BigDecimal; import java.sql.Connection; @@ -66,10 +69,8 @@ import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; /** The Table Sink ITCase for {@link OracleDialect}. */ -class OracleTableSinkITCase extends AbstractTestBase { - - private static final OracleContainer container = new OracleContainer(); - private static String containerUrl; +@DisabledOnOs(OS.MAC) +class OracleTableSinkITCase extends AbstractTestBase implements OracleDatabase { public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert"; public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend"; @@ -80,10 +81,12 @@ class OracleTableSinkITCase extends AbstractTestBase { @BeforeAll static void beforeAll() throws ClassNotFoundException, SQLException { - container.start(); - containerUrl = container.getJdbcUrl(); - Class.forName(container.getDriverClassName()); - try (Connection conn = DriverManager.getConnection(containerUrl); + Class.forName(CONTAINER.getDriverClassName()); + try (Connection conn = + DriverManager.getConnection( + CONTAINER.getJdbcUrl(), + CONTAINER.getUsername(), + CONTAINER.getPassword()); Statement stat = conn.createStatement()) { stat.executeUpdate( "CREATE TABLE " @@ -131,8 +134,12 @@ class OracleTableSinkITCase extends AbstractTestBase { @AfterAll static void afterAll() throws Exception { TestValuesTableFactory.clearAllData(); - Class.forName(container.getDriverClassName()); - try (Connection conn = DriverManager.getConnection(containerUrl); + Class.forName(CONTAINER.getDriverClassName()); + try (Connection conn = + DriverManager.getConnection( + CONTAINER.getJdbcUrl(), + CONTAINER.getUsername(), + CONTAINER.getPassword()); Statement stat = conn.createStatement()) { stat.execute("DROP TABLE " + OUTPUT_TABLE1); stat.execute("DROP TABLE " + OUTPUT_TABLE2); @@ -141,7 +148,6 @@ class OracleTableSinkITCase extends AbstractTestBase { stat.execute("DROP TABLE " + OUTPUT_TABLE5); stat.execute("DROP TABLE " + USER_TABLE); } - container.stop(); } public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleDataStream( @@ -193,7 +199,7 @@ class OracleTableSinkITCase extends AbstractTestBase { + ") WITH (" + " 'connector'='jdbc'," + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name'='" + OUTPUT_TABLE4 @@ -201,7 +207,11 @@ class OracleTableSinkITCase extends AbstractTestBase { + ")"); tEnv.executeSql("INSERT INTO upsertSink SELECT CAST(1.1 as FLOAT)").await(); - check(new Row[] {Row.of(1.1f)}, containerUrl, "REAL_TABLE", new String[] {"real_data"}); + check( + new Row[] {Row.of(1.1f)}, + getMetadata().getJdbcUrlWithCredentials(), + "REAL_TABLE", + new String[] {"real_data"}); } @Test @@ -239,7 +249,7 @@ class OracleTableSinkITCase extends AbstractTestBase { + ") WITH (" + " 'connector'='jdbc'," + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name'='" + OUTPUT_TABLE1 @@ -265,7 +275,7 @@ class OracleTableSinkITCase extends AbstractTestBase { Row.of(7, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.021")), Row.of(9, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.015")) }, - containerUrl, + getMetadata().getJdbcUrlWithCredentials(), OUTPUT_TABLE1, new String[] {"cnt", "lencnt", "cTag", "ts"}); } @@ -291,7 +301,7 @@ class OracleTableSinkITCase extends AbstractTestBase { + ") WITH (" + " 'connector'='jdbc'," + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name'='" + OUTPUT_TABLE2 @@ -306,7 +316,7 @@ class OracleTableSinkITCase extends AbstractTestBase { Row.of(10, 4, Timestamp.valueOf("1970-01-01 00:00:00.01")), Row.of(20, 6, Timestamp.valueOf("1970-01-01 00:00:00.02")) }, - containerUrl, + getMetadata().getJdbcUrlWithCredentials(), OUTPUT_TABLE2, new String[] {"id", "num", "ts"}); } @@ -322,7 +332,7 @@ class OracleTableSinkITCase extends AbstractTestBase { + ") WITH ( " + "'connector' = 'jdbc'," + "'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + "'table-name' = '" + OUTPUT_TABLE3 @@ -349,7 +359,7 @@ class OracleTableSinkITCase extends AbstractTestBase { Row.of("Kim", 42), Row.of("Bob", 1) }, - containerUrl, + getMetadata().getJdbcUrlWithCredentials(), OUTPUT_TABLE3, new String[] {"NAME", "SCORE"}); } @@ -382,7 +392,7 @@ class OracleTableSinkITCase extends AbstractTestBase { + ") WITH (\n" + " 'connector' = 'jdbc'," + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name' = '" + USER_TABLE @@ -414,7 +424,7 @@ class OracleTableSinkITCase extends AbstractTestBase { new BigDecimal("11.3"), new BigDecimal("22.6")) }, - containerUrl, + getMetadata().getJdbcUrlWithCredentials(), USER_TABLE, new String[] {"user_id", "user_name", "email", "balance", "balance2"}); } @@ -423,7 +433,7 @@ class OracleTableSinkITCase extends AbstractTestBase { void testFlushBufferWhenCheckpoint() throws Exception { Map<String, String> options = new HashMap<>(); options.put("connector", "jdbc"); - options.put("url", containerUrl); + options.put("url", getMetadata().getJdbcUrlWithCredentials()); options.put("table-name", OUTPUT_TABLE5); options.put("sink.buffer-flush.interval", "0"); @@ -442,9 +452,17 @@ class OracleTableSinkITCase extends AbstractTestBase { sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1)); sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1)); - check(new Row[] {}, containerUrl, OUTPUT_TABLE5, new String[] {"id"}); + check( + new Row[] {}, + getMetadata().getJdbcUrlWithCredentials(), + OUTPUT_TABLE5, + new String[] {"id"}); sinkFunction.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1)); - check(new Row[] {Row.of(1L), Row.of(2L)}, containerUrl, OUTPUT_TABLE5, new String[] {"id"}); + check( + new Row[] {Row.of(1L), Row.of(2L)}, + getMetadata().getJdbcUrlWithCredentials(), + OUTPUT_TABLE5, + new String[] {"id"}); sinkFunction.close(); } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java index eb340c0..eb07f6b 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.dialect.oracle; +import org.apache.flink.connector.jdbc.databases.oracle.OracleDatabase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -29,6 +30,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import java.sql.Connection; import java.sql.DriverManager; @@ -44,10 +47,9 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; /** The Table Source ITCase for {@link OracleDialect}. */ -class OracleTableSourceITCase extends AbstractTestBase { +@DisabledOnOs(OS.MAC) +class OracleTableSourceITCase extends AbstractTestBase implements OracleDatabase { - private static final OracleContainer container = new OracleContainer(); - private static String containerUrl; private static final String INPUT_TABLE = "oracle_test_table"; private static StreamExecutionEnvironment env; @@ -55,10 +57,12 @@ class OracleTableSourceITCase extends AbstractTestBase { @BeforeAll static void beforeAll() throws ClassNotFoundException, SQLException { - container.start(); - containerUrl = container.getJdbcUrl(); - Class.forName(container.getDriverClassName()); - try (Connection conn = DriverManager.getConnection(containerUrl); + Class.forName(CONTAINER.getDriverClassName()); + try (Connection conn = + DriverManager.getConnection( + CONTAINER.getJdbcUrl(), + CONTAINER.getUsername(), + CONTAINER.getPassword()); Statement statement = conn.createStatement()) { statement.executeUpdate( "CREATE TABLE " @@ -98,12 +102,15 @@ class OracleTableSourceITCase extends AbstractTestBase { @AfterAll static void afterAll() throws Exception { - Class.forName(container.getDriverClassName()); - try (Connection conn = DriverManager.getConnection(containerUrl); + Class.forName(CONTAINER.getDriverClassName()); + try (Connection conn = + DriverManager.getConnection( + CONTAINER.getJdbcUrl(), + CONTAINER.getUsername(), + CONTAINER.getPassword()); Statement statement = conn.createStatement()) { statement.executeUpdate("DROP TABLE " + INPUT_TABLE); } - container.stop(); } @BeforeEach @@ -135,7 +142,7 @@ class OracleTableSourceITCase extends AbstractTestBase { + ") WITH (" + " 'connector'='jdbc'," + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name'='" + INPUT_TABLE @@ -172,7 +179,7 @@ class OracleTableSourceITCase extends AbstractTestBase { + ") WITH (" + " 'connector'='jdbc'," + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name'='" + INPUT_TABLE @@ -215,7 +222,7 @@ class OracleTableSourceITCase extends AbstractTestBase { + ") WITH (\n" + " 'connector'='jdbc',\n" + " 'url'='" - + containerUrl + + getMetadata().getJdbcUrlWithCredentials() + "',\n" + " 'table-name'='" + INPUT_TABLE diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java index 807a2a4..daa173c 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java @@ -29,11 +29,6 @@ public class PostgresExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { .withMaxConnections(PARALLELISM * 2) .withMaxTransactions(50); - @Override - protected String getDockerVersion() { - return CONTAINER.getDockerImageName(); - } - @Override public DatabaseMetadata getMetadata() { return new PostgresMetadata(CONTAINER); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java index 35e4ce8..d229bec 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java @@ -84,8 +84,6 @@ public abstract class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase { protected abstract SerializableSupplier<XADataSource> getDataSourceSupplier(); - protected abstract String getDockerVersion(); - @RegisterExtension static final MiniClusterExtension MINI_CLUSTER = createCluster(); private static MiniClusterExtension createCluster() { @@ -124,7 +122,7 @@ public abstract class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase { @Test void testInsert() throws Exception { long started = System.currentTimeMillis(); - LOG.info("Test insert for {}", getDockerVersion()); + LOG.info("Test insert for {}", getMetadata().getVersion()); int elementsPerSource = 50; int numElementsPerCheckpoint = 7; int minElementsPerFailure = numElementsPerCheckpoint / 3; @@ -169,7 +167,7 @@ public abstract class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase { .containsExactlyInAnyOrderElementsOf(expectedIds); LOG.info( "Test insert for {} finished in {} ms.", - getDockerVersion(), + getMetadata().getVersion(), System.currentTimeMillis() - started); }