snuyanzin commented on code in PR #27840:
URL: https://github.com/apache/flink/pull/27840#discussion_r3032090116
##########
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java:
##########
@@ -193,6 +195,156 @@ public void testCatalogSchemas() throws Exception {
}
}
+ @Test
+ public void testGetSchemasWithFilter() throws Exception {
+ DriverUri driverUri = getDriverUri();
+ try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+ Executor executor = connection.getExecutor();
+ executeDDL("CREATE DATABASE database_a", executor);
+ executeDDL("CREATE DATABASE database_b", executor);
+
+ DatabaseMetaData metaData =
+ new FlinkDatabaseMetaData(
+ driverUri.getURL(), connection, new
TestingStatement());
+
+ // Filter by catalog
+ List<String> schemas =
+
resultSetToListAndClose(metaData.getSchemas("default_catalog", null));
+ assertThat(schemas).allMatch(s -> s.contains("default_catalog"));
+
+ // Filter by schema pattern
+ List<String> filteredSchemas =
+
resultSetToListAndClose(metaData.getSchemas("default_catalog", "database_%"));
+ assertThat(filteredSchemas)
+ .allMatch(s -> s.startsWith("database_a,") ||
s.startsWith("database_b,"));
+ }
+ }
+
+ @Test
+ public void testGetTables() throws Exception {
+ DriverUri driverUri = getDriverUri();
+ try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+ Executor executor = connection.getExecutor();
+ executeDDL(
+ "CREATE TABLE test_table1 (id INT, name STRING) WITH
('connector' = 'datagen')",
+ executor);
+ executeDDL(
+ "CREATE TABLE test_table2 (id INT, val DOUBLE) WITH
('connector' = 'datagen')",
+ executor);
+ executeDDL("CREATE VIEW test_view1 AS SELECT id, name FROM
test_table1", executor);
+
+ DatabaseMetaData metaData =
+ new FlinkDatabaseMetaData(
+ driverUri.getURL(), connection, new
TestingStatement());
+
+ // Get all tables and views
+ ResultSet rs = metaData.getTables("default_catalog",
"default_database", null, null);
+ List<String> allResults = resultSetToListAndClose(rs);
+ Set<String> tableNames = new HashSet<>();
+ for (String row : allResults) {
+ tableNames.add(row.split(",")[2]);
+ }
+ assertThat(tableNames).contains("test_table1", "test_table2",
"test_view1");
+
+ // Filter by type TABLE only
+ rs =
+ metaData.getTables(
+ "default_catalog", "default_database", null, new
String[] {"TABLE"});
+ List<String> tableOnly = resultSetToListAndClose(rs);
+ for (String row : tableOnly) {
+ assertThat(row).contains("TABLE");
+ }
+ Set<String> tableOnlyNames = new HashSet<>();
+ for (String row : tableOnly) {
+ tableOnlyNames.add(row.split(",")[2]);
+ }
+ assertThat(tableOnlyNames).contains("test_table1", "test_table2");
+ assertThat(tableOnlyNames).doesNotContain("test_view1");
+
+ // Filter by name pattern
+ rs = metaData.getTables("default_catalog", "default_database",
"test_table%", null);
+ List<String> patternResults = resultSetToListAndClose(rs);
+ for (String row : patternResults) {
+ assertThat(row.split(",")[2]).startsWith("test_table");
+ }
+ }
+ }
+
+ @Test
+ public void testGetColumns() throws Exception {
+ DriverUri driverUri = getDriverUri();
+ try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+ Executor executor = connection.getExecutor();
+ executeDDL(
+ "CREATE TABLE col_test (id INT NOT NULL, name STRING,
score DOUBLE) WITH ('connector' = 'datagen')",
+ executor);
+
+ DatabaseMetaData metaData =
+ new FlinkDatabaseMetaData(
+ driverUri.getURL(), connection, new
TestingStatement());
+
+ ResultSet rs =
+ metaData.getColumns("default_catalog", "default_database",
"col_test", null);
+
+ // Verify column count and names using COLUMN_NAME (column 4)
+ List<String> columnNames = new ArrayList<>();
+ while (rs.next()) {
+ columnNames.add(rs.getString("COLUMN_NAME"));
+ }
+ rs.close();
+ assertThat(columnNames).containsExactly("id", "name", "score");
+
+ // Filter by column name pattern
+ rs = metaData.getColumns("default_catalog", "default_database",
"col_test", "na%");
+ List<String> filtered = new ArrayList<>();
+ while (rs.next()) {
+ filtered.add(rs.getString("COLUMN_NAME"));
+ }
+ rs.close();
+ assertThat(filtered).containsExactly("name");
+ }
+ }
+
+ @Test
+ public void testGetTableTypes() throws Exception {
+ DriverUri driverUri = getDriverUri();
+ try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+ DatabaseMetaData metaData =
+ new FlinkDatabaseMetaData(
+ driverUri.getURL(), connection, new
TestingStatement());
+
+ List<String> types =
resultSetToListAndClose(metaData.getTableTypes());
+ assertThat(types).containsExactly("TABLE", "VIEW");
Review Comment:
are we sure about order or should use `inAnyOrder`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]