wuchong commented on a change in pull request #13244:
URL: https://github.com/apache/flink/pull/13244#discussion_r478448798



##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -60,27 +60,54 @@
        private static final String TABLE_NAME = "unsigned_test";
 
        private StreamTableEnvironment tEnv;
+       private DB db;
        private String dbUrl;
        private Connection connection;
 
-       @ClassRule
-       public static MariaDB4jRule db4jRule = new MariaDB4jRule(
-               DBConfigurationBuilder.newBuilder().build(),
-               DEFAULT_DB_NAME,
-               null);
-
        @Before
-       public void setUp() throws SQLException, ClassNotFoundException {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               tEnv = StreamTableEnvironment.create(env);
+       public void setUp() throws SQLException, IllegalStateException {
                //dbUrl: jdbc:mysql://localhost:3306/test
-               dbUrl = db4jRule.getURL();
-               connection = DriverManager.getConnection(dbUrl);
+               prepareMariaDB();
                createMysqlTable();
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               tEnv = StreamTableEnvironment.create(env);
                createFlinkTable();
                prepareData();
        }
 
+       private void prepareMariaDB() throws IllegalStateException {
+               boolean initDbSuccess = false;
+               int i = 0;
+               //retry
+               while (i < 3) {
+                       try {
+                               db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
+                               db.start();
+                               dbUrl = 
db.getConfiguration().getURL(DEFAULT_DB_NAME);
+                               connection = DriverManager.getConnection(dbUrl);
+                               try (Statement statement = 
connection.createStatement()) {
+                                       statement.execute("CREATE DATABASE IF 
NOT EXISTS `" + DEFAULT_DB_NAME + "`;");
+                                       ResultSet resultSet = 
statement.executeQuery("SELECT SCHEMA_NAME FROM " +
+                                               "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '" + DEFAULT_DB_NAME + "';");
+                                       if (resultSet.next()) {
+                                               String dbName = 
resultSet.getString(1);
+                                               initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
+                                       }
+                               }
+                       } catch (Exception e) {
+                               log.info("Initialize DB fail caused by {}", e);

Review comment:
       Should this be `log.warn` rather than `log.info`?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -60,27 +60,54 @@
        private static final String TABLE_NAME = "unsigned_test";
 
        private StreamTableEnvironment tEnv;
+       private DB db;
        private String dbUrl;
        private Connection connection;
 
-       @ClassRule
-       public static MariaDB4jRule db4jRule = new MariaDB4jRule(
-               DBConfigurationBuilder.newBuilder().build(),
-               DEFAULT_DB_NAME,
-               null);
-
        @Before
-       public void setUp() throws SQLException, ClassNotFoundException {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               tEnv = StreamTableEnvironment.create(env);
+       public void setUp() throws SQLException, IllegalStateException {
                //dbUrl: jdbc:mysql://localhost:3306/test
-               dbUrl = db4jRule.getURL();
-               connection = DriverManager.getConnection(dbUrl);
+               prepareMariaDB();
                createMysqlTable();
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               tEnv = StreamTableEnvironment.create(env);
                createFlinkTable();
                prepareData();
        }
 
+       private void prepareMariaDB() throws IllegalStateException {
+               boolean initDbSuccess = false;
+               int i = 0;
+               //retry

Review comment:
       Please add a comment explaining why we need a retry here; we can also 
link the related issue. 

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -60,27 +60,54 @@
        private static final String TABLE_NAME = "unsigned_test";
 
        private StreamTableEnvironment tEnv;
+       private DB db;
        private String dbUrl;
        private Connection connection;
 
-       @ClassRule
-       public static MariaDB4jRule db4jRule = new MariaDB4jRule(
-               DBConfigurationBuilder.newBuilder().build(),
-               DEFAULT_DB_NAME,
-               null);
-
        @Before
-       public void setUp() throws SQLException, ClassNotFoundException {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               tEnv = StreamTableEnvironment.create(env);
+       public void setUp() throws SQLException, IllegalStateException {
                //dbUrl: jdbc:mysql://localhost:3306/test
-               dbUrl = db4jRule.getURL();
-               connection = DriverManager.getConnection(dbUrl);
+               prepareMariaDB();

Review comment:
       I would suggest preparing the DB in a `@BeforeClass` method instead of 
`@Before` to avoid repeated initialization between tests. 

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -60,27 +60,54 @@
        private static final String TABLE_NAME = "unsigned_test";
 
        private StreamTableEnvironment tEnv;
+       private DB db;
        private String dbUrl;
        private Connection connection;
 
-       @ClassRule
-       public static MariaDB4jRule db4jRule = new MariaDB4jRule(
-               DBConfigurationBuilder.newBuilder().build(),
-               DEFAULT_DB_NAME,
-               null);
-
        @Before
-       public void setUp() throws SQLException, ClassNotFoundException {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               tEnv = StreamTableEnvironment.create(env);
+       public void setUp() throws SQLException, IllegalStateException {
                //dbUrl: jdbc:mysql://localhost:3306/test
-               dbUrl = db4jRule.getURL();
-               connection = DriverManager.getConnection(dbUrl);
+               prepareMariaDB();
                createMysqlTable();
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               tEnv = StreamTableEnvironment.create(env);
                createFlinkTable();
                prepareData();
        }
 
+       private void prepareMariaDB() throws IllegalStateException {
+               boolean initDbSuccess = false;
+               int i = 0;
+               //retry
+               while (i < 3) {
+                       try {
+                               db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
+                               db.start();
+                               dbUrl = 
db.getConfiguration().getURL(DEFAULT_DB_NAME);
+                               connection = DriverManager.getConnection(dbUrl);
+                               try (Statement statement = 
connection.createStatement()) {
+                                       statement.execute("CREATE DATABASE IF 
NOT EXISTS `" + DEFAULT_DB_NAME + "`;");
+                                       ResultSet resultSet = 
statement.executeQuery("SELECT SCHEMA_NAME FROM " +
+                                               "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '" + DEFAULT_DB_NAME + "';");
+                                       if (resultSet.next()) {
+                                               String dbName = 
resultSet.getString(1);
+                                               initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
+                                       }
+                               }
+                       } catch (Exception e) {
+                               log.info("Initialize DB fail caused by {}", e);

Review comment:
       Should we stop the DB here to release any resources it holds if an error happens?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to