This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ExecutableOperation in repository https://gitbox.apache.org/repos/asf/flink.git
commit d95f69bb459b5efb44c7cd468485d85cd08aef50 Author: Jark Wu <j...@apache.org> AuthorDate: Tue Mar 14 17:54:48 2023 +0800 [FLINK-31450][table] Introduce ExecutableOperation for operations to execute This closes #22175 --- .../internal/ExecutableOperationContextImpl.java | 46 +++++++++++++++++ .../table/api/internal/TableEnvironmentImpl.java | 9 ++++ .../table/operations/ExecutableOperation.java | 58 ++++++++++++++++++++++ 3 files changed, 113 insertions(+) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ExecutableOperationContextImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ExecutableOperationContextImpl.java new file mode 100644 index 00000000000..6cc41d7843c --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ExecutableOperationContextImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.internal; + +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.operations.ExecutableOperation; + +/** A simple implementation of {@link ExecutableOperation.Context}. */ +public class ExecutableOperationContextImpl implements ExecutableOperation.Context { + + private final CatalogManager catalogManager; + private final ModuleManager moduleManager; + + public ExecutableOperationContextImpl( + CatalogManager catalogManager, ModuleManager moduleManager) { + this.catalogManager = catalogManager; + this.moduleManager = moduleManager; + } + + @Override + public CatalogManager getCatalogManager() { + return catalogManager; + } + + @Override + public ModuleManager getModuleManager() { + return moduleManager; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index b43123a0072..90c1cd24701 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -91,6 +91,7 @@ import org.apache.flink.table.operations.CompileAndExecutePlanOperation; import org.apache.flink.table.operations.CreateTableASOperation; import org.apache.flink.table.operations.DeleteFromFilterOperation; import org.apache.flink.table.operations.DescribeTableOperation; +import org.apache.flink.table.operations.ExecutableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.ModifyOperation; @@ -203,6 +204,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { protected final 
FunctionCatalog functionCatalog; protected final Planner planner; private final boolean isStreamingMode; + private final ExecutableOperation.Context operationCtx; private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG = "Unsupported SQL query! executeSql() only accepts a single SQL statement of type " @@ -262,6 +264,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { isStreamingMode); catalogManager.initSchemaResolver( isStreamingMode, operationTreeBuilder.getResolverBuilder()); + this.operationCtx = new ExecutableOperationContextImpl(catalogManager, moduleManager); } public static TableEnvironmentImpl create(Configuration configuration) { @@ -984,6 +987,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { if (tableResult.isPresent()) { return tableResult.get(); } + + // delegate execution to Operation if it implements ExecutableOperation + if (operation instanceof ExecutableOperation) { + return ((ExecutableOperation) operation).execute(operationCtx); + } + // otherwise, fall back to internal implementation if (operation instanceof ModifyOperation) { return executeInternal(Collections.singletonList((ModifyOperation) operation)); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExecutableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExecutableOperation.java new file mode 100644 index 00000000000..ae542de7ad4 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExecutableOperation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.module.ModuleManager; + +/** + * An {@link ExecutableOperation} represents an operation that is executed for its side effects. + * + * <p>This internal interface is proposed to reduce the maintenance of bloated {@link + * TableEnvironmentImpl#executeInternal(Operation)} and improve code readability as execution logic + * is co-located with the operation definition. Besides, this interface can be used to extend + * user-defined operation with customized execution logic once this interface is stable and public + * in the future. + */ +@Internal +public interface ExecutableOperation extends Operation { + + /** + * Executes the given operation and returns the execution result. + * + * @param ctx the context to execute the operation. + * @return the content of the execution result. + * @see org.apache.flink.table.api.internal.TableEnvironmentInternal#executeInternal(Operation) + */ + TableResultInternal execute(Context ctx); + + /** + * The context to execute the operation. Operations may have side effects on the context, e.g. + * on the catalog manager or the configuration. 
+ */ + interface Context { + + CatalogManager getCatalogManager(); + + ModuleManager getModuleManager(); + } +}