This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd47530f1ee [FLINK-32309][sql-gateway] Use independent resource manager for table environment (#22768)
dd47530f1ee is described below

commit dd47530f1eee829242c1c7a5fab9b4b8553cba5a
Author: Fang Yong <zjur...@gmail.com>
AuthorDate: Wed Nov 1 19:00:40 2023 +0800

    [FLINK-32309][sql-gateway] Use independent resource manager for table environment (#22768)
---
 .../src/test/resources/sql/function.q              | 31 +++++++++----
 .../service/operation/OperationExecutor.java       | 54 ++++++++++++++++++++--
 .../gateway/service/result/ResultFetcher.java      | 15 ++++++
 .../flink/table/catalog/FunctionCatalog.java       | 37 +++++++++++++--
 .../flink/table/resource/ResourceManager.java      | 52 ++++++++++++++++-----
 .../flink/table/resource/ResourceManagerTest.java  | 19 ++++++++
 6 files changed, 179 insertions(+), 29 deletions(-)
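
In short: each statement now executes against a private copy of the session's ResourceManager (with the FunctionCatalog rebound to that copy), and the copy is closed together with the statement's ResultFetcher, so per-query jar registrations no longer leak into the shared session state. Below is a minimal standalone sketch of that lifecycle; the class and method names are illustrative stand-ins, not the real Flink classes.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-ins for the gateway flow, not the real Flink classes.
    public class PerStatementResources {

        static final class Resources implements AutoCloseable {
            final Map<String, String> jars = new HashMap<>(); // jar registrations

            Resources copy() { // statement-scoped snapshot of the session state
                Resources c = new Resources();
                c.jars.putAll(jars);
                return c;
            }

            @Override
            public void close() {
                jars.clear();
            }
        }

        static final class ResultFetcher implements AutoCloseable {
            private final Resources resources;

            ResultFetcher(Resources resources) {
                this.resources = resources;
            }

            @Override
            public void close() {
                resources.close(); // the copy dies with the statement's result
            }
        }

        public static void main(String[] args) throws Exception {
            Resources session = new Resources();

            // executeStatement(): run against a copy and hand it to the fetcher.
            Resources perStatement = session.copy();
            perStatement.jars.put("upperudf", "file:///tmp/udf.jar"); // query-local registration
            try (ResultFetcher fetcher = new ResultFetcher(perStatement)) {
                // ... fetch results ...
            }

            System.out.println(session.jars.isEmpty()); // prints true: the session state is untouched
        }
    }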

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index 320fbefc898..e3e21ed8ccd 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -306,6 +306,11 @@ create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH';
 [INFO] Execute statement succeed.
 !info
 
+# `SHOW JARS` does not list the jars being used by functions; it only lists the jars added by `ADD JAR`
+SHOW JARS;
+Empty set
+!ok
+
 # run a query to verify the registered UDF works
 SELECT id, upperudf(str) FROM (VALUES (1, 'hello world'), (2, 'hi')) as T(id, str);
 +----+-------------+--------------------------------+
@@ -317,19 +322,27 @@ SELECT id, upperudf(str) FROM (VALUES (1, 'hello world'), (2, 'hi')) as T(id, st
 Received a total of 2 rows
 !ok
 
+# Each query registers its jars with its own resource manager, which does not affect the session in sql gateway
 SHOW JARS;
-+-$VAR_UDF_JAR_PATH_DASH-----+
-| $VAR_UDF_JAR_PATH_SPACEjars |
-+-$VAR_UDF_JAR_PATH_DASH-----+
-| $VAR_UDF_JAR_PATH |
-+-$VAR_UDF_JAR_PATH_DASH-----+
-1 row in set
+Empty set
 !ok
 
-REMOVE JAR '$VAR_UDF_JAR_PATH';
-[INFO] Execute statement succeed.
-!info
+# Show all user functions, which should not add function jars to the session resource manager
+show user functions;
++---------------+
+| function name |
++---------------+
+|        func11 |
+|         func3 |
+|         func4 |
+| temp_upperudf |
+|      tmp_func |
+|      upperudf |
++---------------+
+6 rows in set
+!ok
 
+# Showing functions does not affect the session in sql gateway
 SHOW JARS;
 Empty set
 !ok
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 96a9003f816..1d29e555a74 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.ExecutableOperationContextImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
@@ -70,10 +71,12 @@ import org.apache.flink.table.operations.CallProcedureOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ExecutableOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.ShowFunctionsOperation;
 import org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseOperation;
@@ -82,10 +85,13 @@ import org.apache.flink.table.operations.command.ExecutePlanOperation;
 import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.command.ShowJobsOperation;
 import org.apache.flink.table.operations.command.StopJobOperation;
 import org.apache.flink.table.operations.ddl.AlterOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateOperation;
+import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropOperation;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.utils.DateTimeUtils;
@@ -110,6 +116,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
 import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_CANDIDATES;
 import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
@@ -183,7 +191,8 @@ public class OperationExecutor {
 
     public ResultFetcher executeStatement(OperationHandle handle, String statement) {
         // Instantiate the TableEnvironment lazily
-        TableEnvironmentInternal tableEnv = getTableEnvironment();
+        ResourceManager resourceManager = sessionContext.getSessionState().resourceManager.copy();
+        TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager);
         List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
         if (parsedOperations.size() > 1) {
             throw new UnsupportedOperationException(
@@ -197,14 +206,15 @@ public class OperationExecutor {
             try {
                 SqlGatewayStreamExecutionEnvironment.setAsContext(
                         sessionContext.getUserClassloader());
-                return executeOperation(tableEnv, handle, op);
+                return executeOperation(tableEnv, handle, op).withResourceManager(resourceManager);
             } finally {
                 SqlGatewayStreamExecutionEnvironment.unsetAsContext();
             }
         } else {
             return sessionContext.isStatementSetState()
                     ? executeOperationInStatementSetState(tableEnv, handle, op)
-                    : executeOperation(tableEnv, handle, op);
+                            .withResourceManager(resourceManager)
+                    : executeOperation(tableEnv, handle, op).withResourceManager(resourceManager);
         }
     }
 
@@ -315,6 +325,10 @@ public class OperationExecutor {
     // --------------------------------------------------------------------------------------------
 
     public TableEnvironmentInternal getTableEnvironment() {
+        return getTableEnvironment(sessionContext.getSessionState().resourceManager);
+    }
+
+    public TableEnvironmentInternal getTableEnvironment(ResourceManager resourceManager) {
         // checks the value of RUNTIME_MODE
         Configuration operationConfig = sessionContext.getSessionConf().clone();
         operationConfig.addAll(executionConfig);
@@ -342,8 +356,8 @@ public class OperationExecutor {
                 executor,
                 sessionContext.getSessionState().catalogManager,
                 sessionContext.getSessionState().moduleManager,
-                sessionContext.getSessionState().resourceManager,
-                sessionContext.getSessionState().functionCatalog);
+                resourceManager,
+                sessionContext.getSessionState().functionCatalog.copy(resourceManager));
     }
 
     private static Executor lookupExecutor(
@@ -440,11 +454,41 @@ public class OperationExecutor {
             return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) op);
         } else if (op instanceof RemoveJarOperation) {
             return callRemoveJar(handle, ((RemoveJarOperation) op).getPath());
+        } else if (op instanceof AddJarOperation
+                || op instanceof ShowJarsOperation
+                || op instanceof CreateTempSystemFunctionOperation
+                || op instanceof CreateCatalogFunctionOperation
+                || op instanceof ShowFunctionsOperation) {
+            return callExecutableOperation(handle, (ExecutableOperation) op);
         } else {
             return callOperation(tableEnv, handle, op);
         }
     }
 
+    private ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
+        TableResultInternal result =
+                op.execute(
+                        new ExecutableOperationContextImpl(
+                                sessionContext.getSessionState().catalogManager,
+                                sessionContext.getSessionState().functionCatalog,
+                                sessionContext.getSessionState().moduleManager,
+                                sessionContext.getSessionState().resourceManager,
+                                tableConfig(),
+                                sessionContext.getSessionConf().get(RUNTIME_MODE) == STREAMING));
+        return ResultFetcher.fromTableResult(handle, result, false);
+    }
+
+    private TableConfig tableConfig() {
+        Configuration operationConfig = sessionContext.getSessionConf().clone();
+        operationConfig.addAll(executionConfig);
+
+        TableConfig tableConfig = TableConfig.getDefault();
+        tableConfig.setRootConfiguration(sessionContext.getDefaultContext().getFlinkConfig());
+        tableConfig.addConfiguration(operationConfig);
+
+        return tableConfig;
+    }
+
     private ResultFetcher callSetOperation(
             TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
         if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index 9fa810c9c8c..f901091f18e 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.ResultSetImpl;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -78,6 +80,7 @@ public class ResultFetcher {
 
     private long currentToken = 0;
     private boolean noMoreResults = false;
+    @Nullable private ResourceManager resourceManager;
 
     private ResultFetcher(
             OperationHandle operationHandle,
@@ -181,8 +184,20 @@ public class ResultFetcher {
         return new ResultFetcher(operationHandle, resultSchema, results, jobID, resultKind);
     }
 
+    public ResultFetcher withResourceManager(ResourceManager resourceManager) {
+        this.resourceManager = resourceManager;
+        return this;
+    }
+
     public void close() {
         resultStore.close();
+        if (resourceManager != null) {
+            try {
+                resourceManager.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     public ResolvedSchema getResultSchema() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 8eb8f82351f..55507ad1e6d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -78,9 +78,8 @@ public final class FunctionCatalog {
     private final CatalogManager catalogManager;
     private final ModuleManager moduleManager;
 
-    private final Map<String, CatalogFunction> tempSystemFunctions = new LinkedHashMap<>();
-    private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions =
-            new LinkedHashMap<>();
+    private final Map<String, CatalogFunction> tempSystemFunctions;
+    private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions;
 
     /**
      * Temporary utility until the new type inference is fully functional. It needs to be set by the
@@ -93,10 +92,31 @@ public final class FunctionCatalog {
             ResourceManager resourceManager,
             CatalogManager catalogManager,
             ModuleManager moduleManager) {
+        this(
+                config,
+                resourceManager,
+                catalogManager,
+                moduleManager,
+                new LinkedHashMap<>(),
+                new LinkedHashMap<>(),
+                null);
+    }
+
+    private FunctionCatalog(
+            ReadableConfig config,
+            ResourceManager resourceManager,
+            CatalogManager catalogManager,
+            ModuleManager moduleManager,
+            Map<String, CatalogFunction> tempSystemFunctions,
+            Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions,
+            PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
         this.config = checkNotNull(config);
         this.resourceManager = checkNotNull(resourceManager);
         this.catalogManager = checkNotNull(catalogManager);
         this.moduleManager = checkNotNull(moduleManager);
+        this.tempSystemFunctions = tempSystemFunctions;
+        this.tempCatalogFunctions = tempCatalogFunctions;
+        this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
     }
 
     public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
@@ -777,6 +797,17 @@ public final class FunctionCatalog {
         }
     }
 
+    public FunctionCatalog copy(ResourceManager newResourceManager) {
+        return new FunctionCatalog(
+                config,
+                newResourceManager,
+                catalogManager,
+                moduleManager,
+                tempSystemFunctions,
+                tempCatalogFunctions,
+                plannerTypeInferenceUtil);
+    }
+
     private void registerCatalogFunction(
             ObjectIdentifier identifier, CatalogFunction catalogFunction, boolean ignoreIfExists) {
         final ObjectIdentifier normalizedIdentifier =
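
Note that copy() above shares the two temp-function maps by reference and only swaps in the statement-scoped ResourceManager, so temporary functions registered through a copy stay visible to the original session catalog. A tiny sketch of that sharing pattern, using hypothetical names rather than the Flink API:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical illustration of "copy with shared state, rebound dependency".
    public final class Functions {
        private final Map<String, String> tempFunctions; // shared across copies
        private final Object resources;                  // per-copy dependency

        public Functions(Object resources) {
            this(resources, new LinkedHashMap<>());
        }

        private Functions(Object resources, Map<String, String> tempFunctions) {
            this.resources = resources;
            this.tempFunctions = tempFunctions;
        }

        public Functions copy(Object newResources) {
            // Same map instance: registrations through the copy are seen by the original.
            return new Functions(newResources, tempFunctions);
        }

        public void register(String name, String clazz) {
            tempFunctions.put(name, clazz);
        }

        public boolean contains(String name) {
            return tempFunctions.containsKey(name);
        }

        public static void main(String[] args) {
            Functions session = new Functions("sessionResources");
            Functions perStatement = session.copy("statementResources");
            perStatement.register("upperudf", "UpperUDF");
            System.out.println(session.contains("upperudf")); // prints true: the maps are shared
        }
    }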
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index 5de2b5d6ef4..307380af41b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -69,10 +69,12 @@ public class ResourceManager implements Closeable {
     private static final String JAR_SUFFIX = "jar";
     private static final String FILE_SCHEME = "file";
 
-    private final Path localResourceDir;
+    protected final Path localResourceDir;
     /** Resource infos for functions. */
     private final Map<ResourceUri, ResourceCounter> functionResourceInfos;
 
+    private final boolean cleanLocalResource;
+
     protected final Map<ResourceUri, URL> resourceInfos;
     protected final MutableURLClassLoader userClassLoader;
 
@@ -84,13 +86,27 @@ public class ResourceManager implements Closeable {
     }
 
     public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoader) {
-        this.localResourceDir =
+        this(
                 new Path(
                         config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR),
-                        String.format("flink-table-%s", UUID.randomUUID()));
-        this.functionResourceInfos = new HashMap<>();
-        this.resourceInfos = new HashMap<>();
+                        String.format("flink-table-%s", UUID.randomUUID())),
+                new HashMap<>(),
+                new HashMap<>(),
+                userClassLoader,
+                true);
+    }
+
+    private ResourceManager(
+            Path localResourceDir,
+            Map<ResourceUri, URL> resourceInfos,
+            Map<ResourceUri, ResourceCounter> functionResourceInfos,
+            MutableURLClassLoader userClassLoader,
+            boolean cleanLocalResource) {
+        this.localResourceDir = localResourceDir;
+        this.functionResourceInfos = functionResourceInfos;
+        this.resourceInfos = resourceInfos;
         this.userClassLoader = userClassLoader;
+        this.cleanLocalResource = cleanLocalResource;
     }
 
     /**
@@ -232,6 +248,15 @@ public class ResourceManager implements Closeable {
         tableConfig.set(PipelineOptions.JARS, new ArrayList<>(jarFiles));
     }
 
+    public ResourceManager copy() {
+        return new ResourceManager(
+                localResourceDir,
+                new HashMap<>(resourceInfos),
+                new HashMap<>(functionResourceInfos),
+                userClassLoader.copy(),
+                false);
+    }
+
     @Override
     public void close() throws IOException {
         resourceInfos.clear();
@@ -245,14 +270,17 @@ public class ResourceManager implements Closeable {
             exception = e;
         }
 
-        FileSystem fileSystem = FileSystem.getLocalFileSystem();
-        try {
-            if (fileSystem.exists(localResourceDir)) {
-                fileSystem.delete(localResourceDir, true);
+        if (cleanLocalResource) {
+            FileSystem fileSystem = FileSystem.getLocalFileSystem();
+            try {
+                if (fileSystem.exists(localResourceDir)) {
+                    fileSystem.delete(localResourceDir, true);
+                }
+            } catch (IOException ioe) {
+                LOG.debug(
+                        String.format("Error while delete directory [%s].", localResourceDir), ioe);
+                exception = ExceptionUtils.firstOrSuppressed(ioe, exception);
             }
-        } catch (IOException ioe) {
-            LOG.debug(String.format("Error while delete directory [%s].", localResourceDir), ioe);
-            exception = ExceptionUtils.firstOrSuppressed(ioe, exception);
         }
 
         if (exception != null) {
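
The cleanLocalResource flag is what keeps a statement-scoped copy from wiping the download directory that the session-level manager still uses: copies are created with the flag set to false, so close() on a copy clears its own bookkeeping but leaves the shared directory alone. A standalone sketch of that guard, with assumed names rather than the Flink implementation:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    // Illustration of the "only the owner cleans up" close() guard.
    public final class LocalResources implements AutoCloseable {
        private final Path localResourceDir;      // shared by the original and its copies
        private final boolean cleanLocalResource; // only the original gets true

        public LocalResources(Path dir, boolean cleanLocalResource) {
            this.localResourceDir = dir;
            this.cleanLocalResource = cleanLocalResource;
        }

        public LocalResources copy() {
            return new LocalResources(localResourceDir, false); // copies never delete the dir
        }

        @Override
        public void close() throws IOException {
            if (!cleanLocalResource || !Files.exists(localResourceDir)) {
                return; // a closing copy must leave the shared directory in place
            }
            try (Stream<Path> paths = Files.walk(localResourceDir)) {
                paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
            }
        }

        public static void main(String[] args) throws IOException {
            Path dir = Files.createTempDirectory("flink-table-");
            LocalResources session = new LocalResources(dir, true);
            try (LocalResources perStatement = session.copy()) {
                // statement-scoped work ...
            }
            System.out.println(Files.exists(dir)); // prints true: the copy did not clean up
            session.close();                       // the owning instance deletes the directory
            System.out.println(Files.exists(dir)); // prints false
        }
    }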
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index 42305d806af..400e865b416 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -431,6 +431,25 @@ public class ResourceManagerTest {
         assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse();
     }
 
+    @Test
+    void testCloseCopiedResourceManager() throws Exception {
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, udfJar.getPath());
+        resourceManager.declareFunctionResources(Collections.singleton(resourceUri));
+        resourceManager.registerJarResources(Collections.singletonList(resourceUri));
+        assertThat(resourceManager.functionResourceInfos().size()).isEqualTo(1);
+        assertThat(resourceManager.resourceInfos.size()).isEqualTo(1);
+
+        ResourceManager copiedResourceManager = resourceManager.copy();
+        assertThat(copiedResourceManager.functionResourceInfos().size()).isEqualTo(1);
+        assertThat(copiedResourceManager.resourceInfos.size()).isEqualTo(1);
+        copiedResourceManager.close();
+        assertThat(copiedResourceManager.functionResourceInfos().size()).isEqualTo(0);
+        assertThat(copiedResourceManager.resourceInfos.size()).isEqualTo(0);
+
+        assertThat(resourceManager.functionResourceInfos().size()).isEqualTo(1);
+        assertThat(resourceManager.resourceInfos.size()).isEqualTo(1);
+    }
+
     @Test
     public void testCloseResourceManagerCleanDownloadedResources() throws Exception {
         resourceManager.close();
