This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ExecutableOperation
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d95f69bb459b5efb44c7cd468485d85cd08aef50
Author: Jark Wu <j...@apache.org>
AuthorDate: Tue Mar 14 17:54:48 2023 +0800

    [FLINK-31450][table] Introduce ExecutableOperation for operations to execute
    
    This closes #22175
---
 .../internal/ExecutableOperationContextImpl.java   | 46 +++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  9 ++++
 .../table/operations/ExecutableOperation.java      | 58 ++++++++++++++++++++++
 3 files changed, 113 insertions(+)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ExecutableOperationContextImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ExecutableOperationContextImpl.java
new file mode 100644
index 00000000000..6cc41d7843c
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ExecutableOperationContextImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.operations.ExecutableOperation;
+
+/** An {@link ExecutableOperation.Context} that returns the managers it was constructed with. */
+public class ExecutableOperationContextImpl implements 
ExecutableOperation.Context {
+
+    private final CatalogManager catalogManager;
+    private final ModuleManager moduleManager;
+
+    public ExecutableOperationContextImpl(
+            CatalogManager catalogManager, ModuleManager moduleManager) {
+        this.catalogManager = catalogManager;
+        this.moduleManager = moduleManager;
+    }
+
+    @Override
+    public CatalogManager getCatalogManager() {
+        return catalogManager;
+    }
+
+    @Override
+    public ModuleManager getModuleManager() {
+        return moduleManager;
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index b43123a0072..90c1cd24701 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -91,6 +91,7 @@ import 
org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.CreateTableASOperation;
 import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
+import org.apache.flink.table.operations.ExecutableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.ModifyOperation;
@@ -203,6 +204,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     protected final FunctionCatalog functionCatalog;
     protected final Planner planner;
     private final boolean isStreamingMode;
+    private final ExecutableOperation.Context operationCtx;
 
     private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
             "Unsupported SQL query! executeSql() only accepts a single SQL 
statement of type "
@@ -262,6 +264,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                         isStreamingMode);
         catalogManager.initSchemaResolver(
                 isStreamingMode, operationTreeBuilder.getResolverBuilder());
+        this.operationCtx = new ExecutableOperationContextImpl(catalogManager, 
moduleManager);
     }
 
     public static TableEnvironmentImpl create(Configuration configuration) {
@@ -984,6 +987,12 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         if (tableResult.isPresent()) {
             return tableResult.get();
         }
+
+        // delegate execution to Operation if it implements ExecutableOperation
+        if (operation instanceof ExecutableOperation) {
+            return ((ExecutableOperation) operation).execute(operationCtx);
+        }
+
         // otherwise, fall back to internal implementation
         if (operation instanceof ModifyOperation) {
             return executeInternal(Collections.singletonList((ModifyOperation) 
operation));
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExecutableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExecutableOperation.java
new file mode 100644
index 00000000000..ae542de7ad4
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExecutableOperation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+
+/**
+ * An {@link ExecutableOperation} represents an operation that is executed for 
its side effects.
+ *
+ * <p>This internal interface is proposed to reduce the maintenance of bloated 
{@link
+ * TableEnvironmentImpl#executeInternal(Operation)} and improve code 
readability as execution logic
+ * is co-located with the operation definition. Besides, this interface can be 
used to extend
+ * user-defined operation with customized execution logic once this interface 
is stable and public
+ * in the future.
+ */
+@Internal
+public interface ExecutableOperation extends Operation {
+
+    /**
+     * Executes this operation and returns the execution result.
+     *
+     * @param ctx the context to execute the operation.
+     * @return the content of the execution result.
+     * @see 
org.apache.flink.table.api.internal.TableEnvironmentInternal#executeInternal(Operation)
+     */
+    TableResultInternal execute(Context ctx);
+
+    /**
+     * The context to execute the operation. Operations may cause side effects on 
the context, e.g.
+     * the catalog manager or the module manager.
+     */
+    interface Context {
+
+        CatalogManager getCatalogManager();
+
+        ModuleManager getModuleManager();
+    }
+}

Reply via email to