This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 0bc576ae52b965d3ef4d56b4a4b85459cbf7fa7c Author: Joao Boto <b...@boto.pro> AuthorDate: Mon Jan 30 17:41:33 2023 +0100 [FLINK-30790] Change MySql tests to use new MySqlDatabase --- .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 194 +-------------------- .../jdbc/table/UnsignedTypeConversionITCase.java | 37 +--- 2 files changed, 7 insertions(+), 224 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java index 7100c2b..81d4690 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java @@ -1,47 +1,19 @@ package org.apache.flink.connector.jdbc.dialect.mysql; -import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; -import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.SerializableSupplier; import com.mysql.cj.jdbc.MysqlXADataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import javax.sql.XADataSource; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - -import static org.apache.flink.util.Preconditions.checkArgument; - /** * A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. Check for issues with errors on * closing connections. */ -@Testcontainers -public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { - - @Container - private static final MySqlXaContainer CONTAINER = - new MySqlXaContainer(DockerImageVersions.MYSQL) - .withLockWaitTimeout( - (CHECKPOINT_TIMEOUT_MS + TASK_CANCELLATION_TIMEOUT_MS) * 2); - - @Override - public DatabaseMetadata getMetadata() { - return new MySqlMetadata(CONTAINER); - } +public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest + implements MySqlDatabase { @Override public SerializableSupplier<XADataSource> getDataSourceSupplier() { @@ -53,164 +25,4 @@ public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { return xaDataSource; }; } - - /** {@link MySQLContainer} with XA enabled. */ - static class MySqlXaContainer extends MySQLContainer<MySqlXaContainer> { - private long lockWaitTimeout = 0; - private volatile InnoDbStatusLogger innoDbStatusLogger; - - public MySqlXaContainer(String dockerImageName) { - super(DockerImageName.parse(dockerImageName)); - } - - public MySqlXaContainer withLockWaitTimeout(long lockWaitTimeout) { - checkArgument(lockWaitTimeout >= 0, "lockWaitTimeout should be greater than 0"); - this.lockWaitTimeout = lockWaitTimeout; - return this.self(); - } - - @Override - public void start() { - super.start(); - // prevent XAER_RMERR: Fatal error occurred in the transaction branch - check your - // data for consistency works for mysql v8+ - try (Connection connection = - DriverManager.getConnection(getJdbcUrl(), "root", getPassword())) { - prepareDb(connection, lockWaitTimeout); - } catch (SQLException e) { - ExceptionUtils.rethrow(e); - } - - this.innoDbStatusLogger = - new InnoDbStatusLogger( - getJdbcUrl(), "root", getPassword(), lockWaitTimeout / 2); - innoDbStatusLogger.start(); - } - - @Override - public void stop() { - try { - innoDbStatusLogger.stop(); - } catch (Exception e) { - ExceptionUtils.rethrow(e); - } finally { - super.stop(); - } - } - - private void prepareDb(Connection connection, long lockWaitTimeout) throws SQLException { - try (Statement st = connection.createStatement()) { - st.execute("GRANT XA_RECOVER_ADMIN ON *.* TO '" + getUsername() + "'@'%'"); - st.execute("FLUSH PRIVILEGES"); - // if the reason of task cancellation failure is waiting for a lock - // then failing transactions with a relevant message would ease debugging - st.execute("SET GLOBAL innodb_lock_wait_timeout = " + lockWaitTimeout); - // st.execute("SET GLOBAL innodb_status_output = ON"); - // st.execute("SET GLOBAL innodb_status_output_locks = ON"); - } - } - } - - /** InnoDB status logger. */ - static class InnoDbStatusLogger { - private static final Logger LOG = LoggerFactory.getLogger(InnoDbStatusLogger.class); - private final Thread thread; - private volatile boolean running; - - private InnoDbStatusLogger(String url, String user, String password, long intervalMs) { - running = true; - thread = - new Thread( - () -> { - LOG.info("Logging InnoDB status every {}ms", intervalMs); - try (Connection connection = - DriverManager.getConnection(url, user, password)) { - while (running) { - Thread.sleep(intervalMs); - queryAndLog(connection); - } - } catch (Exception e) { - LOG.warn("failed", e); - } finally { - LOG.info("Logging InnoDB status stopped"); - } - }); - } - - public void start() { - thread.start(); - } - - public void stop() throws InterruptedException { - running = false; - thread.join(); - } - - private void queryAndLog(Connection connection) throws SQLException { - try (Statement st = connection.createStatement()) { - showBlockedTrx(st); - showAllTrx(st); - showEngineStatus(st); - showRecoveredTrx(st); - // additional query: show full processlist \G; -- only shows live - } - } - - private void showRecoveredTrx(Statement st) throws SQLException { - try (ResultSet rs = st.executeQuery("xa recover convert xid ")) { - while (rs.next()) { - LOG.debug( - "recovered trx: {} {} {} {}", - rs.getString(1), - rs.getString(2), - rs.getString(3), - rs.getString(4)); - } - } - } - - private void showEngineStatus(Statement st) throws SQLException { - LOG.debug("Engine status"); - try (ResultSet rs = st.executeQuery("show engine innodb status")) { - while (rs.next()) { - LOG.debug(rs.getString(3)); - } - } - } - - private void showAllTrx(Statement st) throws SQLException { - LOG.debug("All TRX"); - try (ResultSet rs = st.executeQuery("select * from information_schema.innodb_trx")) { - while (rs.next()) { - LOG.debug( - "trx_id: {}, trx_state: {}, trx_started: {}, trx_requested_lock_id: {}, trx_wait_started: {}, trx_mysql_thread_id: {},", - rs.getString("trx_id"), - rs.getString("trx_state"), - rs.getString("trx_started"), - rs.getString("trx_requested_lock_id"), - rs.getString("trx_wait_started"), - rs.getString("trx_mysql_thread_id") /* 0 for recovered*/); - } - } - } - - private void showBlockedTrx(Statement st) throws SQLException { - LOG.debug("Blocked TRX"); - try (ResultSet rs = - st.executeQuery( - " SELECT waiting_trx_id, waiting_pid, waiting_query, blocking_trx_id, blocking_pid, blocking_query " - + "FROM sys.innodb_lock_waits; ")) { - while (rs.next()) { - LOG.debug( - "waiting_trx_id: {}, waiting_pid: {}, waiting_query: {}, blocking_trx_id: {}, blocking_pid: {}, blocking_query: {}", - rs.getString(1), - rs.getString(2), - rs.getString(3), - rs.getString(4), - rs.getString(5), - rs.getString(6)); - } - } - } - } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java index af03459..d236d57 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.table; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; @@ -32,22 +33,14 @@ import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.math.BigDecimal; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static java.lang.String.format; import static java.lang.String.join; @@ -57,17 +50,12 @@ import static org.assertj.core.api.Assertions.assertThat; * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL * to mock a DB. */ -@Testcontainers -class UnsignedTypeConversionITCase extends AbstractTestBase { +class UnsignedTypeConversionITCase extends AbstractTestBase implements MySqlDatabase { private static final Logger LOGGER = LoggerFactory.getLogger(UnsignedTypeConversionITCase.class); - private static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34"); - private static final String DEFAULT_DB_NAME = "test"; private static final String TABLE_NAME = "unsigned_test"; - private static final String USER = "root"; - private static final String PASSWORD = ""; private static final List<String> COLUMNS = Arrays.asList( "tiny_c", @@ -79,13 +67,6 @@ class UnsignedTypeConversionITCase extends AbstractTestBase { "big_c", "big_un_c"); - private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP = - new HashMap<String, String>() { - { - put("MYSQL_ROOT_HOST", "%"); - } - }; - private static final Object[] ROW = new Object[] { (byte) 127, @@ -98,19 +79,9 @@ class UnsignedTypeConversionITCase extends AbstractTestBase { new BigDecimal("18446744073709551615") }; - @Container - static final MySQLContainer<?> MYSQL_CONTAINER = - new MySQLContainer<>(MYSQL_57_IMAGE) - .withEnv(DEFAULT_CONTAINER_ENV_MAP) - .withUsername(USER) - .withPassword(PASSWORD) - .withDatabaseName(DEFAULT_DB_NAME) - .withLogConsumer(new Slf4jLogConsumer(LOGGER)); - @Test void testUnsignedType() throws Exception { - try (Connection con = - DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), USER, PASSWORD)) { + try (Connection con = getMetadata().getConnection()) { StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = StreamTableEnvironment.create(sEnv); createMysqlTable(con); @@ -173,7 +144,7 @@ class UnsignedTypeConversionITCase extends AbstractTestBase { + "big_un_c DECIMAL(20, 0)) with(" + " 'connector' = 'jdbc'," + " 'url' = '" - + format("%s?user=%s&password=&", MYSQL_CONTAINER.getJdbcUrl(), USER) + + getMetadata().getJdbcUrlWithCredentials() + "'," + " 'table-name' = '" + TABLE_NAME