yuzelin commented on code in PR #20714: URL: https://github.com/apache/flink/pull/20714#discussion_r962537416
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java: ########## @@ -235,40 +236,79 @@ public static SessionContext create( final ModuleManager moduleManager = new ModuleManager(); - final EnvironmentSettings settings = - EnvironmentSettings.newInstance().withConfiguration(configuration).build(); - - CatalogManager catalogManager = - CatalogManager.newBuilder() - // Currently, the classloader is only used by DataTypeFactory. - .classLoader(userClassLoader) - .config(configuration) - .defaultCatalog( - settings.getBuiltInCatalogName(), - new GenericInMemoryCatalog( - settings.getBuiltInCatalogName(), - settings.getBuiltInDatabaseName())) - .build(); + final CatalogManager catalogManager = + buildCatalogManager(configuration, userClassLoader, environment); final FunctionCatalog functionCatalog = new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager); SessionState sessionState = new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog); - return new SessionContext( - defaultContext, - sessionId, - endpointVersion, - configuration, - userClassLoader, - sessionState, - new OperationManager(operationExecutorService)); + // -------------------------------------------------------------------------------------------------------------- + // Build session context and return + // -------------------------------------------------------------------------------------------------------------- + + SessionContext sessionContext = + new SessionContext( + defaultContext, + sessionId, + environment.getSessionEndpointVersion(), + configuration, + userClassLoader, + sessionState, + new OperationManager(operationExecutorService)); + + // filter the default catalog out to avoid exception caused by repeated registration + String currentCatalog = sessionContext.sessionState.catalogManager.getCurrentCatalog(); + environment + .getRegisteredCatalogs() + .forEach( + (catalogName, catalog) -> { + if (!catalogName.equals(currentCatalog)) { + sessionContext.registerCatalog(catalogName, catalog); + } + }); + + environment.getRegisteredModules().forEach(sessionContext::registerModuleAtHead); + + return sessionContext; } // ------------------------------------------------------------------------------------------------------------------ // Helpers // ------------------------------------------------------------------------------------------------------------------ + private static CatalogManager buildCatalogManager( + Configuration configuration, + URLClassLoader userClassLoader, + SessionEnvironment environment) { + CatalogManager.Builder builder = + CatalogManager.newBuilder() + // Currently, the classloader is only used by DataTypeFactory. + .classLoader(userClassLoader) + .config(configuration); + + // init default catalog + String defaultCatalogName; + Catalog defaultCatalog; + if (environment.getDefaultCatalog().isPresent()) { + defaultCatalogName = environment.getDefaultCatalog().get(); + defaultCatalog = environment.getRegisteredCatalogs().get(defaultCatalogName); + } else { + EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(configuration).build(); + defaultCatalogName = settings.getBuiltInCatalogName(); + defaultCatalog = + new GenericInMemoryCatalog( + defaultCatalogName, settings.getBuiltInDatabaseName()); + } + defaultCatalog.open(); + + return builder.defaultCatalog(defaultCatalogName, defaultCatalog) + .currentDatabase(environment.getDefaultDatabase().orElse(null)) Review Comment: I removed 'CatalogManager.Builder#currentDatabase'. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org