twalthr commented on a change in pull request #18701:
URL: https://github.com/apache/flink/pull/18701#discussion_r803652218



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
##########
@@ -79,25 +79,37 @@ default void writeToFile(Path path) throws IOException {
     }
 
     /** @see #writeToFile(File, boolean) */
-    default void writeToFile(Path path, boolean ignoreIfExists)
-            throws IOException, UnsupportedOperationException {
+    default void writeToFile(Path path, boolean ignoreIfExists) throws IOException {
         writeToFile(path.toFile(), ignoreIfExists);
     }
 
     /**
-     * Writes this plan to a file using the JSON representation. This will not overwrite the file if
-     * it's already existing.
+     * Writes this plan to a file using the JSON representation. This operation will be noop if the
+     * file already exists, even if the content is different from this plan.
+     *
+     * @throws IOException if the file cannot be written.
      */
     default void writeToFile(File file) throws IOException {
         writeToFile(file, true);
     }
 
-    /** Writes this plan to a file using the JSON representation. */
-    void writeToFile(File file, boolean ignoreIfExists)
-            throws IOException, UnsupportedOperationException;
+    /**
+     * Writes this plan to a file using the JSON representation.
+     *
+     * @throws IOException if the file cannot be written.
+     */
+    void writeToFile(File file, boolean ignoreIfExists) throws IOException;
 
     // --- Accessors
 
     /** Returns the Flink version used to compile the plan. */
     String getFlinkVersion();
+
+    /**
+     * Returns the AST of the specified statement and the execution plan to compute the result of
+     * the given statement.
+     *
+     * <p>Shorthand for {@link TableEnvironment#explainPlan(CompiledPlan, ExplainDetail...)}.
+     */
+    String explain(ExplainDetail... explainDetails);

Review comment:
    Since we are adding a couple of helpful API methods, let's also add some fluent methods:
   ```
   CompiledPlan.printJsonString();
   CompiledPlan.printExplain(ExplainDetail...);
   ```
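    A rough sketch of how such fluent default methods could look on `CompiledPlan` (illustrative only, not code from this PR; `asJsonString()` is assumed here as the accessor for the serialized JSON plan, and both methods just print to stdout):
    ```
    /** Hypothetical default methods, sketched from the suggestion above. */
    default void printJsonString() {
        // Assumes asJsonString() returns the JSON representation of this plan.
        System.out.println(asJsonString());
    }

    default void printExplain(ExplainDetail... explainDetails) {
        // Reuses the explain(ExplainDetail...) accessor added in this PR.
        System.out.println(explain(explainDetails));
    }
    ```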

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
##########
@@ -79,25 +79,37 @@ default void writeToFile(Path path) throws IOException {
     }
 
     /** @see #writeToFile(File, boolean) */
-    default void writeToFile(Path path, boolean ignoreIfExists)
-            throws IOException, UnsupportedOperationException {
+    default void writeToFile(Path path, boolean ignoreIfExists) throws IOException {
         writeToFile(path.toFile(), ignoreIfExists);
     }
 
     /**
-     * Writes this plan to a file using the JSON representation. This will not overwrite the file if
-     * it's already existing.
+     * Writes this plan to a file using the JSON representation. This operation will be noop if the
+     * file already exists, even if the content is different from this plan.
+     *
+     * @throws IOException if the file cannot be written.
      */
     default void writeToFile(File file) throws IOException {
         writeToFile(file, true);

Review comment:
    By default, we should fail if the file already exists. A no-op by default is dangerous.
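    A minimal sketch of the suggested default (illustrative only, assuming the boolean parameter keeps its current meaning, so passing `false` makes the implementation fail when the file already exists):
    ```
    default void writeToFile(File file) throws IOException {
        // Fail on an existing file by default instead of silently doing nothing.
        writeToFile(file, false);
    }
    ```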

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -1266,6 +1266,12 @@ void createFunction(
      *
     * <p>This method will parse the input reference and will validate the plan. The returned
     * instance can be executed via {@link #executePlan(CompiledPlan)}.
+     *
+     * @throws IOException if the plan cannot be loaded from the filesystem, or from classpath
+     *     resources.
+     * @throws TableException if the plan is invalid.
+     * @throws UnsupportedOperationException if the {@link EnvironmentSettings} is configured in

Review comment:
    nit: Sometimes the `EnvironmentSettings` are derived automatically; I would rather add a separate paragraph:
   
   ```
   <p>Note: Currently, plan loading is not supported in batch mode.
   ```
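    Applied to the quoted Javadoc, the suggestion could look roughly as follows (a sketch only; the method declaration and the dropped `@throws UnsupportedOperationException` tag are assumed from the diff context, not copied from the PR):
    ```
    /**
     * ...
     * <p>This method will parse the input reference and will validate the plan. The returned
     * instance can be executed via {@link #executePlan(CompiledPlan)}.
     *
     * <p>Note: Currently, plan loading is not supported in batch mode.
     *
     * @throws IOException if the plan cannot be loaded from the filesystem, or from classpath
     *     resources.
     * @throws TableException if the plan is invalid.
     */
    CompiledPlan loadPlan(PlanReference planReference) throws IOException;
    ```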

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
##########
@@ -129,14 +134,48 @@ class StreamPlanner(
    new StreamPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
   }
 
+  override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
+    val ctx = createSerdeContext
+    val objectReader: ObjectReader = JsonSerdeUtil.createObjectReader(ctx)
+    val execNodeGraph = planReference match {
+      case filePlanReference: FilePlanReference =>
+        objectReader.readValue(filePlanReference.getFile, classOf[ExecNodeGraph])
+      case contentPlanReference: ContentPlanReference =>
+        objectReader.readValue(contentPlanReference.getContent, classOf[ExecNodeGraph])
+      case resourcePlanReference: ResourcePlanReference => {
+        val url = resourcePlanReference.getClassLoader
+          .getResource(resourcePlanReference.getResourcePath)
+        if (url == null) {
+          throw new IOException(
+            "Cannot load the plan reference from classpath: " + planReference);
+        }
+        objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
+      }
+      case _ => throw new IllegalStateException(
+        "Unknown PlanReference. This is a bug, please contact the developers")
+    }
+
+    new ExecNodeGraphCompiledPlan(
+      this,
+      JsonSerdeUtil.createObjectWriter(createSerdeContext)
+        .withDefaultPrettyPrinter()
+        .writeValueAsString(execNodeGraph),
+      execNodeGraph)
+  }
+
   override def compilePlan(modifyOperations: util.List[ModifyOperation]): CompiledPlanInternal = {
     validateAndOverrideConfiguration()
     val relNodes = modifyOperations.map(translateToRel)
     val optimizedRelNodes = optimize(relNodes)
     val execGraph = translateToExecNodeGraph(optimizedRelNodes)
     cleanupInternalConfigurations()
 
-    new ExecNodeGraphCompiledPlan(createSerdeContext, execGraph)
+    new ExecNodeGraphCompiledPlan(
+      this,
+      JsonSerdeUtil.createObjectWriter(createSerdeContext)
+        .withDefaultPrettyPrinter()

Review comment:
    Shall we only pretty-print for `CompiledPlan.printJsonString()`? I guess pretty printing has an impact on the plan sizes.
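    One way to follow this, sketched with plain Jackson rather than the PR's actual classes (all names here are illustrative): keep the serialized plan compact and re-serialize with a pretty printer only when printing.
    ```
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hypothetical helper: the stored plan string stays compact; pretty printing happens on demand.
    final class PlanPrinting {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static String prettyPrint(String compactJson) throws JsonProcessingException {
            // Re-parse the compact JSON and write it back out with Jackson's default pretty printer.
            return MAPPER.writerWithDefaultPrettyPrinter()
                    .writeValueAsString(MAPPER.readTree(compactJson));
        }
    }
    ```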

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
##########
@@ -134,6 +134,11 @@ class BatchPlanner(
    new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
   }
 
+  override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
+    throw new UnsupportedOperationException(
+      "The batch planner doesn't support the persisted plan feature.")

Review comment:
    nit: `batch planner` is an internal term; how about `Compiled plans are not supported in batch mode.`?

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
##########
@@ -134,6 +134,11 @@ class BatchPlanner(
    new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
   }
 
+  override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
+    throw new UnsupportedOperationException(
+      "The batch planner doesn't support the persisted plan feature.")

Review comment:
       also for the exception below



