twalthr commented on a change in pull request #18701: URL: https://github.com/apache/flink/pull/18701#discussion_r803652218
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java ########## @@ -79,25 +79,37 @@ default void writeToFile(Path path) throws IOException { } /** @see #writeToFile(File, boolean) */ - default void writeToFile(Path path, boolean ignoreIfExists) - throws IOException, UnsupportedOperationException { + default void writeToFile(Path path, boolean ignoreIfExists) throws IOException { writeToFile(path.toFile(), ignoreIfExists); } /** - * Writes this plan to a file using the JSON representation. This will not overwrite the file if - * it's already existing. + * Writes this plan to a file using the JSON representation. This operation will be noop if the + * file already exists, even if the content is different from this plan. + * + * @throws IOException if the file cannot be written. */ default void writeToFile(File file) throws IOException { writeToFile(file, true); } - /** Writes this plan to a file using the JSON representation. */ - void writeToFile(File file, boolean ignoreIfExists) - throws IOException, UnsupportedOperationException; + /** + * Writes this plan to a file using the JSON representation. + * + * @throws IOException if the file cannot be written. + */ + void writeToFile(File file, boolean ignoreIfExists) throws IOException; // --- Accessors /** Returns the Flink version used to compile the plan. */ String getFlinkVersion(); + + /** + * Returns the AST of the specified statement and the execution plan to compute the result of + * the given statement. + * + * <p>Shorthand for {@link TableEnvironment#explainPlan(CompiledPlan, ExplainDetail...)}. + */ + String explain(ExplainDetail... explainDetails); Review comment: since we are adding a couple of helpful API methods, let's also add some fluent methods: ``` CompiledPlan.printJsonString(); CompiledPlan.printExplain(ExplainDetail...); ``` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java ########## @@ -79,25 +79,37 @@ default void writeToFile(Path path) throws IOException { } /** @see #writeToFile(File, boolean) */ - default void writeToFile(Path path, boolean ignoreIfExists) - throws IOException, UnsupportedOperationException { + default void writeToFile(Path path, boolean ignoreIfExists) throws IOException { writeToFile(path.toFile(), ignoreIfExists); } /** - * Writes this plan to a file using the JSON representation. This will not overwrite the file if - * it's already existing. + * Writes this plan to a file using the JSON representation. This operation will be noop if the + * file already exists, even if the content is different from this plan. + * + * @throws IOException if the file cannot be written. */ default void writeToFile(File file) throws IOException { writeToFile(file, true); Review comment: By default we should fail if the file already exists. Noop by default is dangerous. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ########## @@ -1266,6 +1266,12 @@ void createFunction( * * <p>This method will parse the input reference and will validate the plan. The returned * instance can be executed via {@link #executePlan(CompiledPlan)}. + * + * @throws IOException if the plan cannot be loaded from the filesystem, or from classpath + * resources. + * @throws TableException if the plan is invalid. + * @throws UnsupportedOperationException if the {@link EnvironmentSettings} is configured in Review comment: nit: Sometimes `EnvironmentSettings` are derived automatically, I would rather add a separate paragraph: ``` <p>Note: Currently, plan loading is not supported in batch mode. ``` ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala ########## @@ -129,14 +134,48 @@ class StreamPlanner( new StreamPlanner(executor, config, moduleManager, functionCatalog, catalogManager) } + override def loadPlan(planReference: PlanReference): CompiledPlanInternal = { + val ctx = createSerdeContext + val objectReader: ObjectReader = JsonSerdeUtil.createObjectReader(ctx) + val execNodeGraph = planReference match { + case filePlanReference: FilePlanReference => + objectReader.readValue(filePlanReference.getFile, classOf[ExecNodeGraph]) + case contentPlanReference: ContentPlanReference => + objectReader.readValue(contentPlanReference.getContent, classOf[ExecNodeGraph]) + case resourcePlanReference: ResourcePlanReference => { + val url = resourcePlanReference.getClassLoader + .getResource(resourcePlanReference.getResourcePath) + if (url == null) { + throw new IOException( + "Cannot load the plan reference from classpath: " + planReference); + } + objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph]) + } + case _ => throw new IllegalStateException( + "Unknown PlanReference. This is a bug, please contact the developers") + } + + new ExecNodeGraphCompiledPlan( + this, + JsonSerdeUtil.createObjectWriter(createSerdeContext) + .withDefaultPrettyPrinter() + .writeValueAsString(execNodeGraph), + execNodeGraph) + } + override def compilePlan(modifyOperations: util.List[ModifyOperation]): CompiledPlanInternal = { validateAndOverrideConfiguration() val relNodes = modifyOperations.map(translateToRel) val optimizedRelNodes = optimize(relNodes) val execGraph = translateToExecNodeGraph(optimizedRelNodes) cleanupInternalConfigurations() - new ExecNodeGraphCompiledPlan(createSerdeContext, execGraph) + new ExecNodeGraphCompiledPlan( + this, + JsonSerdeUtil.createObjectWriter(createSerdeContext) + .withDefaultPrettyPrinter() Review comment: shall we only pretty print for the `CompiledPlan.printJsonString()`? I guess pretty printing has impact on the plan sizes. ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala ########## @@ -134,6 +134,11 @@ class BatchPlanner( new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager) } + override def loadPlan(planReference: PlanReference): CompiledPlanInternal = { + throw new UnsupportedOperationException( + "The batch planner doesn't support the persisted plan feature.") Review comment: nit: `batch planner` is an internal term, how about `Compiled plans are not supported in batch mode.` ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala ########## @@ -134,6 +134,11 @@ class BatchPlanner( new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager) } + override def loadPlan(planReference: PlanReference): CompiledPlanInternal = { + throw new UnsupportedOperationException( + "The batch planner doesn't support the persisted plan feature.") Review comment: also for the exception below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org