libenchao commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1414825282


##########
flink-table/flink-sql-gateway/src/test/resources/sql/repeated_dql.q:
##########
@@ -0,0 +1,220 @@
+# statement-set.q - BEGIN STATEMENT SET, END

Review Comment:
   You forgot to change this to the current filename.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -274,6 +276,33 @@ public class OptimizerConfigOptions {
                                                     + "such as causing the output of certain non-deterministic expressions to not meet expectations(see FLINK-20887).")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_PLAN_CACHE_ENABLED =
+            key("table.optimizer.plan-cache.enabled")

Review Comment:
   Since this is dedicated to the SQL gateway for now, how about moving these 
configs to `SqlGatewayServiceConfigOptions`? 
   
   Besides, if it's not enabled by default, I would suggest adding it to the 
"Flink OLAP quickstart" doc too.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java:
##########
@@ -475,4 +476,23 @@ public void setRootConfiguration(ReadableConfig rootConfiguration) {
     public static TableConfig getDefault() {
         return new TableConfig();
     }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Using a mutable object as a map key is not good practice: if the key 
changes after insertion, its hash code changes and the entry can no longer be 
found. I'm wondering if there is another way to implement the plan cache, such 
as invalidating the cache when the config is changed?
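
A minimal sketch of the hazard, assuming a hypothetical `MutableConfig` class with a single field (a stand-in for illustration, not the real `TableConfig`). It shows that an entry becomes unreachable once its key is mutated after insertion:

```java
import java.util.HashMap;
import java.util.Map;

public class MutableKeyDemo {

    /** Hypothetical mutable key; a simplified stand-in, not Flink's TableConfig. */
    static final class MutableConfig {
        private int parallelism;

        MutableConfig(int parallelism) {
            this.parallelism = parallelism;
        }

        void setParallelism(int parallelism) {
            this.parallelism = parallelism;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableConfig
                    && parallelism == ((MutableConfig) o).parallelism;
        }

        @Override
        public int hashCode() {
            // The hash depends on mutable state, which is what makes the key unsafe.
            return Integer.hashCode(parallelism);
        }
    }

    /** Returns whether the cached entry can still be found after the key is mutated. */
    static boolean foundAfterMutation() {
        Map<MutableConfig, String> cache = new HashMap<>();
        MutableConfig key = new MutableConfig(1);
        cache.put(key, "cached plan");

        // Mutating the key changes its hash code, but the map still files the
        // entry under the hash that was computed at insertion time.
        key.setParallelism(2);
        return cache.containsKey(key);
    }

    /** Returns whether a fresh key equal to the original state finds the entry. */
    static boolean foundWithOriginalState() {
        Map<MutableConfig, String> cache = new HashMap<>();
        MutableConfig key = new MutableConfig(1);
        cache.put(key, "cached plan");
        key.setParallelism(2);

        // The hash matches the stored one, but equals() now fails against the mutated key.
        return cache.containsKey(new MutableConfig(1));
    }

    public static void main(String[] args) {
        System.out.println("found after mutation: " + foundAfterMutation());
        System.out.println("found with original state: " + foundWithOriginalState());
    }
}
```

Both lookups miss, so the entry stays in the map but cannot be retrieved. Keying on an immutable snapshot, or invalidating the cache on every config change, would avoid this.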



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CachedPlan.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/** SQL execution plan cache. */
+@Internal
+public interface CachedPlan {
+    /** Return the origin operation parsed from the origin statement. */

Review Comment:
   ```suggestion
       /** Return the cached operation parsed from the statement. */
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##########
@@ -52,18 +56,39 @@
 /** Test {@link SqlGatewayService}#executeStatement. */
 public class SqlGatewayServiceStatementITCase extends AbstractSqlGatewayStatementITCase {
 
-    private final SessionEnvironment defaultSessionEnvironment =
+    private static final SessionEnvironment DEFAULT_SESSION_ENVIRONMENT =
             SessionEnvironment.newBuilder()
                     .setSessionEndpointVersion(MockedEndpointVersion.V1)
                     .build();
 
+    private static final SessionEnvironment SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .addSessionConfig(
+                            Collections.singletonMap(
+                                    TABLE_OPTIMIZER_PLAN_CACHE_ENABLED.key(), "true"))
+                    .build();
+
     private SessionHandle sessionHandle;
 
+    @Parameters(name = "parameters={0}")
+    public static List<TestParameters> parameters() throws Exception {
+        return listFlinkSqlTests().stream()
+                .map(path -> new StatementTestParameters(path, path.endsWith("repeated_dql.q")))
+                .collect(Collectors.toList());
+    }
+
     @BeforeEach
     @Override
     public void before(@TempDir Path temporaryFolder) throws Exception {
         super.before(temporaryFolder);
-        sessionHandle = service.openSession(defaultSessionEnvironment);
+        boolean planCacheEnabled =

Review Comment:
   The test you added cannot actually tell whether the cache is used or not. 
It only shows that `SqlGatewayService#executeStatement` works fine when the 
cache is enabled. 
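
One way a test could observe cache hits directly is to count how often the planning step runs. The sketch below uses a hypothetical `CountingPlanCache` keyed by the statement string; none of these names come from the PR, and the real cache would hold plans rather than strings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PlanCacheHitDemo {

    /** Hypothetical cache that counts how often the expensive planning step runs. */
    static final class CountingPlanCache {
        private final Map<String, String> plans = new HashMap<>();
        private final Function<String, String> planner;
        private int plannerInvocations;

        CountingPlanCache(Function<String, String> planner) {
            this.planner = planner;
        }

        String getOrPlan(String statement) {
            // The planner runs only on a cache miss, so the counter tracks misses.
            return plans.computeIfAbsent(
                    statement,
                    s -> {
                        plannerInvocations++;
                        return planner.apply(s);
                    });
        }

        int getPlannerInvocations() {
            return plannerInvocations;
        }
    }

    /** Runs the same statement twice and reports how many times it was planned. */
    static int invocationsForRepeatedQuery() {
        CountingPlanCache cache = new CountingPlanCache(sql -> "plan(" + sql + ")");
        cache.getOrPlan("SELECT 1");
        cache.getOrPlan("SELECT 1");
        return cache.getPlannerInvocations();
    }

    public static void main(String[] args) {
        System.out.println("planner invocations: " + invocationsForRepeatedQuery());
    }
}
```

A test asserting that the counter stays at one after the repeated statement would fail if the cache were silently bypassed, which the current test cannot detect.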



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ResultProvider.java:
##########
@@ -59,4 +59,7 @@ public interface ResultProvider {
      * {@link CloseableIterator#next()} method returns a row.
      */
     boolean isFirstRowReady();
+
+    /** Reset this ResultProvider to the origin state when we create it. */
+    default void reset() {}

Review Comment:
   Adding a `reset` contract is fragile and easy to break. I'm trying to 
understand the rationale behind it. If I'm not mistaken, the reason is that 
`ResultProvider` is also cached. If we can bind it only to a real job instance, 
then we won't need to reset it. Do you think that's doable?
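
A rough sketch of that alternative, assuming the cached plan stores a factory rather than a provider instance. `StatefulProvider` and `CachedPlanSketch` are hypothetical names for illustration and are not Flink's `ResultProvider` or `CachedPlan`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PerJobProviderDemo {

    /** Hypothetical stateful provider; a simplified stand-in, not Flink's ResultProvider. */
    static final class StatefulProvider {
        private final List<String> rows = new ArrayList<>();

        void collect(String row) {
            rows.add(row);
        }

        int rowCount() {
            return rows.size();
        }
    }

    /** Hypothetical cached plan that keeps a factory instead of a provider instance. */
    static final class CachedPlanSketch {
        private final Supplier<StatefulProvider> providerFactory;

        CachedPlanSketch(Supplier<StatefulProvider> providerFactory) {
            this.providerFactory = providerFactory;
        }

        /** Each job execution gets a fresh provider, so no reset() is needed. */
        StatefulProvider newProviderForJob() {
            return providerFactory.get();
        }
    }

    /** Returns the number of rows the second job sees before it collects anything. */
    static int rowsSeenBySecondJob() {
        CachedPlanSketch plan = new CachedPlanSketch(StatefulProvider::new);

        StatefulProvider first = plan.newProviderForJob();
        first.collect("row-1");

        // State from the first job does not leak into the second provider.
        StatefulProvider second = plan.newProviderForJob();
        return second.rowCount();
    }

    public static void main(String[] args) {
        System.out.println("rows seen by second job: " + rowsSeenBySecondJob());
    }
}
```

With this shape, only the immutable plan is shared across executions and the per-job state lives and dies with each job.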



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -85,14 +86,17 @@ public class SessionContext {
     private boolean isStatementSetState;
     private final List<ModifyOperation> statementSetOperations;
 
+    private final PlanCacheManager planCacheManager;

Review Comment:
   Annotate it with `@Nullable` if it's nullable, see: 
https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-optional



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CachedPlan.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/** SQL execution plan cache. */
+@Internal
+public interface CachedPlan {
+    /** Return the origin operation parsed from the origin statement. */
+    Operation getOriginOperation();

Review Comment:
   I think `getOperation` is sufficient.



