twalthr commented on a change in pull request #18573:
URL: https://github.com/apache/flink/pull/18573#discussion_r800761109



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Pointer to a persisted plan. You can load the content of this reference into a {@link
+ * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute
+ * it with {@link TableEnvironment#executePlan(PlanReference)}.
+ */
+@Experimental
+public final class PlanReference {
+
+    private final @Nullable File file;
+    private final @Nullable String content;
+
+    private PlanReference(@Nullable File file, @Nullable String content) {
+        this.file = file;
+        this.content = content;
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(String path) {
+        return fromFile(Paths.get(path).toFile());
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(Path path) {
+        return fromFile(path.toFile());
+    }
+
+    /** Create a reference starting from a file path. */
+    public static PlanReference fromFile(File file) {
+        return new PlanReference(file, null);
+    }
+
+    /** Create a reference starting from a JSON string. */
+    public static PlanReference fromJsonString(String jsonString) {
+        return new PlanReference(null, jsonString);
+    }
+
+    /**
+     * Create a reference from a file in the classpath, using {@code
+     * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(String classpathFilePath) {
+        return fromClasspath(Thread.currentThread().getContextClassLoader(), classpathFilePath);
+    }
+
+    /**
+     * Create a reference from a file in the classpath.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(ClassLoader classLoader, String classpathFilePath)
+            throws TableException {
+        URL url = classLoader.getResource(classpathFilePath);

Review comment:
       Isn't this already loading a file? That makes it more than a reference. Ideally, we would have a symmetric operation: write into resources and read from resources. This is what we need in our tests, and I'm pretty sure our end users would also like to have this functionality.
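
       A minimal sketch of the "read from resources" half, assuming a hypothetical helper class `ResourcePlanReader` that is not part of this PR. It reads the resource through `ClassLoader#getResourceAsStream`, which also works when the resource is packaged inside a JAR, whereas `new File(url.toURI())` throws an `IllegalArgumentException` for `jar:` URLs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of the PR): reads a classpath resource as a UTF-8 string. */
public final class ResourcePlanReader {

    private ResourcePlanReader() {}

    public static String readResource(ClassLoader classLoader, String resourcePath)
            throws IOException {
        // Stream the resource instead of converting its URL to a File,
        // so that resources inside JARs can be read as well.
        try (InputStream in = classLoader.getResourceAsStream(resourcePath)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + resourcePath);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

       The returned string could then be handed to `PlanReference.fromJsonString(...)` from this PR. The write side would need a target directory that is actually writable, which this sketch does not cover.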

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
##########
@@ -86,47 +89,17 @@
      */
     String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
-    /**
-     * Get the json plan of the given {@link ModifyOperation}s.
-     *
-     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
-     * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be
-     * deserialized to an ExecNode plan.
-     *
-     * <p><b>NOTES:</b>: This is an experimental feature now.
-     *
-     * @param modifyOperations the {@link ModifyOperation}s to generate json plan.
-     * @return the string json representation of an optimized ExecNode plan for the given
-     *     operations.
-     */
+    // FLIP-190 methods
+
     @Experimental
-    String getJsonPlan(List<ModifyOperation> modifyOperations);
+    CompiledPlan load(PlanReference planReference) throws IOException;

Review comment:
       The name is too generic. Suggestion: `loadPlan`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
##########
@@ -86,47 +89,17 @@
      */
     String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
-    /**
-     * Get the json plan of the given {@link ModifyOperation}s.
-     *
-     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
-     * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be
-     * deserialized to an ExecNode plan.
-     *
-     * <p><b>NOTES:</b>: This is an experimental feature now.
-     *
-     * @param modifyOperations the {@link ModifyOperation}s to generate json plan.
-     * @return the string json representation of an optimized ExecNode plan for the given
-     *     operations.
-     */
+    // FLIP-190 methods
+
     @Experimental
-    String getJsonPlan(List<ModifyOperation> modifyOperations);
+    CompiledPlan load(PlanReference planReference) throws IOException;
 
-    /**
-     * Returns the execution plan for the given json plan.
-     *
-     * <p><b>NOTES:</b>: This is an experimental feature now.
-     *
-     * @param jsonPlan The json plan to be explained.
-     * @param extraDetails The extra explain details which the explain result should include, e.g.
-     *     estimated cost, changelog mode for streaming
-     * @return the execution plan.
-     */
     @Experimental
-    String explainJsonPlan(String jsonPlan, ExplainDetail... extraDetails);
+    CompiledPlan compile(List<ModifyOperation> modifyOperations);

Review comment:
       The name is too generic. Suggestion: `compilePlan`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Pointer to a persisted plan. You can load the content of this reference into a {@link
+ * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute
+ * it with {@link TableEnvironment#executePlan(PlanReference)}.
+ */
+@Experimental
+public final class PlanReference {
+
+    private final @Nullable File file;
+    private final @Nullable String content;
+
+    private PlanReference(@Nullable File file, @Nullable String content) {
+        this.file = file;
+        this.content = content;
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(String path) {
+        return fromFile(Paths.get(path).toFile());
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(Path path) {
+        return fromFile(path.toFile());
+    }
+
+    /** Create a reference starting from a file path. */
+    public static PlanReference fromFile(File file) {
+        return new PlanReference(file, null);
+    }
+
+    /** Create a reference starting from a JSON string. */
+    public static PlanReference fromJsonString(String jsonString) {
+        return new PlanReference(null, jsonString);
+    }
+
+    /**
+     * Create a reference from a file in the classpath, using {@code
+     * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(String classpathFilePath) {

Review comment:
       Let's use `fromResource`. I think this is a common term in the Java world.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -1252,4 +1254,52 @@ void createFunction(
      * as one job.
      */
     StatementSet createStatementSet();
+
+    // FLIP-190 methods. They're considered experimental and might change in future versions

Review comment:
       nit: At some point FLIP-190 will be history, so we don't need this comment here or at other locations. If you want to structure your code, give this section a generic name like `Plan compilation and restore`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Pointer to a persisted plan. You can load the content of this reference into a {@link
+ * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute
+ * it with {@link TableEnvironment#executePlan(PlanReference)}.
+ */
+@Experimental
+public final class PlanReference {
+
+    private final @Nullable File file;
+    private final @Nullable String content;
+
+    private PlanReference(@Nullable File file, @Nullable String content) {
+        this.file = file;
+        this.content = content;
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(String path) {
+        return fromFile(Paths.get(path).toFile());
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(Path path) {
+        return fromFile(path.toFile());
+    }
+
+    /** Create a reference starting from a file path. */
+    public static PlanReference fromFile(File file) {
+        return new PlanReference(file, null);
+    }
+
+    /** Create a reference starting from a JSON string. */
+    public static PlanReference fromJsonString(String jsonString) {
+        return new PlanReference(null, jsonString);
+    }
+
+    /**
+     * Create a reference from a file in the classpath, using {@code
+     * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(String classpathFilePath) {
+        return fromClasspath(Thread.currentThread().getContextClassLoader(), classpathFilePath);
+    }
+
+    /**
+     * Create a reference from a file in the classpath.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(ClassLoader classLoader, String classpathFilePath)
+            throws TableException {
+        URL url = classLoader.getResource(classpathFilePath);
+        if (url == null) {
+            throw new TableException(
+                    "Cannot load the plan reference from classpath, resource not found: "
+                            + classpathFilePath);
+        }
+
+        try {
+            return PlanReference.fromFile(new File(url.toURI()));
+        } catch (URISyntaxException e) {
+            throw new TableException(
+                    "Cannot load the plan reference from classpath, invalid URI: "
+                            + classpathFilePath,
+                    e);
+        }
+    }
+
+    public Optional<File> getFile() {

Review comment:
       Should these getters have default scope, or should we use two static inner classes instead?
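
       To illustrate the second option, here is a rough sketch, assuming hypothetical names (`PlanReferenceSketch`, `FileReference`, `ContentReference`) that do not appear in this PR. Each kind of reference gets its own nested class, so no field has to be `@Nullable` and the accessors can stay package-private.

```java
import java.io.File;
import java.util.Objects;

/** Hypothetical shape, for illustration only: one nested class per kind of reference. */
public abstract class PlanReferenceSketch {

    // Private constructor: only the nested classes below can extend this class.
    private PlanReferenceSketch() {}

    public static PlanReferenceSketch fromFile(File file) {
        return new FileReference(file);
    }

    public static PlanReferenceSketch fromJsonString(String jsonString) {
        return new ContentReference(jsonString);
    }

    /** Reference to a plan stored in a file. */
    static final class FileReference extends PlanReferenceSketch {
        private final File file;

        FileReference(File file) {
            this.file = Objects.requireNonNull(file);
        }

        // Package-private accessor, not part of the public API.
        File getFile() {
            return file;
        }

        @Override
        public String toString() {
            return "Plan from file '" + file + "'";
        }
    }

    /** Reference to a plan given inline as a JSON string. */
    static final class ContentReference extends PlanReferenceSketch {
        private final String content;

        ContentReference(String content) {
            this.content = Objects.requireNonNull(content);
        }

        // Package-private accessor, not part of the public API.
        String getContent() {
            return content;
        }

        @Override
        public String toString() {
            return "Plan from inline content";
        }
    }
}
```

       The planner would then dispatch with `instanceof` on the two nested classes instead of checking two `Optional` getters.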

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -752,6 +757,38 @@ public StatementSet createStatementSet() {
         return new StatementSetImpl(this);
     }
 
+    @Override
+    public CompiledPlan loadPlan(PlanReference planReference) throws IOException {
+        return planner.load(planReference);
+    }
+
+    @Override
+    public CompiledPlan compilePlanSql(String stmt) {
+        List<Operation> operations = getParser().parse(stmt);
+
+        if (operations.size() != 1 || !(operations.get(0) instanceof ModifyOperation)) {
+            throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG);
+        }
+
+        return planner.compile(Collections.singletonList((ModifyOperation) operations.get(0)));
+    }
+
+    @Override
+    public TableResult executePlan(CompiledPlan plan) {
+        List<Transformation<?>> transformations = planner.translate(plan);
+        List<String> sinkIdentifierNames = new ArrayList<>();
+        for (int i = 0; i < transformations.size(); ++i) {
+            // TODO serialize the sink table names to json plan ?

Review comment:
       I think the names are already part of the JSON plan. The question is rather whether we can somehow get the names from the transformations. We should definitely fix this TODO, because otherwise the job name might not be deterministic.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -1252,4 +1254,52 @@ void createFunction(
      * as one job.
      */
     StatementSet createStatementSet();
+
+    // FLIP-190 methods. They're considered experimental and might change in future versions
+
+    /**
+     * Load a plan starting from a {@link PlanReference} into a {@link CompiledPlan}. This will
+     * parse the input reference and will validate the plan.
+     *
+     * <p><b>Note:</b> This API is <b>experimental</b> and subject to change in future releases.

Review comment:
       Remove the additional warning in the JavaDocs. A warning in the website 
docs should be enough. In the end, we would like to encourage users to try it 
out.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
##########
@@ -86,47 +89,17 @@
      */
     String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
-    /**
-     * Get the json plan of the given {@link ModifyOperation}s.
-     *
-     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
-     * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be
-     * deserialized to an ExecNode plan.
-     *
-     * <p><b>NOTES:</b>: This is an experimental feature now.
-     *
-     * @param modifyOperations the {@link ModifyOperation}s to generate json plan.
-     * @return the string json representation of an optimized ExecNode plan for the given
-     *     operations.
-     */
+    // FLIP-190 methods
+
     @Experimental
-    String getJsonPlan(List<ModifyOperation> modifyOperations);
+    CompiledPlan load(PlanReference planReference) throws IOException;
 
-    /**
-     * Returns the execution plan for the given json plan.
-     *
-     * <p><b>NOTES:</b>: This is an experimental feature now.
-     *
-     * @param jsonPlan The json plan to be explained.
-     * @param extraDetails The extra explain details which the explain result should include, e.g.
-     *     estimated cost, changelog mode for streaming
-     * @return the execution plan.
-     */
     @Experimental
-    String explainJsonPlan(String jsonPlan, ExplainDetail... extraDetails);
+    CompiledPlan compile(List<ModifyOperation> modifyOperations);
+
+    @Experimental
+    List<Transformation<?>> translate(CompiledPlan plan);
 
-    /**
-     * Converts a json plan into a set of runnable {@link Transformation}s.
-     *
-     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
-     * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be
-     * deserialized to an ExecNode plan.
-     *
-     * <p><b>NOTES:</b>: This is an experimental feature now.
-     *
-     * @param jsonPlan The json plan to be translated.
-     * @return list of corresponding {@link Transformation}s.
-     */
     @Experimental
-    List<Transformation<?>> translateJsonPlan(String jsonPlan);
+    String explain(CompiledPlan plan, ExplainDetail... extraDetails);

Review comment:
       Rename to `explainPlan`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * This interface represents a compiled plan that can be executed using {@link
+ * TableEnvironment#executePlan(CompiledPlan)}. A plan can be compiled starting from a SQL query
+ * using {@link TableEnvironment#compilePlanSql(String)} and can be loaded back from a file or a
+ * string using {@link TableEnvironment#loadPlan(PlanReference)}. A plan can be persisted using
+ * {@link #writeToFile(Path, boolean)} or by manually extracting the JSON representation with {@link
+ * #asJsonString()}.
+ */
+@Experimental
+public interface CompiledPlan {
+
+    /** Convert the plan to a JSON string representation. */
+    String asJsonString();

Review comment:
       Now that `CompiledPlan` is more than a reference, it would be beneficial to expose some management information, e.g. the Flink version.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Pointer to a persisted plan. You can load the content of this reference into a {@link
+ * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute
+ * it with {@link TableEnvironment#executePlan(PlanReference)}.
+ */
+@Experimental
+public final class PlanReference {
+
+    private final @Nullable File file;
+    private final @Nullable String content;
+
+    private PlanReference(@Nullable File file, @Nullable String content) {
+        this.file = file;
+        this.content = content;
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(String path) {
+        return fromFile(Paths.get(path).toFile());
+    }
+
+    /** @see #fromFile(File) */
+    public static PlanReference fromFile(Path path) {
+        return fromFile(path.toFile());
+    }
+
+    /** Create a reference starting from a file path. */
+    public static PlanReference fromFile(File file) {
+        return new PlanReference(file, null);
+    }
+
+    /** Create a reference starting from a JSON string. */
+    public static PlanReference fromJsonString(String jsonString) {
+        return new PlanReference(null, jsonString);
+    }
+
+    /**
+     * Create a reference from a file in the classpath, using {@code
+     * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(String classpathFilePath) {
+        return fromClasspath(Thread.currentThread().getContextClassLoader(), classpathFilePath);
+    }
+
+    /**
+     * Create a reference from a file in the classpath.
+     *
+     * @throws TableException if the classpath resource cannot be found
+     */
+    public static PlanReference fromClasspath(ClassLoader classLoader, String classpathFilePath)
+            throws TableException {
+        URL url = classLoader.getResource(classpathFilePath);
+        if (url == null) {
+            throw new TableException(
+                    "Cannot load the plan reference from classpath, resource not found: "
+                            + classpathFilePath);
+        }
+
+        try {
+            return PlanReference.fromFile(new File(url.toURI()));
+        } catch (URISyntaxException e) {
+            throw new TableException(
+                    "Cannot load the plan reference from classpath, invalid URI: "
+                            + classpathFilePath,
+                    e);
+        }
+    }
+
+    public Optional<File> getFile() {
+        return Optional.ofNullable(file);
+    }
+
+    public Optional<String> getContent() {
+        return Optional.ofNullable(content);
+    }

Review comment:
       Add a `toString`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -142,21 +143,17 @@ public TableResult execute() {
     }
 
     /**
-     * Get the json plan of the all statements and Tables as a batch.
-     *
-     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
-     * statements and Tables. An ExecNode plan can be serialized to json plan, and a json plan can
-     * be deserialized to an ExecNode plan.
+     * Get the {@link CompiledPlan} of the all statements and Tables as a batch.

Review comment:
       Remove the JavaDocs of an `Impl` method. The interface should have all 
docs.



