This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7b1fc2c29995135b9005f07e385986a40c65621
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Jun 4 17:42:32 2024 +0800

    [FLINK-35473][table-api][table-planner] Move optimizer-related options to 
OptimizerConfigOptions
    
    This closes #24889
---
 .../generated/optimizer_config_configuration.html  | 30 +++++++++++--------
 .../table/api/config/OptimizerConfigOptions.java   | 34 ++++++++++++++++++++++
 .../table/planner/plan/optimize/RelNodeBlock.scala | 27 +++++++++++++----
 .../physical/stream/IncrementalAggregateRule.scala | 14 +++++++--
 .../planner/plan/hints/batch/JoinHintTestBase.java | 10 +++----
 .../plan/nodes/exec/stream/ExpandTestPrograms.java |  4 +--
 .../plan/batch/sql/DagOptimizationTest.scala       | 29 +++++++++---------
 .../planner/plan/batch/sql/LegacySinkTest.scala    |  4 +--
 .../planner/plan/batch/sql/TableSinkTest.scala     |  4 +--
 .../planner/plan/batch/sql/TableSourceTest.scala   |  4 +--
 .../stream/ChangelogModeInferenceTest.scala        |  2 +-
 .../plan/stream/sql/DagOptimizationTest.scala      | 34 ++++++++++------------
 .../table/planner/plan/stream/sql/RankTest.scala   |  4 +--
 .../stream/sql/agg/DistinctAggregateTest.scala     |  2 +-
 .../stream/sql/agg/IncrementalAggregateTest.scala  |  5 ++--
 .../runtime/batch/sql/TableSourceITCase.scala      |  4 +--
 16 files changed, 136 insertions(+), 75 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html 
b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index cf9486037c3..2180add365c 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -41,6 +41,12 @@ ONE_PHASE: Enforce to use one stage aggregate which only has 
CompleteGlobalAggre
             <td>Boolean</td>
             <td>When it is true, the optimizer will try to push dynamic 
filtering into scan table source, the irrelevant partitions or input data will 
be filtered to reduce scan I/O in runtime.</td>
         </tr>
+        <tr>
+            <td><h5>table.optimizer.incremental-agg-enabled</h5><br> <span 
class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When both local aggregation and distinct aggregation splitting 
are enabled, a distinct aggregation will be optimized into four aggregations, 
i.e., local-agg1, global-agg1, local-agg2, and global-agg2. We can combine 
global-agg1 and local-agg2 into a single operator (we call it incremental agg 
because it receives incremental accumulators and outputs incremental results). 
In this way, we can reduce some state overhead and resources. Default is 
enabled.</td>
+        </tr>
         <tr>
             <td><h5>table.optimizer.join-reorder-enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">false</td>
@@ -65,6 +71,12 @@ ONE_PHASE: Enforce to use one stage aggregate which only has 
CompleteGlobalAggre
             <td><p>Enum</p></td>
             <td>When it is `TRY_RESOLVE`, the optimizer tries to resolve the 
correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog 
pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), 
Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only 
changelog pipeline. For updates, there are  three main NDU problems:<br />1. 
Non-deterministic functions, include scalar, table, aggregate functions, both 
builtin and custom ones [...]
         </tr>
+        <tr>
+            
<td><h5>table.optimizer.reuse-optimize-block-with-digest-enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>When true, the optimizer will try to find out duplicated 
sub-plans by digest to build optimize blocks (a.k.a. common sub-graphs). Each 
optimize block will be optimized independently.</td>
+        </tr>
         <tr>
             <td><h5>table.optimizer.reuse-source-enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">true</td>
@@ -101,18 +113,6 @@ ONE_PHASE: Enforce to use one stage aggregate which only 
has CompleteGlobalAggre
             <td>MemorySize</td>
             <td>Min data volume threshold of the runtime filter probe side. 
Estimated data volume needs to be over this value to try to inject runtime 
filter.This value should be larger than <code 
class="highlighter-rouge">table.optimizer.runtime-filter.max-build-data-size</code>.</td>
         </tr>
-        <tr>
-            <td><h5>table.optimizer.source.aggregate-pushdown-enabled</h5><br> 
<span class="label label-primary">Batch</span></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>When it is true, the optimizer will push down the local 
aggregates into the TableSource which implements SupportsAggregatePushDown.</td>
-        </tr>
-        <tr>
-            <td><h5>table.optimizer.source.predicate-pushdown-enabled</h5><br> 
<span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>When it is true, the optimizer will push down predicates into 
the FilterableTableSource. Default value is true.</td>
-        </tr>
         <tr>
             <td><h5>table.optimizer.source.report-statistics-enabled</h5><br> 
<span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">true</td>
@@ -125,5 +125,11 @@ ONE_PHASE: Enforce to use one stage aggregate which only 
has CompleteGlobalAggre
             <td>Boolean</td>
             <td>If set to true, it will merge projects when converting SqlNode 
to RelNode.<br />Note: it is not recommended to turn on unless you are aware of 
possible side effects, such as causing the output of certain non-deterministic 
expressions to not meet expectations(see FLINK-20887).</td>
         </tr>
+        <tr>
+            <td><h5>table.optimizer.union-all-as-breakpoint-enabled</h5><br> 
<span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When true, the optimizer will breakup the graph at union-all 
node when it's a breakpoint. When false, the optimizer will skip the union-all 
node even it's a breakpoint, and will try find the breakpoint in its 
inputs.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 59afe1786f1..9c024a50f52 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -274,6 +274,40 @@ public class OptimizerConfigOptions {
                                                     + "such as causing the 
output of certain non-deterministic expressions to not meet expectations(see 
FLINK-20887).")
                                     .build());
 
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> 
TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED =
+            key("table.optimizer.union-all-as-breakpoint-enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When true, the optimizer will breakup the graph 
at union-all node "
+                                    + "when it's a breakpoint. When false, the 
optimizer will skip the union-all node "
+                                    + "even it's a breakpoint, and will try 
find the breakpoint in its inputs.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean>
+            TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED =
+                    
key("table.optimizer.reuse-optimize-block-with-digest-enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "When true, the optimizer will try to find 
out duplicated sub-plans by "
+                                            + "digest to build optimize blocks 
(a.k.a. common sub-graphs). "
+                                            + "Each optimize block will be 
optimized independently.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Boolean> 
TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED =
+            key("table.optimizer.incremental-agg-enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When both local aggregation and distinct 
aggregation splitting "
+                                    + "are enabled, a distinct aggregation 
will be optimized into four aggregations, "
+                                    + "i.e., local-agg1, global-agg1, 
local-agg2, and global-agg2. We can combine global-agg1 "
+                                    + "and local-agg2 into a single operator 
(we call it incremental agg because "
+                                    + "it receives incremental accumulators 
and outputs incremental results). "
+                                    + "In this way, we can reduce some state 
overhead and resources. Default is enabled.");
+
     /** Strategy for handling non-deterministic updates. */
     @PublicEvolving
     public enum NonDeterministicUpdateStrategy {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
index 7cabc65d150..0599a1fde80 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.optimize
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.{ConfigOption, ReadableConfig}
 import org.apache.flink.configuration.ConfigOptions.key
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.plan.`trait`.MiniBatchInterval
 import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
 import 
org.apache.flink.table.planner.plan.reuse.SubplanReuser.{SubplanReuseContext, 
SubplanReuseShuttle}
@@ -50,7 +51,7 @@ import scala.collection.mutable
  *      root to leaf again, if meet a RelNode which has multiple sink RelNode, 
the RelNode is the
  *      output node of a new block (or named break-point). There are several 
special cases that a
  *      RelNode can not be a break-point. (1). UnionAll is not a break-point 
when
- *      
[[RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]] is 
false (2).
+ *      
[[OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]] is 
false (2).
  *      [[TableFunctionScan]], [[Snapshot]] or window aggregate ([[Aggregate]] 
on a [[Project]] with
  *      window attribute) are not a break-point because their physical 
RelNodes are a composite
  *      RelNode, each of them cannot be optimized individually. e.g. 
FlinkLogicalTableFunctionScan
@@ -229,7 +230,7 @@ class RelNodeBlockPlanBuilder private (tableConfig: 
ReadableConfig) {
   private val node2Block = new util.IdentityHashMap[RelNode, RelNodeBlock]()
 
   private val isUnionAllAsBreakPointEnabled = tableConfig
-    
.get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
+    .get(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
 
   /**
    * Decompose the [[RelNode]] plan into many [[RelNodeBlock]]s, and rebuild 
[[RelNodeBlock]] plan.
@@ -352,7 +353,15 @@ class RelNodeBlockPlanBuilder private (tableConfig: 
ReadableConfig) {
 
 object RelNodeBlockPlanBuilder {
 
-  // It is a experimental config, will may be removed later.
+  /**
+   * Whether to treat union-all node as a breakpoint.
+   * @deprecated
+   *   This configuration has been deprecated as part of FLIP-457 and will be 
removed in Flink 2.0.
+   *   Please use
+   *   
[[org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]]
+   *   instead.
+   */
+  @Deprecated
   @Experimental
   val TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED: ConfigOption[JBoolean] =
     key("table.optimizer.union-all-as-breakpoint-enabled")
@@ -363,7 +372,15 @@ object RelNodeBlockPlanBuilder {
           "when it's a breakpoint. When false, the optimizer will skip the 
union-all node " +
           "even it's a breakpoint, and will try find the breakpoint in its 
inputs.")
 
-  // It is a experimental config, will may be removed later.
+  /**
+   * Whether to reuse optimize block based on digest.
+   * @deprecated
+   *   This configuration has been deprecated as part of FLIP-457 and will be 
removed in Flink 2.0.
+   *   Please use
+   *   
[[org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED]]
+   *   instead.
+   */
+  @Deprecated
   @Experimental
   val TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED: 
ConfigOption[JBoolean] =
     key("table.optimizer.reuse-optimize-block-with-digest-enabled")
@@ -413,7 +430,7 @@ object RelNodeBlockPlanBuilder {
    */
   private def reuseRelNodes(relNodes: Seq[RelNode], tableConfig: 
ReadableConfig): Seq[RelNode] = {
     val findOpBlockWithDigest = tableConfig
-      
.get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
+      
.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
     if (!findOpBlockWithDigest) {
       return relNodes
     }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
index 2fb63c0bcb0..c40f7bc83e0 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
@@ -20,6 +20,7 @@ package 
org.apache.flink.table.planner.plan.rules.physical.stream
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.PartialFinalType
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalExchange,
 StreamPhysicalGlobalGroupAggregate, StreamPhysicalIncrementalGroupAggregate, 
StreamPhysicalLocalGroupAggregate}
@@ -62,7 +63,7 @@ class IncrementalAggregateRule
 
     // whether incremental aggregate is enabled
     val incrementalAggEnabled =
-      
tableConfig.get(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED)
+      
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED)
 
     partialGlobalAgg.partialFinalType == PartialFinalType.PARTIAL &&
     finalLocalAgg.partialFinalType == PartialFinalType.FINAL &&
@@ -155,7 +156,16 @@ class IncrementalAggregateRule
 object IncrementalAggregateRule {
   val INSTANCE = new IncrementalAggregateRule
 
-  // It is a experimental config, will may be removed later.
+  /**
+   * Whether to enable incremental aggregation.
+   *
+   * @deprecated
+   *   This configuration has been deprecated as part of FLIP-457 and will be 
removed in Flink 2.0.
+   *   Please use
+   *   
[[org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED]]
+   *   instead.
+   */
+  @Deprecated
   @Experimental
   val TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED: ConfigOption[JBoolean] =
     key("table.optimizer.incremental-agg-enabled")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
index 1aed3612df1..05250b609d8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
@@ -24,8 +24,8 @@ import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.planner.hint.JoinStrategy;
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.utils.BatchTableTestUtil;
 import org.apache.flink.table.planner.utils.PlanKind;
@@ -680,8 +680,8 @@ public abstract class JoinHintTestBase extends 
TableTestBase {
         util.tableEnv()
                 .getConfig()
                 .set(
-                        RelNodeBlockPlanBuilder
-                                
.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(),
+                        OptimizerConfigOptions
+                                
.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
                         true);
 
         // the build side in this view is left
@@ -731,8 +731,8 @@ public abstract class JoinHintTestBase extends 
TableTestBase {
         util.tableEnv()
                 .getConfig()
                 .set(
-                        RelNodeBlockPlanBuilder
-                                
.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(),
+                        OptimizerConfigOptions
+                                
.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
                         true);
 
         // the build side in this view is left
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
index af6195174e4..752449a913c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.table.api.config.AggregatePhaseStrategy;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import 
org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -38,8 +37,7 @@ public class ExpandTestPrograms {
                     .setupConfig(
                             
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
                     .setupConfig(
-                            
IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(),
-                            false)
+                            
OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, false)
                     .setupTableSource(
                             SourceTestStep.newBuilder("MyTable")
                                     .addSchema("a int", "b bigint", "c 
varchar")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
index a388d8747cb..051fabca5e6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
@@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
 import org.apache.flink.table.planner.utils.{TableFunc1, TableTestBase}
 import org.apache.flink.table.types.logical._
@@ -130,7 +130,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinks1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM 
MyTable GROUP BY c")
     util.tableEnv.createTemporaryView("table1", table1)
@@ -151,9 +151,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
-      Boolean.box(true))
+    util.tableEnv.getConfig
+      
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, 
Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 
'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable 
WHERE a <= 10")
@@ -177,7 +176,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinks3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 
'b, 'c, 'd, 'e)
 
@@ -202,7 +201,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinks4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b FROM MyTable WHERE 
a <= 10")
@@ -232,7 +231,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinks5(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     // test with non-deterministic udf
     util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf())
@@ -256,7 +255,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiLevelViews(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c 
LIKE '%hello%'")
@@ -297,7 +296,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksWithUDTF(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTemporarySystemFunction("split", new TableFunc1)
     val sqlQuery1 =
@@ -340,7 +339,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
 
     val table =
@@ -364,10 +363,10 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
@@ -405,7 +404,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
@@ -438,7 +437,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
index 873b10a0909..34d345808b6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
@@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils.TableTestBase
 import org.apache.flink.table.types.logical.{BigIntType, IntType}
 
@@ -45,7 +45,7 @@ class LegacySinkTest extends TableTestBase {
   def testMultiSinks(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM 
MyTable GROUP BY c")
     util.tableEnv.createTemporaryView("table1", table1)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
index 8a00d9be219..07888ed2c4b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.utils.TableTestBase
 import org.apache.flink.table.types.logical.{BigIntType, IntType}
 
@@ -65,7 +65,7 @@ class TableSinkTest extends TableTestBase {
                      |""".stripMargin)
 
     util.tableEnv.getConfig.set(
-      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM 
MyTable GROUP BY c")
     util.tableEnv.createTemporaryView("table1", table1)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index 418258d0a33..a0597969eab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.batch.sql
 
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.utils._
 
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -212,7 +212,7 @@ class TableSourceTest extends TableTestBase {
   @Test
   def testTableHintWithDigestReuseForLogicalTableScan(): Unit = {
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     util.tableEnv.executeSql(s"""
                                 |CREATE TABLE MySink (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index 375258529ea..bde6d7a35d6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -249,7 +249,7 @@ class ChangelogModeInferenceTest extends TableTestBase {
   @Test
   def testPropagateUpdateKindAmongRelNodeBlocks(): Unit = {
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     util.addTable("""
                     |create table sink1 (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
index 1a8b2577706..b051bb60d36 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
@@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
 import org.apache.flink.table.planner.utils.{TableFunc1, TableTestBase}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
@@ -116,7 +116,7 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testSingleSinkSplitOnUnion(): Unit = {
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
 
     val sqlQuery = "SELECT SUM(a) AS total_sum FROM " +
@@ -130,7 +130,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinks1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c")
     util.tableEnv.createTemporaryView("table1", table1)
@@ -155,9 +155,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
-      Boolean.box(true))
+    util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10")
@@ -184,9 +183,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
-      Boolean.box(true))
+    util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10")
@@ -244,7 +242,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinks5(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     // test with non-deterministic udf
     util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf())
@@ -272,7 +270,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksWithUDTF(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTemporarySystemFunction("split", new TableFunc1)
     val sqlQuery1 =
@@ -319,7 +317,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
 
     val table =
@@ -347,10 +345,10 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
@@ -394,7 +392,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
@@ -433,7 +431,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiSinksSplitOnUnion4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
@@ -547,7 +545,7 @@ class DagOptimizationTest extends TableTestBase {
   def testMultiLevelViews(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
@@ -594,7 +592,7 @@ class DagOptimizationTest extends TableTestBase {
   def testSharedUnionNode(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
       Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 82547355d82..9992bb71a45 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy}
@@ -889,7 +889,7 @@ class RankTest extends TableTestBase {
   @Test
   def testUpdatableRankAfterIntermediateScan(): Unit = {
     util.tableEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     util.tableEnv.executeSql("""
                                |CREATE VIEW v1 AS
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 13c98427dca..54269c54454 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -51,7 +51,7 @@ class DistinctAggregateTest(
       Boolean.box(splitDistinctAggEnabled))
     // disable incremental agg
     util.tableEnv.getConfig
-      .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(false))
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(false))
   }
 
   @TestTemplate
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala
index 8293725de25..70f0170ab34 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala
@@ -17,8 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.stream.sql.agg
 
-import org.apache.flink.table.api.config.AggregatePhaseStrategy
-import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule
+import org.apache.flink.table.api.config.{AggregatePhaseStrategy, OptimizerConfigOptions}
 import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
 
 import org.junit.jupiter.api.BeforeEach
@@ -37,7 +36,7 @@ class IncrementalAggregateTest(
     super.before()
     // enable incremental agg
     util.tableEnv.getConfig
-      .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(true))
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(true))
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index bae6b751ec2..0230587401f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql
 
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
 import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.{createTempFile,
 createTempFolder}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -389,7 +389,7 @@ class TableSourceITCase extends BatchTestBase {
   @Test
   def testTableHintWithLogicalTableScanReuse(): Unit = {
     tEnv.getConfig.set(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     val resultPath = createTempFolder().getAbsolutePath
     tEnv.executeSql(s"""


Reply via email to