This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd1d97391be408885dc8df35c8a83770fe7e0c38
Author: Shengkai <1059623...@qq.com>
AuthorDate: Tue Sep 6 17:12:17 2022 +0800

    [FLINK-29184][sql-gateway] Close resource manager when closing Session
---
 .../table/client/gateway/context/SessionContext.java   |  6 ++++++
 .../table/gateway/service/context/SessionContext.java  | 18 ++++++++++++++++--
 .../gateway/service/context/SessionContextTest.java    |  8 +++++++-
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index 8260b03e0b0..7e44b65dad6 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.TemporaryClassLoaderContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.Map;
 import java.util.Set;
@@ -168,6 +169,11 @@ public class SessionContext {
             }
         }
         classLoader.close();
+        try {
+            sessionState.resourceManager.close();
+        } catch (IOException e) {
+            LOG.error("Failed to close the resource manager.", e);
+        }
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 35622c9b917..94cc9d13b32 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -171,13 +171,27 @@ public class SessionContext {
             try {
                 
sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);
             } catch (Throwable t) {
-                LOG.error("Failed to close catalog %s.", t);
+                LOG.error(
+                        String.format(
+                                "Failed to close catalog %s for the session 
%s.", name, sessionId),
+                        t);
             }
         }
         try {
             userClassloader.close();
         } catch (IOException e) {
-            LOG.debug("Error while closing class loader.", e);
+            LOG.error(
+                    String.format(
+                            "Error while closing class loader for the session 
%s.", sessionId),
+                    e);
+        }
+        try {
+            sessionState.resourceManager.close();
+        } catch (IOException e) {
+            LOG.error(
+                    String.format(
+                            "Failed to close the resource manager for the 
session %s.", sessionId),
+                    e);
         }
     }
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index 855411d81b0..3786588dc58 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
 import org.apache.flink.table.gateway.api.utils.ThreadUtils;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -51,8 +52,13 @@ class SessionContextTest {
         sessionContext = createSessionContext();
     }
 
+    @AfterEach
+    public void cleanUp() {
+        sessionContext.close();
+    }
+
     @AfterAll
-    public static void cleanUp() {
+    public static void closeResources() {
         EXECUTOR_SERVICE.shutdown();
     }
 

Reply via email to