This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dd47530f1ee [FLINK-32309][sql-gateway] Use independent resource manager for table environment (#22768) dd47530f1ee is described below commit dd47530f1eee829242c1c7a5fab9b4b8553cba5a Author: Fang Yong <zjur...@gmail.com> AuthorDate: Wed Nov 1 19:00:40 2023 +0800 [FLINK-32309][sql-gateway] Use independent resource manager for table environment (#22768) --- .../src/test/resources/sql/function.q | 31 +++++++++---- .../service/operation/OperationExecutor.java | 54 ++++++++++++++++++++-- .../gateway/service/result/ResultFetcher.java | 15 ++++++ .../flink/table/catalog/FunctionCatalog.java | 37 +++++++++++++-- .../flink/table/resource/ResourceManager.java | 52 ++++++++++++++++----- .../flink/table/resource/ResourceManagerTest.java | 19 ++++++++ 6 files changed, 179 insertions(+), 29 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q index 320fbefc898..e3e21ed8ccd 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/function.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q @@ -306,6 +306,11 @@ create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH'; [INFO] Execute statement succeed. !info +# `SHOW JARS` does not list the jars being used by function, it only list all the jars added by `ADD JAR` +SHOW JARS; +Empty set +!ok + # run a query to verify the registered UDF works SELECT id, upperudf(str) FROM (VALUES (1, 'hello world'), (2, 'hi')) as T(id, str); +----+-------------+--------------------------------+ @@ -317,19 +322,27 @@ SELECT id, upperudf(str) FROM (VALUES (1, 'hello world'), (2, 'hi')) as T(id, st Received a total of 2 rows !ok +# Each query registers its jar to resource manager could not affect the session in sql gateway SHOW JARS; -+-$VAR_UDF_JAR_PATH_DASH-----+ -| $VAR_UDF_JAR_PATH_SPACEjars | -+-$VAR_UDF_JAR_PATH_DASH-----+ -| $VAR_UDF_JAR_PATH | -+-$VAR_UDF_JAR_PATH_DASH-----+ -1 row in set +Empty set !ok -REMOVE JAR '$VAR_UDF_JAR_PATH'; -[INFO] Execute statement succeed. -!info +# Show all users functions which should not add function jars to session resource manager +show user functions; ++---------------+ +| function name | ++---------------+ +| func11 | +| func3 | +| func4 | +| temp_upperudf | +| tmp_func | +| upperudf | ++---------------+ +6 rows in set +!ok +# Show functions will not affect the session in sql gateway SHOW JARS; Empty set !ok diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index 96a9003f816..1d29e555a74 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -37,6 +37,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.internal.ExecutableOperationContextImpl; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; @@ -70,10 +71,12 @@ import org.apache.flink.table.operations.CallProcedureOperation; import org.apache.flink.table.operations.CompileAndExecutePlanOperation; import org.apache.flink.table.operations.DeleteFromFilterOperation; import org.apache.flink.table.operations.EndStatementSetOperation; +import org.apache.flink.table.operations.ExecutableOperation; import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.StatementSetOperation; import org.apache.flink.table.operations.UnloadModuleOperation; import org.apache.flink.table.operations.UseOperation; @@ -82,10 +85,13 @@ import org.apache.flink.table.operations.command.ExecutePlanOperation; import org.apache.flink.table.operations.command.RemoveJarOperation; import org.apache.flink.table.operations.command.ResetOperation; import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.command.ShowJarsOperation; import org.apache.flink.table.operations.command.ShowJobsOperation; import org.apache.flink.table.operations.command.StopJobOperation; import org.apache.flink.table.operations.ddl.AlterOperation; +import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.CreateOperation; +import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropOperation; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.utils.DateTimeUtils; @@ -110,6 +116,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK; import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_CANDIDATES; import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID; @@ -183,7 +191,8 @@ public class OperationExecutor { public ResultFetcher executeStatement(OperationHandle handle, String statement) { // Instantiate the TableEnvironment lazily - TableEnvironmentInternal tableEnv = getTableEnvironment(); + ResourceManager resourceManager = sessionContext.getSessionState().resourceManager.copy(); + TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager); List<Operation> parsedOperations = tableEnv.getParser().parse(statement); if (parsedOperations.size() > 1) { throw new UnsupportedOperationException( @@ -197,14 +206,15 @@ public class OperationExecutor { try { SqlGatewayStreamExecutionEnvironment.setAsContext( sessionContext.getUserClassloader()); - return executeOperation(tableEnv, handle, op); + return executeOperation(tableEnv, handle, op).withResourceManager(resourceManager); } finally { SqlGatewayStreamExecutionEnvironment.unsetAsContext(); } } else { return sessionContext.isStatementSetState() ? executeOperationInStatementSetState(tableEnv, handle, op) - : executeOperation(tableEnv, handle, op); + .withResourceManager(resourceManager) + : executeOperation(tableEnv, handle, op).withResourceManager(resourceManager); } } @@ -315,6 +325,10 @@ public class OperationExecutor { // -------------------------------------------------------------------------------------------- public TableEnvironmentInternal getTableEnvironment() { + return getTableEnvironment(sessionContext.getSessionState().resourceManager); + } + + public TableEnvironmentInternal getTableEnvironment(ResourceManager resourceManager) { // checks the value of RUNTIME_MODE Configuration operationConfig = sessionContext.getSessionConf().clone(); operationConfig.addAll(executionConfig); @@ -342,8 +356,8 @@ public class OperationExecutor { executor, sessionContext.getSessionState().catalogManager, sessionContext.getSessionState().moduleManager, - sessionContext.getSessionState().resourceManager, - sessionContext.getSessionState().functionCatalog); + resourceManager, + sessionContext.getSessionState().functionCatalog.copy(resourceManager)); } private static Executor lookupExecutor( @@ -440,11 +454,41 @@ public class OperationExecutor { return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) op); } else if (op instanceof RemoveJarOperation) { return callRemoveJar(handle, ((RemoveJarOperation) op).getPath()); + } else if (op instanceof AddJarOperation + || op instanceof ShowJarsOperation + || op instanceof CreateTempSystemFunctionOperation + || op instanceof CreateCatalogFunctionOperation + || op instanceof ShowFunctionsOperation) { + return callExecutableOperation(handle, (ExecutableOperation) op); } else { return callOperation(tableEnv, handle, op); } } + private ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) { + TableResultInternal result = + op.execute( + new ExecutableOperationContextImpl( + sessionContext.getSessionState().catalogManager, + sessionContext.getSessionState().functionCatalog, + sessionContext.getSessionState().moduleManager, + sessionContext.getSessionState().resourceManager, + tableConfig(), + sessionContext.getSessionConf().get(RUNTIME_MODE) == STREAMING)); + return ResultFetcher.fromTableResult(handle, result, false); + } + + private TableConfig tableConfig() { + Configuration operationConfig = sessionContext.getSessionConf().clone(); + operationConfig.addAll(executionConfig); + + TableConfig tableConfig = TableConfig.getDefault(); + tableConfig.setRootConfiguration(sessionContext.getDefaultContext().getFlinkConfig()); + tableConfig.addConfiguration(operationConfig); + + return tableConfig; + } + private ResultFetcher callSetOperation( TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) { if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java index 9fa810c9c8c..f901091f18e 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java @@ -30,6 +30,7 @@ import org.apache.flink.table.gateway.api.results.FetchOrientation; import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.results.ResultSetImpl; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.utils.print.RowDataToStringConverter; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -78,6 +80,7 @@ public class ResultFetcher { private long currentToken = 0; private boolean noMoreResults = false; + @Nullable private ResourceManager resourceManager; private ResultFetcher( OperationHandle operationHandle, @@ -181,8 +184,20 @@ public class ResultFetcher { return new ResultFetcher(operationHandle, resultSchema, results, jobID, resultKind); } + public ResultFetcher withResourceManager(ResourceManager resourceManager) { + this.resourceManager = resourceManager; + return this; + } + public void close() { resultStore.close(); + if (resourceManager != null) { + try { + resourceManager.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } public ResolvedSchema getResultSchema() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 8eb8f82351f..55507ad1e6d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -78,9 +78,8 @@ public final class FunctionCatalog { private final CatalogManager catalogManager; private final ModuleManager moduleManager; - private final Map<String, CatalogFunction> tempSystemFunctions = new LinkedHashMap<>(); - private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions = - new LinkedHashMap<>(); + private final Map<String, CatalogFunction> tempSystemFunctions; + private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions; /** * Temporary utility until the new type inference is fully functional. It needs to be set by the @@ -93,10 +92,31 @@ public final class FunctionCatalog { ResourceManager resourceManager, CatalogManager catalogManager, ModuleManager moduleManager) { + this( + config, + resourceManager, + catalogManager, + moduleManager, + new LinkedHashMap<>(), + new LinkedHashMap<>(), + null); + } + + private FunctionCatalog( + ReadableConfig config, + ResourceManager resourceManager, + CatalogManager catalogManager, + ModuleManager moduleManager, + Map<String, CatalogFunction> tempSystemFunctions, + Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions, + PlannerTypeInferenceUtil plannerTypeInferenceUtil) { this.config = checkNotNull(config); this.resourceManager = checkNotNull(resourceManager); this.catalogManager = checkNotNull(catalogManager); this.moduleManager = checkNotNull(moduleManager); + this.tempSystemFunctions = tempSystemFunctions; + this.tempCatalogFunctions = tempCatalogFunctions; + this.plannerTypeInferenceUtil = plannerTypeInferenceUtil; } public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) { @@ -777,6 +797,17 @@ public final class FunctionCatalog { } } + public FunctionCatalog copy(ResourceManager newResourceManager) { + return new FunctionCatalog( + config, + newResourceManager, + catalogManager, + moduleManager, + tempSystemFunctions, + tempCatalogFunctions, + plannerTypeInferenceUtil); + } + private void registerCatalogFunction( ObjectIdentifier identifier, CatalogFunction catalogFunction, boolean ignoreIfExists) { final ObjectIdentifier normalizedIdentifier = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java index 5de2b5d6ef4..307380af41b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java @@ -69,10 +69,12 @@ public class ResourceManager implements Closeable { private static final String JAR_SUFFIX = "jar"; private static final String FILE_SCHEME = "file"; - private final Path localResourceDir; + protected final Path localResourceDir; /** Resource infos for functions. */ private final Map<ResourceUri, ResourceCounter> functionResourceInfos; + private final boolean cleanLocalResource; + protected final Map<ResourceUri, URL> resourceInfos; protected final MutableURLClassLoader userClassLoader; @@ -84,13 +86,27 @@ public class ResourceManager implements Closeable { } public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoader) { - this.localResourceDir = + this( new Path( config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR), - String.format("flink-table-%s", UUID.randomUUID())); - this.functionResourceInfos = new HashMap<>(); - this.resourceInfos = new HashMap<>(); + String.format("flink-table-%s", UUID.randomUUID())), + new HashMap<>(), + new HashMap<>(), + userClassLoader, + true); + } + + private ResourceManager( + Path localResourceDir, + Map<ResourceUri, URL> resourceInfos, + Map<ResourceUri, ResourceCounter> functionResourceInfos, + MutableURLClassLoader userClassLoader, + boolean cleanLocalResource) { + this.localResourceDir = localResourceDir; + this.functionResourceInfos = functionResourceInfos; + this.resourceInfos = resourceInfos; this.userClassLoader = userClassLoader; + this.cleanLocalResource = cleanLocalResource; } /** @@ -232,6 +248,15 @@ public class ResourceManager implements Closeable { tableConfig.set(PipelineOptions.JARS, new ArrayList<>(jarFiles)); } + public ResourceManager copy() { + return new ResourceManager( + localResourceDir, + new HashMap<>(resourceInfos), + new HashMap<>(functionResourceInfos), + userClassLoader.copy(), + false); + } + @Override public void close() throws IOException { resourceInfos.clear(); @@ -245,14 +270,17 @@ public class ResourceManager implements Closeable { exception = e; } - FileSystem fileSystem = FileSystem.getLocalFileSystem(); - try { - if (fileSystem.exists(localResourceDir)) { - fileSystem.delete(localResourceDir, true); + if (cleanLocalResource) { + FileSystem fileSystem = FileSystem.getLocalFileSystem(); + try { + if (fileSystem.exists(localResourceDir)) { + fileSystem.delete(localResourceDir, true); + } + } catch (IOException ioe) { + LOG.debug( + String.format("Error while delete directory [%s].", localResourceDir), ioe); + exception = ExceptionUtils.firstOrSuppressed(ioe, exception); } - } catch (IOException ioe) { - LOG.debug(String.format("Error while delete directory [%s].", localResourceDir), ioe); - exception = ExceptionUtils.firstOrSuppressed(ioe, exception); } if (exception != null) { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java index 42305d806af..400e865b416 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java @@ -431,6 +431,25 @@ public class ResourceManagerTest { assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse(); } + @Test + void testCloseCopiedResourceManager() throws Exception { + ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, udfJar.getPath()); + resourceManager.declareFunctionResources(Collections.singleton(resourceUri)); + resourceManager.registerJarResources(Collections.singletonList(resourceUri)); + assertThat(resourceManager.functionResourceInfos().size()).isEqualTo(1); + assertThat(resourceManager.resourceInfos.size()).isEqualTo(1); + + ResourceManager copiedResourceManager = resourceManager.copy(); + assertThat(copiedResourceManager.functionResourceInfos().size()).isEqualTo(1); + assertThat(copiedResourceManager.resourceInfos.size()).isEqualTo(1); + copiedResourceManager.close(); + assertThat(copiedResourceManager.functionResourceInfos().size()).isEqualTo(0); + assertThat(copiedResourceManager.resourceInfos.size()).isEqualTo(0); + + assertThat(resourceManager.functionResourceInfos().size()).isEqualTo(1); + assertThat(resourceManager.resourceInfos.size()).isEqualTo(1); + } + @Test public void testCloseResourceManagerCleanDownloadedResources() throws Exception { resourceManager.close();