This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 0bc576ae52b965d3ef4d56b4a4b85459cbf7fa7c
Author: Joao Boto <b...@boto.pro>
AuthorDate: Mon Jan 30 17:41:33 2023 +0100

    [FLINK-30790] Change MySql tests to use new MySqlDatabase
---
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 194 +--------------------
 .../jdbc/table/UnsignedTypeConversionITCase.java   |  37 +---
 2 files changed, 7 insertions(+), 224 deletions(-)

diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
index 7100c2b..81d4690 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
@@ -1,47 +1,19 @@
 package org.apache.flink.connector.jdbc.dialect.mysql;
 
-import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
-import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata;
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
 import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import com.mysql.cj.jdbc.MysqlXADataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.MySQLContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
 
 import javax.sql.XADataSource;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. Check for issues with errors on
  * closing connections.
  */
-@Testcontainers
-public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
-
-    @Container
-    private static final MySqlXaContainer CONTAINER =
-            new MySqlXaContainer(DockerImageVersions.MYSQL)
-                    .withLockWaitTimeout(
-                            (CHECKPOINT_TIMEOUT_MS + 
TASK_CANCELLATION_TIMEOUT_MS) * 2);
-
-    @Override
-    public DatabaseMetadata getMetadata() {
-        return new MySqlMetadata(CONTAINER);
-    }
+public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest
+        implements MySqlDatabase {
 
     @Override
     public SerializableSupplier<XADataSource> getDataSourceSupplier() {
@@ -53,164 +25,4 @@ public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
             return xaDataSource;
         };
     }
-
-    /** {@link MySQLContainer} with XA enabled. */
-    static class MySqlXaContainer extends MySQLContainer<MySqlXaContainer> {
-        private long lockWaitTimeout = 0;
-        private volatile InnoDbStatusLogger innoDbStatusLogger;
-
-        public MySqlXaContainer(String dockerImageName) {
-            super(DockerImageName.parse(dockerImageName));
-        }
-
-        public MySqlXaContainer withLockWaitTimeout(long lockWaitTimeout) {
-            checkArgument(lockWaitTimeout >= 0, "lockWaitTimeout should be 
greater than 0");
-            this.lockWaitTimeout = lockWaitTimeout;
-            return this.self();
-        }
-
-        @Override
-        public void start() {
-            super.start();
-            // prevent XAER_RMERR: Fatal error occurred in the transaction  
branch - check your
-            // data for consistency works for mysql v8+
-            try (Connection connection =
-                    DriverManager.getConnection(getJdbcUrl(), "root", 
getPassword())) {
-                prepareDb(connection, lockWaitTimeout);
-            } catch (SQLException e) {
-                ExceptionUtils.rethrow(e);
-            }
-
-            this.innoDbStatusLogger =
-                    new InnoDbStatusLogger(
-                            getJdbcUrl(), "root", getPassword(), 
lockWaitTimeout / 2);
-            innoDbStatusLogger.start();
-        }
-
-        @Override
-        public void stop() {
-            try {
-                innoDbStatusLogger.stop();
-            } catch (Exception e) {
-                ExceptionUtils.rethrow(e);
-            } finally {
-                super.stop();
-            }
-        }
-
-        private void prepareDb(Connection connection, long lockWaitTimeout) 
throws SQLException {
-            try (Statement st = connection.createStatement()) {
-                st.execute("GRANT XA_RECOVER_ADMIN ON *.* TO '" + 
getUsername() + "'@'%'");
-                st.execute("FLUSH PRIVILEGES");
-                // if the reason of task cancellation failure is waiting for a 
lock
-                // then failing transactions with a relevant message would 
ease debugging
-                st.execute("SET GLOBAL innodb_lock_wait_timeout = " + 
lockWaitTimeout);
-                // st.execute("SET GLOBAL innodb_status_output = ON");
-                // st.execute("SET GLOBAL innodb_status_output_locks = ON");
-            }
-        }
-    }
-
-    /** InnoDB status logger. */
-    static class InnoDbStatusLogger {
-        private static final Logger LOG = 
LoggerFactory.getLogger(InnoDbStatusLogger.class);
-        private final Thread thread;
-        private volatile boolean running;
-
-        private InnoDbStatusLogger(String url, String user, String password, 
long intervalMs) {
-            running = true;
-            thread =
-                    new Thread(
-                            () -> {
-                                LOG.info("Logging InnoDB status every {}ms", 
intervalMs);
-                                try (Connection connection =
-                                        DriverManager.getConnection(url, user, 
password)) {
-                                    while (running) {
-                                        Thread.sleep(intervalMs);
-                                        queryAndLog(connection);
-                                    }
-                                } catch (Exception e) {
-                                    LOG.warn("failed", e);
-                                } finally {
-                                    LOG.info("Logging InnoDB status stopped");
-                                }
-                            });
-        }
-
-        public void start() {
-            thread.start();
-        }
-
-        public void stop() throws InterruptedException {
-            running = false;
-            thread.join();
-        }
-
-        private void queryAndLog(Connection connection) throws SQLException {
-            try (Statement st = connection.createStatement()) {
-                showBlockedTrx(st);
-                showAllTrx(st);
-                showEngineStatus(st);
-                showRecoveredTrx(st);
-                // additional query: show full processlist \G; -- only shows 
live
-            }
-        }
-
-        private void showRecoveredTrx(Statement st) throws SQLException {
-            try (ResultSet rs = st.executeQuery("xa recover convert xid ")) {
-                while (rs.next()) {
-                    LOG.debug(
-                            "recovered trx: {} {} {} {}",
-                            rs.getString(1),
-                            rs.getString(2),
-                            rs.getString(3),
-                            rs.getString(4));
-                }
-            }
-        }
-
-        private void showEngineStatus(Statement st) throws SQLException {
-            LOG.debug("Engine status");
-            try (ResultSet rs = st.executeQuery("show engine innodb status")) {
-                while (rs.next()) {
-                    LOG.debug(rs.getString(3));
-                }
-            }
-        }
-
-        private void showAllTrx(Statement st) throws SQLException {
-            LOG.debug("All TRX");
-            try (ResultSet rs = st.executeQuery("select * from 
information_schema.innodb_trx")) {
-                while (rs.next()) {
-                    LOG.debug(
-                            "trx_id: {}, trx_state: {}, trx_started: {}, 
trx_requested_lock_id: {}, trx_wait_started: {}, trx_mysql_thread_id: {},",
-                            rs.getString("trx_id"),
-                            rs.getString("trx_state"),
-                            rs.getString("trx_started"),
-                            rs.getString("trx_requested_lock_id"),
-                            rs.getString("trx_wait_started"),
-                            rs.getString("trx_mysql_thread_id") /* 0 for 
recovered*/);
-                }
-            }
-        }
-
-        private void showBlockedTrx(Statement st) throws SQLException {
-            LOG.debug("Blocked TRX");
-            try (ResultSet rs =
-                    st.executeQuery(
-                            " SELECT waiting_trx_id, waiting_pid, 
waiting_query, blocking_trx_id, blocking_pid, blocking_query "
-                                    + "FROM sys.innodb_lock_waits; ")) {
-                while (rs.next()) {
-                    LOG.debug(
-                            "waiting_trx_id: {}, waiting_pid: {}, 
waiting_query: {}, blocking_trx_id: {}, blocking_pid: {}, blocking_query: {}",
-                            rs.getString(1),
-                            rs.getString(2),
-                            rs.getString(3),
-                            rs.getString(4),
-                            rs.getString(5),
-                            rs.getString(6));
-                }
-            }
-        }
-    }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
index af03459..d236d57 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.jdbc.table;
 
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
@@ -32,22 +33,14 @@ import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.MySQLContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static java.lang.String.format;
 import static java.lang.String.join;
@@ -57,17 +50,12 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL
  * to mock a DB.
  */
-@Testcontainers
-class UnsignedTypeConversionITCase extends AbstractTestBase {
+class UnsignedTypeConversionITCase extends AbstractTestBase implements MySqlDatabase {
 
     private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
 
-    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
-    private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final String USER = "root";
-    private static final String PASSWORD = "";
     private static final List<String> COLUMNS =
             Arrays.asList(
                     "tiny_c",
@@ -79,13 +67,6 @@ class UnsignedTypeConversionITCase extends AbstractTestBase {
                     "big_c",
                     "big_un_c");
 
-    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
-            new HashMap<String, String>() {
-                {
-                    put("MYSQL_ROOT_HOST", "%");
-                }
-            };
-
     private static final Object[] ROW =
             new Object[] {
                 (byte) 127,
@@ -98,19 +79,9 @@ class UnsignedTypeConversionITCase extends AbstractTestBase {
                 new BigDecimal("18446744073709551615")
             };
 
-    @Container
-    static final MySQLContainer<?> MYSQL_CONTAINER =
-            new MySQLContainer<>(MYSQL_57_IMAGE)
-                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
-                    .withUsername(USER)
-                    .withPassword(PASSWORD)
-                    .withDatabaseName(DEFAULT_DB_NAME)
-                    .withLogConsumer(new Slf4jLogConsumer(LOGGER));
-
     @Test
     void testUnsignedType() throws Exception {
-        try (Connection con =
-                DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), 
USER, PASSWORD)) {
+        try (Connection con = getMetadata().getConnection()) {
             StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
             TableEnvironment tableEnv = StreamTableEnvironment.create(sEnv);
             createMysqlTable(con);
@@ -173,7 +144,7 @@ class UnsignedTypeConversionITCase extends AbstractTestBase {
                         + "big_un_c DECIMAL(20, 0)) with("
                         + " 'connector' = 'jdbc',"
                         + " 'url' = '"
-                        + format("%s?user=%s&password=&", 
MYSQL_CONTAINER.getJdbcUrl(), USER)
+                        + getMetadata().getJdbcUrlWithCredentials()
                         + "',"
                         + " 'table-name' = '"
                         + TABLE_NAME

Reply via email to