twalthr commented on a change in pull request #18573: URL: https://github.com/apache/flink/pull/18573#discussion_r800761109
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.Experimental; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +/** + * Pointer to a persisted plan. You can load the content of this reference into a {@link + * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute + * it with {@link TableEnvironment#executePlan(PlanReference)}. + */ +@Experimental +public final class PlanReference { + + private final @Nullable File file; + private final @Nullable String content; + + private PlanReference(@Nullable File file, @Nullable String content) { + this.file = file; + this.content = content; + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(String path) { + return fromFile(Paths.get(path).toFile()); + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(Path path) { + return fromFile(path.toFile()); + } + + /** Create a reference starting from a file path. */ + public static PlanReference fromFile(File file) { + return new PlanReference(file, null); + } + + /** Create a reference starting from a JSON string. */ + public static PlanReference fromJsonString(String jsonString) { + return new PlanReference(null, jsonString); + } + + /** + * Create a reference from a file in the classpath, using {@code + * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(String classpathFilePath) { + return fromClasspath(Thread.currentThread().getContextClassLoader(), classpathFilePath); + } + + /** + * Create a reference from a file in the classpath. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(ClassLoader classLoader, String classpathFilePath) + throws TableException { + URL url = classLoader.getResource(classpathFilePath); Review comment: Isn't this somehow already loading a file? It somehow does more a reference. Ideally, it would be great to have a symmetric operation. Write into resources and read from resources. This is what we need in our tests and I'm pretty sure our end-users would also like to have this functionality. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java ########## @@ -86,47 +89,17 @@ */ String explain(List<Operation> operations, ExplainDetail... extraDetails); - /** - * Get the json plan of the given {@link ModifyOperation}s. - * - * <p>The json plan is the string json representation of an optimized ExecNode plan for the - * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be - * deserialized to an ExecNode plan. - * - * <p><b>NOTES:</b>: This is an experimental feature now. - * - * @param modifyOperations the {@link ModifyOperation}s to generate json plan. - * @return the string json representation of an optimized ExecNode plan for the given - * operations. - */ + // FLIP-190 methods + @Experimental - String getJsonPlan(List<ModifyOperation> modifyOperations); + CompiledPlan load(PlanReference planReference) throws IOException; Review comment: too generic name: `loadPlan` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java ########## @@ -86,47 +89,17 @@ */ String explain(List<Operation> operations, ExplainDetail... extraDetails); - /** - * Get the json plan of the given {@link ModifyOperation}s. - * - * <p>The json plan is the string json representation of an optimized ExecNode plan for the - * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be - * deserialized to an ExecNode plan. - * - * <p><b>NOTES:</b>: This is an experimental feature now. - * - * @param modifyOperations the {@link ModifyOperation}s to generate json plan. - * @return the string json representation of an optimized ExecNode plan for the given - * operations. - */ + // FLIP-190 methods + @Experimental - String getJsonPlan(List<ModifyOperation> modifyOperations); + CompiledPlan load(PlanReference planReference) throws IOException; - /** - * Returns the execution plan for the given json plan. - * - * <p><b>NOTES:</b>: This is an experimental feature now. - * - * @param jsonPlan The json plan to be explained. - * @param extraDetails The extra explain details which the explain result should include, e.g. - * estimated cost, changelog mode for streaming - * @return the execution plan. - */ @Experimental - String explainJsonPlan(String jsonPlan, ExplainDetail... extraDetails); + CompiledPlan compile(List<ModifyOperation> modifyOperations); Review comment: too generic name: `compilePlan` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.Experimental; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +/** + * Pointer to a persisted plan. You can load the content of this reference into a {@link + * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute + * it with {@link TableEnvironment#executePlan(PlanReference)}. + */ +@Experimental +public final class PlanReference { + + private final @Nullable File file; + private final @Nullable String content; + + private PlanReference(@Nullable File file, @Nullable String content) { + this.file = file; + this.content = content; + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(String path) { + return fromFile(Paths.get(path).toFile()); + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(Path path) { + return fromFile(path.toFile()); + } + + /** Create a reference starting from a file path. */ + public static PlanReference fromFile(File file) { + return new PlanReference(file, null); + } + + /** Create a reference starting from a JSON string. */ + public static PlanReference fromJsonString(String jsonString) { + return new PlanReference(null, jsonString); + } + + /** + * Create a reference from a file in the classpath, using {@code + * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(String classpathFilePath) { Review comment: let's use `fromResource`, I think this is a common term in the Java world ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ########## @@ -1252,4 +1254,52 @@ void createFunction( * as one job. */ StatementSet createStatementSet(); + + // FLIP-190 methods. They're considered experimental and might change in future versions Review comment: nit: at some point FLIP-190 will be history, we don't need this comment here and at other locations. if you wanna structure your code then call this section generic like `Plan compilation and restore` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.Experimental; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +/** + * Pointer to a persisted plan. You can load the content of this reference into a {@link + * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute + * it with {@link TableEnvironment#executePlan(PlanReference)}. + */ +@Experimental +public final class PlanReference { + + private final @Nullable File file; + private final @Nullable String content; + + private PlanReference(@Nullable File file, @Nullable String content) { + this.file = file; + this.content = content; + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(String path) { + return fromFile(Paths.get(path).toFile()); + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(Path path) { + return fromFile(path.toFile()); + } + + /** Create a reference starting from a file path. */ + public static PlanReference fromFile(File file) { + return new PlanReference(file, null); + } + + /** Create a reference starting from a JSON string. */ + public static PlanReference fromJsonString(String jsonString) { + return new PlanReference(null, jsonString); + } + + /** + * Create a reference from a file in the classpath, using {@code + * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(String classpathFilePath) { + return fromClasspath(Thread.currentThread().getContextClassLoader(), classpathFilePath); + } + + /** + * Create a reference from a file in the classpath. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(ClassLoader classLoader, String classpathFilePath) + throws TableException { + URL url = classLoader.getResource(classpathFilePath); + if (url == null) { + throw new TableException( + "Cannot load the plan reference from classpath, resource not found: " + + classpathFilePath); + } + + try { + return PlanReference.fromFile(new File(url.toURI())); + } catch (URISyntaxException e) { + throw new TableException( + "Cannot load the plan reference from classpath, invalid URI: " + + classpathFilePath, + e); + } + } + + public Optional<File> getFile() { Review comment: default scope or two static inner classes? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -752,6 +757,38 @@ public StatementSet createStatementSet() { return new StatementSetImpl(this); } + @Override + public CompiledPlan loadPlan(PlanReference planReference) throws IOException { + return planner.load(planReference); + } + + @Override + public CompiledPlan compilePlanSql(String stmt) { + List<Operation> operations = getParser().parse(stmt); + + if (operations.size() != 1 || !(operations.get(0) instanceof ModifyOperation)) { + throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG); + } + + return planner.compile(Collections.singletonList((ModifyOperation) operations.get(0))); + } + + @Override + public TableResult executePlan(CompiledPlan plan) { + List<Transformation<?>> transformations = planner.translate(plan); + List<String> sinkIdentifierNames = new ArrayList<>(); + for (int i = 0; i < transformations.size(); ++i) { + // TODO serialize the sink table names to json plan ? Review comment: I think the names are already part of the JSON plan. The question is rather whether we can somehow get the names from the transformations. We should definitely fix this TODO. Because otherwise the job name might not be deterministic. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ########## @@ -1252,4 +1254,52 @@ void createFunction( * as one job. */ StatementSet createStatementSet(); + + // FLIP-190 methods. They're considered experimental and might change in future versions + + /** + * Load a plan starting from a {@link PlanReference} into a {@link CompiledPlan}. This will + * parse the input reference and will validate the plan. + * + * <p><b>Note:</b> This API is <b>experimental</b> and subject to change in future releases. Review comment: Remove the additional warning in the JavaDocs. A warning in the website docs should be enough. In the end, we would like to encourage users to try it out. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java ########## @@ -86,47 +89,17 @@ */ String explain(List<Operation> operations, ExplainDetail... extraDetails); - /** - * Get the json plan of the given {@link ModifyOperation}s. - * - * <p>The json plan is the string json representation of an optimized ExecNode plan for the - * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be - * deserialized to an ExecNode plan. - * - * <p><b>NOTES:</b>: This is an experimental feature now. - * - * @param modifyOperations the {@link ModifyOperation}s to generate json plan. - * @return the string json representation of an optimized ExecNode plan for the given - * operations. - */ + // FLIP-190 methods + @Experimental - String getJsonPlan(List<ModifyOperation> modifyOperations); + CompiledPlan load(PlanReference planReference) throws IOException; - /** - * Returns the execution plan for the given json plan. - * - * <p><b>NOTES:</b>: This is an experimental feature now. - * - * @param jsonPlan The json plan to be explained. - * @param extraDetails The extra explain details which the explain result should include, e.g. - * estimated cost, changelog mode for streaming - * @return the execution plan. - */ @Experimental - String explainJsonPlan(String jsonPlan, ExplainDetail... extraDetails); + CompiledPlan compile(List<ModifyOperation> modifyOperations); + + @Experimental + List<Transformation<?>> translate(CompiledPlan plan); - /** - * Converts a json plan into a set of runnable {@link Transformation}s. - * - * <p>The json plan is the string json representation of an optimized ExecNode plan for the - * given statement. An ExecNode plan can be serialized to json plan, and a json plan can be - * deserialized to an ExecNode plan. - * - * <p><b>NOTES:</b>: This is an experimental feature now. - * - * @param jsonPlan The json plan to be translated. - * @return list of corresponding {@link Transformation}s. - */ @Experimental - List<Transformation<?>> translateJsonPlan(String jsonPlan); + String explain(CompiledPlan plan, ExplainDetail... extraDetails); Review comment: `explainPlan` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.Experimental; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * This interface represents a compiled plan that can be executed using {@link + * TableEnvironment#executePlan(CompiledPlan)}. A plan can be compiled starting from a SQL query + * using {@link TableEnvironment#compilePlanSql(String)} and can be loaded back from a file or a + * string using {@link TableEnvironment#loadPlan(PlanReference)}. A plan can be persisted using + * {@link #writeToFile(Path, boolean)} or by manually extracting the JSON representation with {@link + * #asJsonString()}. + */ +@Experimental +public interface CompiledPlan { + + /** Convert the plan to a JSON string representation. */ + String asJsonString(); Review comment: Now that `CompiledPlan` is more than a reference. It would be beneficial to expose some management information i.e. the Flink version ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.Experimental; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +/** + * Pointer to a persisted plan. You can load the content of this reference into a {@link + * CompiledPlan} using {@link TableEnvironment#loadPlan(PlanReference)} or you can directly execute + * it with {@link TableEnvironment#executePlan(PlanReference)}. + */ +@Experimental +public final class PlanReference { + + private final @Nullable File file; + private final @Nullable String content; + + private PlanReference(@Nullable File file, @Nullable String content) { + this.file = file; + this.content = content; + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(String path) { + return fromFile(Paths.get(path).toFile()); + } + + /** @see #fromFile(File) */ + public static PlanReference fromFile(Path path) { + return fromFile(path.toFile()); + } + + /** Create a reference starting from a file path. */ + public static PlanReference fromFile(File file) { + return new PlanReference(file, null); + } + + /** Create a reference starting from a JSON string. */ + public static PlanReference fromJsonString(String jsonString) { + return new PlanReference(null, jsonString); + } + + /** + * Create a reference from a file in the classpath, using {@code + * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(String classpathFilePath) { + return fromClasspath(Thread.currentThread().getContextClassLoader(), classpathFilePath); + } + + /** + * Create a reference from a file in the classpath. + * + * @throws TableException if the classpath resource cannot be found + */ + public static PlanReference fromClasspath(ClassLoader classLoader, String classpathFilePath) + throws TableException { + URL url = classLoader.getResource(classpathFilePath); + if (url == null) { + throw new TableException( + "Cannot load the plan reference from classpath, resource not found: " + + classpathFilePath); + } + + try { + return PlanReference.fromFile(new File(url.toURI())); + } catch (URISyntaxException e) { + throw new TableException( + "Cannot load the plan reference from classpath, invalid URI: " + + classpathFilePath, + e); + } + } + + public Optional<File> getFile() { + return Optional.ofNullable(file); + } + + public Optional<String> getContent() { + return Optional.ofNullable(content); + } Review comment: Add a `toString` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java ########## @@ -142,21 +143,17 @@ public TableResult execute() { } /** - * Get the json plan of the all statements and Tables as a batch. - * - * <p>The json plan is the string json representation of an optimized ExecNode plan for the - * statements and Tables. An ExecNode plan can be serialized to json plan, and a json plan can - * be deserialized to an ExecNode plan. + * Get the {@link CompiledPlan} of the all statements and Tables as a batch. Review comment: Remove the JavaDocs of an `Impl` method. The interface should have all docs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org