yuzelin commented on code in PR #20714:
URL: https://github.com/apache/flink/pull/20714#discussion_r962537416


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -235,40 +236,79 @@ public static SessionContext create(
 
         final ModuleManager moduleManager = new ModuleManager();
 
-        final EnvironmentSettings settings =
-                EnvironmentSettings.newInstance().withConfiguration(configuration).build();
-
-        CatalogManager catalogManager =
-                CatalogManager.newBuilder()
-                        // Currently, the classloader is only used by DataTypeFactory.
-                        .classLoader(userClassLoader)
-                        .config(configuration)
-                        .defaultCatalog(
-                                settings.getBuiltInCatalogName(),
-                                new GenericInMemoryCatalog(
-                                        settings.getBuiltInCatalogName(),
-                                        settings.getBuiltInDatabaseName()))
-                        .build();
+        final CatalogManager catalogManager =
+                buildCatalogManager(configuration, userClassLoader, environment);
 
         final FunctionCatalog functionCatalog =
                 new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
         SessionState sessionState =
                 new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog);
 
-        return new SessionContext(
-                defaultContext,
-                sessionId,
-                endpointVersion,
-                configuration,
-                userClassLoader,
-                sessionState,
-                new OperationManager(operationExecutorService));
+        // --------------------------------------------------------------------------------------------------------------
+        // Build session context and return
+        // --------------------------------------------------------------------------------------------------------------
+
+        SessionContext sessionContext =
+                new SessionContext(
+                        defaultContext,
+                        sessionId,
+                        environment.getSessionEndpointVersion(),
+                        configuration,
+                        userClassLoader,
+                        sessionState,
+                        new OperationManager(operationExecutorService));
+
+        // filter the default catalog out to avoid exception caused by repeated registration
+        String currentCatalog = sessionContext.sessionState.catalogManager.getCurrentCatalog();
+        environment
+                .getRegisteredCatalogs()
+                .forEach(
+                        (catalogName, catalog) -> {
+                            if (!catalogName.equals(currentCatalog)) {
+                                sessionContext.registerCatalog(catalogName, catalog);
+                            }
+                        });
+
+        environment.getRegisteredModules().forEach(sessionContext::registerModuleAtHead);
+
+        return sessionContext;
     }
 
     // 
------------------------------------------------------------------------------------------------------------------
     // Helpers
     // 
------------------------------------------------------------------------------------------------------------------
 
+    private static CatalogManager buildCatalogManager(
+            Configuration configuration,
+            URLClassLoader userClassLoader,
+            SessionEnvironment environment) {
+        CatalogManager.Builder builder =
+                CatalogManager.newBuilder()
+                        // Currently, the classloader is only used by DataTypeFactory.
+                        .classLoader(userClassLoader)
+                        .config(configuration);
+
+        // init default catalog
+        String defaultCatalogName;
+        Catalog defaultCatalog;
+        if (environment.getDefaultCatalog().isPresent()) {
+            defaultCatalogName = environment.getDefaultCatalog().get();
+            defaultCatalog = environment.getRegisteredCatalogs().get(defaultCatalogName);
+        } else {
+            EnvironmentSettings settings =
+                    EnvironmentSettings.newInstance().withConfiguration(configuration).build();
+            defaultCatalogName = settings.getBuiltInCatalogName();
+            defaultCatalog =
+                    new GenericInMemoryCatalog(
+                            defaultCatalogName, settings.getBuiltInDatabaseName());
+        }
+        defaultCatalog.open();
+
+        return builder.defaultCatalog(defaultCatalogName, defaultCatalog)
+                .currentDatabase(environment.getDefaultDatabase().orElse(null))

Review Comment:
   I removed 'CatalogManager.Builder#currentDatabase'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to