snuyanzin commented on code in PR #18: URL: https://github.com/apache/flink-connector-jdbc/pull/18#discussion_r1098436923
########## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java: ########## @@ -111,39 +126,338 @@ class MySqlCatalogTestBase { .primaryKeyNamed("PRIMARY", Lists.newArrayList("pid")) .build(); - public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new HashMap<>(); - public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>(); - - @BeforeAll - static void beforeAll() throws SQLException { - for (String dockerImageName : DOCKER_IMAGE_NAMES) { - MySQLContainer<?> container = - new MySQLContainer<>(DockerImageName.parse(dockerImageName)) - .withUsername("root") - .withPassword("") - .withEnv(DEFAULT_CONTAINER_ENV_MAP) - .withInitScript(MYSQL_INIT_SCRIPT) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - container.start(); - MYSQL_CONTAINERS.put(dockerImageName, container); - CATALOGS.put( - dockerImageName, - new MySqlCatalog( - Thread.currentThread().getContextClassLoader(), - TEST_CATALOG_NAME, - TEST_DB, - TEST_USERNAME, - TEST_PWD, - container - .getJdbcUrl() - .substring(0, container.getJdbcUrl().lastIndexOf("/")))); - } - } - - @AfterAll - static void cleanup() { - for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) { - container.stop(); - } + protected static final List<Row> TABLE_ROWS = + Lists.newArrayList( + Row.ofKind( + RowKind.INSERT, + 1L, + -1L, + new BigDecimal(1), + null, + true, + null, + "hello", + Date.valueOf("2021-08-04").toLocalDate(), + Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(), + new BigDecimal(-1), + new BigDecimal(1), + -1.0d, + 1.0d, + "enum2", + -9.1f, + 9.1f, + -1, + 1L, + -1, + 1L, + null, + "col_longtext", + null, + -1, + 1, + "col_mediumtext", + new BigDecimal(-99), + new BigDecimal(99), + -1.0d, + 1.0d, + "set_ele1", + Short.parseShort("-1"), + 1, + "col_text", + Time.valueOf("10:32:34").toLocalTime(), + Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(), + "col_tinytext", + Byte.parseByte("-1"), + Short.parseShort("1"), + 
null, + "col_varchar", + Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(), + Time.valueOf("09:33:43").toLocalTime(), + Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(), + null), + Row.ofKind( + RowKind.INSERT, + 2L, + -1L, + new BigDecimal(1), + null, + true, + null, + "hello", + Date.valueOf("2021-08-04").toLocalDate(), + Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(), + new BigDecimal(-1), + new BigDecimal(1), + -1.0d, + 1.0d, + "enum2", + -9.1f, + 9.1f, + -1, + 1L, + -1, + 1L, + null, + "col_longtext", + null, + -1, + 1, + "col_mediumtext", + new BigDecimal(-99), + new BigDecimal(99), + -1.0d, + 1.0d, + "set_ele1,set_ele12", + Short.parseShort("-1"), + 1, + "col_text", + Time.valueOf("10:32:34").toLocalTime(), + Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(), + "col_tinytext", + Byte.parseByte("-1"), + Short.parseShort("1"), + null, + "col_varchar", + Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(), + Time.valueOf("09:33:43").toLocalTime(), + Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(), + null)); + + private MySqlCatalog catalog; + private TableEnvironment tEnv; + + protected static MySQLContainer<?> createContainer(String dockerImage) { + return new MySQLContainer<>(DockerImageName.parse(dockerImage)) + .withUsername("root") + .withPassword("") + .withEnv(DEFAULT_CONTAINER_ENV_MAP) + .withInitScript(MYSQL_INIT_SCRIPT) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + protected abstract String getDatabaseUrl(); + + @BeforeEach + void setup() { + catalog = + new MySqlCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + TEST_DB, + TEST_USERNAME, + TEST_PWD, + getDatabaseUrl()); + + this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + // Use mysql catalog. 
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); + tEnv.useCatalog(TEST_CATALOG_NAME); + } + + @Test + void testGetDb_DatabaseNotExistException() throws Exception { + String databaseNotExist = "nonexistent"; + assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist)) + .satisfies( + anyCauseMatches( + DatabaseNotExistException.class, + String.format( + "Database %s does not exist in Catalog", + databaseNotExist))); + } + + @Test + void testListDatabases() { + List<String> actual = catalog.listDatabases(); + assertThat(actual).containsExactly(TEST_DB, TEST_DB2); + } + + @Test + void testDbExists() throws Exception { + String databaseNotExist = "nonexistent"; + assertThat(catalog.databaseExists(databaseNotExist)).isFalse(); + assertThat(catalog.databaseExists(TEST_DB)).isTrue(); + } + + // ------ tables ------ + + @Test + void testListTables() throws DatabaseNotExistException { + List<String> actual = catalog.listTables(TEST_DB); + assertThat(actual) + .isEqualTo( + Arrays.asList( + TEST_TABLE_ALL_TYPES, + TEST_SINK_TABLE_ALL_TYPES, + TEST_TABLE_SINK_FROM_GROUPED_BY, + TEST_TABLE_PK)); + } + + @Test + void testListTables_DatabaseNotExistException() throws DatabaseNotExistException { Review Comment: ```suggestion void testListTables_DatabaseNotExistException() { ``` The `throws DatabaseNotExistException` clause can be removed from the method signature, since the expected exception is already captured by `assertThatThrownBy`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org