This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 82417ab9a5a  [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint
82417ab9a5a is described below

commit 82417ab9a5ad6935a73f9b7d1bc6bd4f47ce4f61
Author: yuzelin <747884...@qq.com>
AuthorDate: Thu Aug 4 21:08:14 2022 +0800

    [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint

    This closes #20401
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  38 ++++--
 .../table/endpoint/hive/HiveServer2Schemas.java    |  12 ++
 .../hive/util/OperationExecutorFactory.java        | 143 +++++++++++++++++++++
 .../endpoint/hive/util/StringRowDataUtils.java     |  37 ------
 .../hive/util/ThriftObjectConversions.java         |   2 +
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 107 +++++++++++++--
 .../flink/table/gateway/api/SqlGatewayService.java |  21 ++-
 .../table/gateway/api/operation/OperationType.java |   3 +
 .../gateway/api/utils/MockedSqlGatewayService.java |  11 ++
 .../gateway/service/SqlGatewayServiceImpl.java     |  15 +++
 .../service/operation/OperationExecutor.java       |  20 +++
 .../gateway/service/SqlGatewayServiceITCase.java   |  44 +++++++
 12 files changed, 391 insertions(+), 62 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 0772a4e0b24..f27168a4eb1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.endpoint.hive.util.StringRowDataUtils;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
@@ -113,7 +112,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -121,9 +119,10 @@ import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
-import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
@@ -133,7 +132,6 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
-import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -401,14 +399,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                 service.submitOperation(
                         sessionHandle,
                         OperationType.LIST_CATALOGS,
-                        () -> {
-                            Set<String> catalogNames = service.listCatalogs(sessionHandle);
-                            return new ResultSet(
-                                    EOS,
-                                    null,
-                                    GET_CATALOGS_SCHEMA,
-                                    StringRowDataUtils.toRowData(catalogNames));
-                        });
+                        createGetCatalogsExecutor(service, sessionHandle));
         resp.setStatus(OK_STATUS);
         resp.setOperationHandle(
                 toTOperationHandle(
@@ -422,7 +413,28 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tGetSchemasReq.getSessionHandle());
+            String catalogName = tGetSchemasReq.getCatalogName();
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            createGetSchemasExecutor(
+                                    service,
+                                    sessionHandle,
+                                    catalogName,
+                                    tGetSchemasReq.getSchemaName()));
+
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_SCHEMAS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetSchemas.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index 0a6818330fd..e3df486f3e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 /** Schemas for the HiveServer2 Endpoint result. */
@@ -35,4 +36,15 @@ public class HiveServer2Schemas {
                             .withComment("Catalog name. NULL if not applicable.")),
                     Collections.emptyList(),
                     null);
+
+    /** Schema for {@link HiveServer2Endpoint#GetSchemas}. */
+    public static final ResolvedSchema GET_SCHEMAS_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+                                    .withComment("Schema name. NULL if not applicable."),
+                            Column.physical("TABLE_CAT", DataTypes.STRING())
+                                    .withComment("Catalog name. NULL if not applicable.")),
+                    Collections.emptyList(),
+                    null);
 }
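A note for readers: the new GetSchemas handler is what a HiveServer2-compatible
JDBC client reaches through DatabaseMetaData#getSchemas. The following
client-side sketch is illustrative only; the host, port, catalog name, and
pattern are placeholders, not values shipped with this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class GetSchemasExample {
        public static void main(String[] args) throws Exception {
            // Placeholder address: any HiveServer2-compatible JDBC driver can
            // talk to the endpoint over the standard Thrift protocol.
            try (Connection connection =
                            DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                    ResultSet schemas =
                            connection.getMetaData().getSchemas("default_catalog", "db%")) {
                while (schemas.next()) {
                    // Columns follow GET_SCHEMAS_SCHEMA: TABLE_SCHEMA, TABLE_CAT.
                    System.out.println(schemas.getString(1) + " in " + schemas.getString(2));
                }
            }
        }
    }

A null or empty catalog argument falls back to the session's current catalog,
as implemented by the executor factory below.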
NULL if not applicable")), + Collections.emptyList(), + null); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java new file mode 100644 index 00000000000..8e90c0d6972 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.endpoint.hive.util; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA; +import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA; +import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS; + +/** Factory to create the operation executor. */ +public class OperationExecutorFactory { + + public static Callable<ResultSet> createGetCatalogsExecutor( + SqlGatewayService service, SessionHandle sessionHandle) { + return () -> executeGetCatalogs(service, sessionHandle); + } + + public static Callable<ResultSet> createGetSchemasExecutor( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName) { + return () -> executeGetSchemas(service, sessionHandle, catalogName, schemaName); + } + + // -------------------------------------------------------------------------------------------- + // Executors + // -------------------------------------------------------------------------------------------- + + private static ResultSet executeGetCatalogs( + SqlGatewayService service, SessionHandle sessionHandle) { + Set<String> catalogNames = service.listCatalogs(sessionHandle); + return new ResultSet( + EOS, + null, + GET_CATALOGS_SCHEMA, + catalogNames.stream() + .map(OperationExecutorFactory::wrap) + .collect(Collectors.toList())); + } + + private static ResultSet executeGetSchemas( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName) { + String specifiedCatalogName = + catalogName == null || catalogName.equals("") + ? 
service.getCurrentCatalog(sessionHandle) + : catalogName; + Set<String> databaseNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + return new ResultSet( + EOS, + null, + GET_SCHEMAS_SCHEMA, + databaseNames.stream() + .map(name -> wrap(name, specifiedCatalogName)) + .collect(Collectors.toList())); + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + private static Set<String> filter(Set<String> candidates, String pattern) { + Pattern compiledPattern = convertNamePattern(pattern); + return candidates.stream() + .filter(name -> compiledPattern.matcher(name).matches()) + .collect(Collectors.toSet()); + } + + /** + * Covert SQL 'like' pattern to a Java regular expression. Underscores (_) are converted to '.' + * and percent signs (%) are converted to '.*'. Note: escape characters are removed. + * + * @param pattern the SQL pattern to convert. + * @return the equivalent Java regular expression of the pattern. + */ + private static Pattern convertNamePattern(@Nullable String pattern) { + if ((pattern == null) || pattern.isEmpty()) { + pattern = "%"; + } + String wStr = ".*"; + return Pattern.compile( + pattern.replaceAll("([^\\\\])%", "$1" + wStr) + .replaceAll("\\\\%", "%") + .replaceAll("^%", wStr) + .replaceAll("([^\\\\])_", "$1.") + .replaceAll("\\\\_", "_") + .replaceAll("^_", ".")); + } + + private static GenericRowData wrap(Object... elements) { + Object[] pack = new Object[elements.length]; + for (int i = 0; i < elements.length; i++) { + Object element = elements[i]; + if (element != null) { + if (element instanceof String) { + pack[i] = StringData.fromString((String) element); + } else if (element instanceof Integer) { + pack[i] = element; + } else if (element instanceof Short) { + pack[i] = element; + } else { + throw new UnsupportedOperationException( + String.format( + "Can not wrap the element %s at index %s into RowData.", + element, i)); + } + } + } + return GenericRowData.of(pack); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java deleted file mode 100644 index 103f7311c89..00000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
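The schema-name filtering above hinges on convertNamePattern, which rewrites a
SQL LIKE pattern into a Java regular expression. The following standalone
sketch is not part of the commit; it copies the conversion so the translation
can be tried in isolation:

    import java.util.regex.Pattern;

    public class NamePatternDemo {
        // A standalone copy of the conversion above, for illustration only.
        static Pattern convert(String pattern) {
            return Pattern.compile(
                    pattern.replaceAll("([^\\\\])%", "$1.*")
                            .replaceAll("\\\\%", "%")
                            .replaceAll("^%", ".*")
                            .replaceAll("([^\\\\])_", "$1.")
                            .replaceAll("\\\\_", "_")
                            .replaceAll("^_", "."));
        }

        public static void main(String[] args) {
            // "db\_test%": the escaped underscore matches a literal '_',
            // and the trailing '%' becomes '.*', yielding regex "db_test.*".
            Pattern p = convert("db\\_test%");
            System.out.println(p.matcher("db_test1").matches()); // true
            System.out.println(p.matcher("db_test2").matches()); // true
            System.out.println(p.matcher("db_diff").matches());  // false
            // An unescaped '_' matches any single character.
            System.out.println(convert("db_diff").matcher("dbXdiff").matches()); // true
        }
    }

This is also why testGetSchemasWithPattern (below) passes "db\_test%" and
expects only db_test1 and db_test2 back.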
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
deleted file mode 100644
index 103f7311c89..00000000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.endpoint.hive.util;
-
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** Convert the {@link String} to {@link RowData}. */
-public class StringRowDataUtils {
-
-    public static List<RowData> toRowData(Collection<String> external) {
-        return external.stream()
-                .map(val -> GenericRowData.of(StringData.fromString(val)))
-                .collect(Collectors.toList());
-    }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 235796e4eb3..9e2b1cc73ed 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -162,6 +162,8 @@ public class ThriftObjectConversions {
                 return TOperationType.EXECUTE_STATEMENT;
             case LIST_CATALOGS:
                 return TOperationType.GET_CATALOGS;
+            case LIST_SCHEMAS:
+                return TOperationType.GET_SCHEMAS;
             case UNKNOWN:
                 return TOperationType.UNKNOWN;
             default:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 7b567e184c0..94d392b63c5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
@@ -63,9 +64,13 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.net.InetAddress;
 import java.sql.Connection;
 import java.sql.ResultSetMetaData;
+import java.sql.Statement;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -196,18 +201,85 @@ public class HiveServer2EndpointITCase extends TestLogger {
 
     @Test
     public void testGetCatalogs() throws Exception {
-        try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
-            java.sql.ResultSet result = connection.getMetaData().getCatalogs();
-            assertSchemaEquals(
-                    ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
-                    result.getMetaData());
-
-            List<String> actual = new ArrayList<>();
-            while (result.next()) {
-                actual.add(result.getString(1));
-            }
+        runGetObjectTest(
+                connection -> connection.getMetaData().getCatalogs(),
+                ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
+                Arrays.asList(
+                        Collections.singletonList("hive"),
+                        Collections.singletonList("default_catalog")));
+    }
+
+    @Test
+    public void testGetSchemas() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getSchemas("default_catalog", null),
+                ResolvedSchema.of(
+                        Column.physical("TABLE_SCHEMA", DataTypes.STRING()),
+                        Column.physical("TABLE_CAT", DataTypes.STRING())),
+                Arrays.asList(
+                        Arrays.asList("default_database", "default_catalog"),
+                        Arrays.asList("db_test1", "default_catalog"),
+                        Arrays.asList("db_test2", "default_catalog"),
+                        Arrays.asList("db_diff", "default_catalog")));
+    }
+
+    @Test
+    public void testGetSchemasWithPattern() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getSchemas(null, "db\\_test%"),
+                ResolvedSchema.of(
+                        Column.physical("TABLE_SCHEMA", DataTypes.STRING()),
+                        Column.physical("TABLE_CAT", DataTypes.STRING())),
+                Arrays.asList(
+                        Arrays.asList("db_test1", "default_catalog"),
+                        Arrays.asList("db_test2", "default_catalog")));
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private Connection getInitializedConnection() throws Exception {
+        Connection connection = ENDPOINT_EXTENSION.getConnection();
+        Statement statement = connection.createStatement();
+        statement.execute("SET table.sql-dialect=default");
+        statement.execute("USE CATALOG `default_catalog`");
+
+        // default_catalog: db_test1 | db_test2 | db_diff | default
+        // db_test1: temporary table tb_1, table tb_2, temporary view tb_3, view tb_4
+        // db_test2: table tb_1, table diff_1, view tb_2, view diff_2
+        // db_diff: table tb_1, view tb_2
+
+        statement.execute("CREATE DATABASE db_test1");
+        statement.execute("CREATE DATABASE db_test2");
+        statement.execute("CREATE DATABASE db_diff");
 
-            assertThat(actual).contains("hive", "default_catalog");
+        statement.execute("CREATE TEMPORARY TABLE db_test1.tb_1 COMMENT 'temporary table tb_1'");
+        statement.execute("CREATE TABLE db_test1.tb_2 COMMENT 'table tb_2'");
+        statement.execute(
+                "CREATE TEMPORARY VIEW db_test1.tb_3 COMMENT 'temporary view tb_3' AS SELECT 1");
+        statement.execute("CREATE VIEW db_test1.tb_4 COMMENT 'view tb_4' AS SELECT 1");
+
+        statement.execute("CREATE TABLE db_test2.tb_1 COMMENT 'table tb_1'");
+        statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table diff_1'");
+        statement.execute("CREATE VIEW db_test2.tb_2 COMMENT 'view tb_2' AS SELECT 1");
+        statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view diff_2' AS SELECT 1");
+
+        statement.execute("CREATE TABLE db_diff.tb_1 COMMENT 'table tb_1'");
+        statement.execute("CREATE VIEW db_diff.tb_2 COMMENT 'view tb_2' AS SELECT 1");
+
+        statement.close();
+        return connection;
+    }
+
+    private void runGetObjectTest(
+            FunctionWithException<Connection, java.sql.ResultSet, Exception> resultSetSupplier,
+            ResolvedSchema expectedSchema,
+            List<List<Object>> expectedResults)
+            throws Exception {
+        try (Connection connection = getInitializedConnection();
+                java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
+            assertSchemaEquals(expectedSchema, result.getMetaData());
+            assertThat(new HashSet<>(collect(result, expectedSchema.getColumnCount())))
+                    .isEqualTo(new HashSet<>(expectedResults));
         }
     }
 
@@ -265,4 +337,17 @@ public class HiveServer2EndpointITCase extends TestLogger {
             assertThat(metaData.getColumnType(i)).isEqualTo(jdbcType);
         }
     }
+
+    private List<List<Object>> collect(java.sql.ResultSet result, int columnCount)
+            throws Exception {
+        List<List<Object>> actual = new ArrayList<>();
+        while (result.next()) {
+            List<Object> row = new ArrayList<>();
+            for (int i = 1; i <= columnCount; i++) {
+                row.add(result.getObject(i));
+            }
+            actual.add(row);
+        }
+        return actual;
+    }
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 97b377c59c4..5e77577fc02 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -185,12 +185,21 @@ public interface SqlGatewayService {
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
             FetchOrientation orientation,
-            int maxRows);
+            int maxRows)
+            throws SqlGatewayException;
 
     // -------------------------------------------------------------------------------------------
     // Catalog API
     // -------------------------------------------------------------------------------------------
 
+    /**
+     * Return current catalog name.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @return name of the current catalog.
+     */
+    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;
+
     /**
      * Return all available catalogs in the current session.
      *
@@ -198,4 +207,14 @@ public interface SqlGatewayService {
      * @return names of the registered catalogs.
     */
     Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
+
+    /**
+     * Return all available schemas in the given catalog.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param catalogName name string of the given catalog.
+     * @return names of the registered schemas.
+     */
+    Set<String> listDatabases(SessionHandle sessionHandle, String catalogName)
+            throws SqlGatewayException;
 }
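With these two additions, SqlGatewayService exposes enough of the catalog API
for an endpoint to resolve the default catalog and enumerate its databases. A
hypothetical helper (illustrative only, not part of this commit) showing the
intended call pattern:

    import java.util.Set;

    import org.apache.flink.table.gateway.api.SqlGatewayService;
    import org.apache.flink.table.gateway.api.session.SessionHandle;

    /** Illustrative only: how an endpoint might use the new catalog API. */
    final class CatalogApiSketch {
        static Set<String> databasesOfCurrentCatalog(
                SqlGatewayService service, SessionHandle sessionHandle) {
            // Resolve the session's current catalog, then list its databases
            // ("schemas" in JDBC terminology).
            String currentCatalog = service.getCurrentCatalog(sessionHandle);
            return service.listDatabases(sessionHandle, currentCatalog);
        }
    }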
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
index 6177008b264..9366b6e58fb 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
@@ -29,6 +29,9 @@ public enum OperationType {
     /** The type indicates the operation list catalogs. */
     LIST_CATALOGS,
 
+    /** The type indicates the operation list schemas. */
+    LIST_SCHEMAS,
+
     /** The type indicates the operation is unknown. */
     UNKNOWN;
 }
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index af0ff36ec6e..e07debec55b 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -120,4 +120,15 @@ public class MockedSqlGatewayService implements SqlGatewayService {
     public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 9419b6071c3..39f437914b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -221,6 +221,21 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName) {
+        try {
+            return getSession(sessionHandle).createExecutor().listDatabases(catalogName);
+        } catch (Throwable t) {
+            LOG.error("Failed to listDatabases.", t);
+            throw new SqlGatewayException("Failed to listDatabases.", t);
+        }
+    }
+
+    @Override
+    public String getCurrentCatalog(SessionHandle sessionHandle) {
+        return getSession(sessionHandle).createExecutor().getCurrentCatalog();
+    }
+
     @VisibleForTesting
     Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index e49a91f2ae8..a42ed341f24 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -43,6 +44,7 @@ import org.apache.flink.table.operations.command.SetOperation;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -100,10 +102,28 @@ public class OperationExecutor {
         }
     }
 
+    public String getCurrentCatalog() {
+        return getTableEnvironment().getCatalogManager().getCurrentCatalog();
+    }
+
     public Set<String> listCatalogs() {
         return getTableEnvironment().getCatalogManager().listCatalogs();
     }
 
+    public Set<String> listDatabases(String catalogName) {
+        return new HashSet<>(
+                getTableEnvironment()
+                        .getCatalogManager()
+                        .getCatalog(catalogName)
+                        .orElseThrow(
+                                () ->
+                                        new CatalogNotExistException(
+                                                String.format(
+                                                        "Catalog '%s' does not exist.",
+                                                        catalogName)))
+                        .listDatabases());
+    }
+
     // --------------------------------------------------------------------------------------------
 
     @VisibleForTesting
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 69880a74dec..3481ac930c2 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -304,6 +304,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Catalog API tests
+    // --------------------------------------------------------------------------------------------
+
     @Test
     public void testListCatalogs() {
         SessionEnvironment environment =
@@ -316,6 +320,46 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         assertThat(service.listCatalogs(sessionHandle)).contains("cat1", "cat2");
     }
 
+    @Test
+    public void testListDatabases() throws Exception {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat", new GenericInMemoryCatalog("cat"))
+                        .setDefaultCatalog("cat")
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+        Configuration configuration =
+                Configuration.fromMap(service.getSessionConfig(sessionHandle));
+
+        service.executeStatement(sessionHandle, "CREATE DATABASE db1", -1, configuration);
+        OperationHandle operationHandle =
+                service.executeStatement(sessionHandle, "CREATE DATABASE db2", -1, configuration);
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, operationHandle)
+                                .getStatus()
+                                .isTerminalStatus(),
+                Duration.ofSeconds(100),
+                "Failed to wait operation finish.");
+        assertThat(service.listDatabases(sessionHandle, "cat")).contains("db1", "db2");
+    }
+
+    @Test
+    public void testGetCurrentCatalog() {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
+                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
+                        .setDefaultCatalog("cat2")
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+
+        assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
+    }
+
     // --------------------------------------------------------------------------------------------
     // Concurrent tests
     // --------------------------------------------------------------------------------------------
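A closing illustration: OperationExecutor#listDatabases above relies on the
standard Optional#orElseThrow lookup idiom. The following self-contained
sketch shows the same pattern with plain collections; the class name and the
exception choice are hypothetical, not from this commit:

    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;

    /** Illustrative only: the Optional.orElseThrow lookup pattern used above. */
    final class CatalogLookupSketch {
        private final Map<String, Set<String>> catalogs; // catalog -> databases

        CatalogLookupSketch(Map<String, Set<String>> catalogs) {
            this.catalogs = catalogs;
        }

        Set<String> listDatabases(String catalogName) {
            // Resolve the catalog or fail with a descriptive message
            // instead of returning null, mirroring the executor above.
            return Optional.ofNullable(catalogs.get(catalogName))
                    .orElseThrow(
                            () ->
                                    new IllegalArgumentException(
                                            String.format(
                                                    "Catalog '%s' does not exist.",
                                                    catalogName)));
        }
    }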