This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd1d97391be408885dc8df35c8a83770fe7e0c38 Author: Shengkai <1059623...@qq.com> AuthorDate: Tue Sep 6 17:12:17 2022 +0800 [FLINK-29184][sql-gateway] Close resource manager when closing Session --- .../table/client/gateway/context/SessionContext.java | 6 ++++++ .../table/gateway/service/context/SessionContext.java | 18 ++++++++++++++++-- .../gateway/service/context/SessionContextTest.java | 8 +++++++- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java index 8260b03e0b0..7e44b65dad6 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java @@ -39,6 +39,7 @@ import org.apache.flink.util.TemporaryClassLoaderContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.URL; import java.util.Map; import java.util.Set; @@ -168,6 +169,11 @@ public class SessionContext { } } classLoader.close(); + try { + sessionState.resourceManager.close(); + } catch (IOException e) { + LOG.error("Failed to close the resource manager.", e); + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 35622c9b917..94cc9d13b32 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -171,13 +171,27 @@ public class SessionContext { try { sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close); } catch (Throwable t) { - LOG.error("Failed to close catalog %s.", t); + LOG.error( + String.format( + "Failed to close catalog %s for the session %s.", name, sessionId), + t); } } try { userClassloader.close(); } catch (IOException e) { - LOG.debug("Error while closing class loader.", e); + LOG.error( + String.format( + "Error while closing class loader for the session %s.", sessionId), + e); + } + try { + sessionState.resourceManager.close(); + } catch (IOException e) { + LOG.error( + String.format( + "Failed to close the resource manager for the session %s.", sessionId), + e); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java index 855411d81b0..3786588dc58 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java @@ -27,6 +27,7 @@ import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; import org.apache.flink.table.gateway.api.utils.ThreadUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,8 +52,13 @@ class SessionContextTest { sessionContext = createSessionContext(); } + @AfterEach + public void cleanUp() { + sessionContext.close(); + } + @AfterAll - public static void cleanUp() { + public static void closeResources() { EXECUTOR_SERVICE.shutdown(); }