This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7b1fc2c29995135b9005f07e385986a40c65621 Author: Jane Chan <qingyue....@gmail.com> AuthorDate: Tue Jun 4 17:42:32 2024 +0800 [FLINK-35473][table-api][table-planner] Move optimizer-related options to OptimizerConfigOptions This closes #24889 --- .../generated/optimizer_config_configuration.html | 30 +++++++++++-------- .../table/api/config/OptimizerConfigOptions.java | 34 ++++++++++++++++++++++ .../table/planner/plan/optimize/RelNodeBlock.scala | 27 +++++++++++++---- .../physical/stream/IncrementalAggregateRule.scala | 14 +++++++-- .../planner/plan/hints/batch/JoinHintTestBase.java | 10 +++---- .../plan/nodes/exec/stream/ExpandTestPrograms.java | 4 +-- .../plan/batch/sql/DagOptimizationTest.scala | 29 +++++++++--------- .../planner/plan/batch/sql/LegacySinkTest.scala | 4 +-- .../planner/plan/batch/sql/TableSinkTest.scala | 4 +-- .../planner/plan/batch/sql/TableSourceTest.scala | 4 +-- .../stream/ChangelogModeInferenceTest.scala | 2 +- .../plan/stream/sql/DagOptimizationTest.scala | 34 ++++++++++------------ .../table/planner/plan/stream/sql/RankTest.scala | 4 +-- .../stream/sql/agg/DistinctAggregateTest.scala | 2 +- .../stream/sql/agg/IncrementalAggregateTest.scala | 5 ++-- .../runtime/batch/sql/TableSourceITCase.scala | 4 +-- 16 files changed, 136 insertions(+), 75 deletions(-) diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html index cf9486037c3..2180add365c 100644 --- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html +++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html @@ -41,6 +41,12 @@ ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggre <td>Boolean</td> <td>When it is true, the optimizer will try to push dynamic filtering into scan table source, the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime.</td> </tr> + <tr> + <td><h5>table.optimizer.incremental-agg-enabled</h5><br> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>When both local aggregation and distinct aggregation splitting are enabled, a distinct aggregation will be optimized into four aggregations, i.e., local-agg1, global-agg1, local-agg2, and global-agg2. We can combine global-agg1 and local-agg2 into a single operator (we call it incremental agg because it receives incremental accumulators and outputs incremental results). In this way, we can reduce some state overhead and resources. Default is enabled.</td> + </tr> <tr> <td><h5>table.optimizer.join-reorder-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">false</td> @@ -65,6 +71,12 @@ ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggre <td><p>Enum</p></td> <td>When it is `TRY_RESOLVE`, the optimizer tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only changelog pipeline. For updates, there are three main NDU problems:<br />1. Non-deterministic functions, include scalar, table, aggregate functions, both builtin and custom ones [...] </tr> + <tr> + <td><h5>table.optimizer.reuse-optimize-block-with-digest-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>When true, the optimizer will try to find out duplicated sub-plans by digest to build optimize blocks (a.k.a. common sub-graphs). Each optimize block will be optimized independently.</td> + </tr> <tr> <td><h5>table.optimizer.reuse-source-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">true</td> @@ -101,18 +113,6 @@ ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggre <td>MemorySize</td> <td>Min data volume threshold of the runtime filter probe side. Estimated data volume needs to be over this value to try to inject runtime filter.This value should be larger than <code class="highlighter-rouge">table.optimizer.runtime-filter.max-build-data-size</code>.</td> </tr> - <tr> - <td><h5>table.optimizer.source.aggregate-pushdown-enabled</h5><br> <span class="label label-primary">Batch</span></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>When it is true, the optimizer will push down the local aggregates into the TableSource which implements SupportsAggregatePushDown.</td> - </tr> - <tr> - <td><h5>table.optimizer.source.predicate-pushdown-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>When it is true, the optimizer will push down predicates into the FilterableTableSource. Default value is true.</td> - </tr> <tr> <td><h5>table.optimizer.source.report-statistics-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">true</td> @@ -125,5 +125,11 @@ ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggre <td>Boolean</td> <td>If set to true, it will merge projects when converting SqlNode to RelNode.<br />Note: it is not recommended to turn on unless you are aware of possible side effects, such as causing the output of certain non-deterministic expressions to not meet expectations(see FLINK-20887).</td> </tr> + <tr> + <td><h5>table.optimizer.union-all-as-breakpoint-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>When true, the optimizer will breakup the graph at union-all node when it's a breakpoint. When false, the optimizer will skip the union-all node even it's a breakpoint, and will try find the breakpoint in its inputs.</td> + </tr> </tbody> </table> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index 59afe1786f1..9c024a50f52 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -274,6 +274,40 @@ public class OptimizerConfigOptions { + "such as causing the output of certain non-deterministic expressions to not meet expectations(see FLINK-20887).") .build()); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Boolean> TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED = + key("table.optimizer.union-all-as-breakpoint-enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "When true, the optimizer will breakup the graph at union-all node " + + "when it's a breakpoint. When false, the optimizer will skip the union-all node " + + "even it's a breakpoint, and will try find the breakpoint in its inputs."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Boolean> + TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED = + key("table.optimizer.reuse-optimize-block-with-digest-enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "When true, the optimizer will try to find out duplicated sub-plans by " + + "digest to build optimize blocks (a.k.a. common sub-graphs). " + + "Each optimize block will be optimized independently."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<Boolean> TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED = + key("table.optimizer.incremental-agg-enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "When both local aggregation and distinct aggregation splitting " + + "are enabled, a distinct aggregation will be optimized into four aggregations, " + + "i.e., local-agg1, global-agg1, local-agg2, and global-agg2. We can combine global-agg1 " + + "and local-agg2 into a single operator (we call it incremental agg because " + + "it receives incremental accumulators and outputs incremental results). " + + "In this way, we can reduce some state overhead and resources. Default is enabled."); + /** Strategy for handling non-deterministic updates. */ @PublicEvolving public enum NonDeterministicUpdateStrategy { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala index 7cabc65d150..0599a1fde80 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.optimize import org.apache.flink.annotation.Experimental import org.apache.flink.configuration.{ConfigOption, ReadableConfig} import org.apache.flink.configuration.ConfigOptions.key +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.plan.`trait`.MiniBatchInterval import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink import org.apache.flink.table.planner.plan.reuse.SubplanReuser.{SubplanReuseContext, SubplanReuseShuttle} @@ -50,7 +51,7 @@ import scala.collection.mutable * root to leaf again, if meet a RelNode which has multiple sink RelNode, the RelNode is the * output node of a new block (or named break-point). There are several special cases that a * RelNode can not be a break-point. (1). UnionAll is not a break-point when - * [[RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]] is false (2). + * [[OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]] is false (2). * [[TableFunctionScan]], [[Snapshot]] or window aggregate ([[Aggregate]] on a [[Project]] with * window attribute) are not a break-point because their physical RelNodes are a composite * RelNode, each of them cannot be optimized individually. e.g. FlinkLogicalTableFunctionScan @@ -229,7 +230,7 @@ class RelNodeBlockPlanBuilder private (tableConfig: ReadableConfig) { private val node2Block = new util.IdentityHashMap[RelNode, RelNodeBlock]() private val isUnionAllAsBreakPointEnabled = tableConfig - .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED) + .get(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED) /** * Decompose the [[RelNode]] plan into many [[RelNodeBlock]]s, and rebuild [[RelNodeBlock]] plan. @@ -352,7 +353,15 @@ class RelNodeBlockPlanBuilder private (tableConfig: ReadableConfig) { object RelNodeBlockPlanBuilder { - // It is a experimental config, will may be removed later. + /** + * Whether to treat union-all node as a breakpoint. + * @deprecated + * This configuration has been deprecated as part of FLIP-457 and will be removed in Flink 2.0. + * Please use + * [[org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]] + * instead. + */ + @Deprecated @Experimental val TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED: ConfigOption[JBoolean] = key("table.optimizer.union-all-as-breakpoint-enabled") @@ -363,7 +372,15 @@ object RelNodeBlockPlanBuilder { "when it's a breakpoint. When false, the optimizer will skip the union-all node " + "even it's a breakpoint, and will try find the breakpoint in its inputs.") - // It is a experimental config, will may be removed later. + /** + * Whether to reuse optimize block based on digest. + * @deprecated + * This configuration has been deprecated as part of FLIP-457 and will be removed in Flink 2.0. + * Please use + * [[org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED]] + * instead. + */ + @Deprecated @Experimental val TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED: ConfigOption[JBoolean] = key("table.optimizer.reuse-optimize-block-with-digest-enabled") @@ -413,7 +430,7 @@ object RelNodeBlockPlanBuilder { */ private def reuseRelNodes(relNodes: Seq[RelNode], tableConfig: ReadableConfig): Seq[RelNode] = { val findOpBlockWithDigest = tableConfig - .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED) + .get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED) if (!findOpBlockWithDigest) { return relNodes } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala index 2fb63c0bcb0..c40f7bc83e0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.annotation.Experimental import org.apache.flink.configuration.ConfigOption import org.apache.flink.configuration.ConfigOptions.key +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.PartialFinalType import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalExchange, StreamPhysicalGlobalGroupAggregate, StreamPhysicalIncrementalGroupAggregate, StreamPhysicalLocalGroupAggregate} @@ -62,7 +63,7 @@ class IncrementalAggregateRule // whether incremental aggregate is enabled val incrementalAggEnabled = - tableConfig.get(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED) + tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED) partialGlobalAgg.partialFinalType == PartialFinalType.PARTIAL && finalLocalAgg.partialFinalType == PartialFinalType.FINAL && @@ -155,7 +156,16 @@ class IncrementalAggregateRule object IncrementalAggregateRule { val INSTANCE = new IncrementalAggregateRule - // It is a experimental config, will may be removed later. + /** + * Whether to enable incremental aggregation. + * + * @deprecated + * This configuration has been deprecated as part of FLIP-457 and will be removed in Flink 2.0. + * Please use + * [[org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED]] + * instead. + */ + @Deprecated @Experimental val TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED: ConfigOption[JBoolean] = key("table.optimizer.incremental-agg-enabled") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java index 1aed3612df1..05250b609d8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java @@ -24,8 +24,8 @@ import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.planner.hint.JoinStrategy; -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder; import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; import org.apache.flink.table.planner.utils.BatchTableTestUtil; import org.apache.flink.table.planner.utils.PlanKind; @@ -680,8 +680,8 @@ public abstract class JoinHintTestBase extends TableTestBase { util.tableEnv() .getConfig() .set( - RelNodeBlockPlanBuilder - .TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), + OptimizerConfigOptions + .TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true); // the build side in this view is left @@ -731,8 +731,8 @@ public abstract class JoinHintTestBase extends TableTestBase { util.tableEnv() .getConfig() .set( - RelNodeBlockPlanBuilder - .TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), + OptimizerConfigOptions + .TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true); // the build side in this view is left diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java index af6195174e4..752449a913c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java @@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; import org.apache.flink.table.api.config.AggregatePhaseStrategy; import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule; import org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; @@ -38,8 +37,7 @@ public class ExpandTestPrograms { .setupConfig( OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) .setupConfig( - IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), - false) + OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, false) .setupTableSource( SourceTestStep.newBuilder("MyTable") .addSchema("a int", "b bigint", "c varchar") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala index a388d8747cb..051fabca5e6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.batch.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api._ +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf import org.apache.flink.table.planner.utils.{TableFunc1, TableTestBase} import org.apache.flink.table.types.logical._ @@ -130,7 +130,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinks1(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c") util.tableEnv.createTemporaryView("table1", table1) @@ -151,9 +151,8 @@ class DagOptimizationTest extends TableTestBase { @Test def testMultiSinks2(): Unit = { val stmtSet = util.tableEnv.createStatementSet() - util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, - Boolean.box(true)) + util.tableEnv.getConfig + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true)) util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e) val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10") @@ -177,7 +176,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinks3(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e) @@ -202,7 +201,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinks4(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b FROM MyTable WHERE a <= 10") @@ -232,7 +231,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinks5(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) // test with non-deterministic udf util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf()) @@ -256,7 +255,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiLevelViews(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'") @@ -297,7 +296,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksWithUDTF(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTemporarySystemFunction("split", new TableFunc1) val sqlQuery1 = @@ -340,7 +339,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion1(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) val table = @@ -364,10 +363,10 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion2(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c) @@ -405,7 +404,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion3(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c) @@ -438,7 +437,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion4(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala index 873b10a0909..34d345808b6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.batch.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api._ +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.utils.TableTestBase import org.apache.flink.table.types.logical.{BigIntType, IntType} @@ -45,7 +45,7 @@ class LegacySinkTest extends TableTestBase { def testMultiSinks(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c") util.tableEnv.createTemporaryView("table1", table1) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala index 8a00d9be219..07888ed2c4b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.batch.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api._ -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.utils.TableTestBase import org.apache.flink.table.types.logical.{BigIntType, IntType} @@ -65,7 +65,7 @@ class TableSinkTest extends TableTestBase { |""".stripMargin) util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c") util.tableEnv.createTemporaryView("table1", table1) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala index 418258d0a33..a0597969eab 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.planner.plan.batch.sql -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.utils._ import org.junit.jupiter.api.{BeforeEach, Test} @@ -212,7 +212,7 @@ class TableSourceTest extends TableTestBase { @Test def testTableHintWithDigestReuseForLogicalTableScan(): Unit = { util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) util.tableEnv.executeSql(s""" |CREATE TABLE MySink ( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala index 375258529ea..bde6d7a35d6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala @@ -249,7 +249,7 @@ class ChangelogModeInferenceTest extends TableTestBase { @Test def testPropagateUpdateKindAmongRelNodeBlocks(): Unit = { util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) util.addTable(""" |create table sink1 ( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala index 1a8b2577706..b051bb60d36 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api._ +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf import org.apache.flink.table.planner.utils.{TableFunc1, TableTestBase} import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} @@ -116,7 +116,7 @@ class DagOptimizationTest extends TableTestBase { @Test def testSingleSinkSplitOnUnion(): Unit = { util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) val sqlQuery = "SELECT SUM(a) AS total_sum FROM " + @@ -130,7 +130,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinks1(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c") util.tableEnv.createTemporaryView("table1", table1) @@ -155,9 +155,8 @@ class DagOptimizationTest extends TableTestBase { @Test def testMultiSinks2(): Unit = { val stmtSet = util.tableEnv.createStatementSet() - util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, - Boolean.box(true)) + util.tableEnv.getConfig + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true)) util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e) val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10") @@ -184,9 +183,8 @@ class DagOptimizationTest extends TableTestBase { @Test def testMultiSinks3(): Unit = { val stmtSet = util.tableEnv.createStatementSet() - util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, - Boolean.box(true)) + util.tableEnv.getConfig + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true)) util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e) val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10") @@ -244,7 +242,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinks5(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) // test with non-deterministic udf util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf()) @@ -272,7 +270,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksWithUDTF(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTemporarySystemFunction("split", new TableFunc1) val sqlQuery1 = @@ -319,7 +317,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion1(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) val table = @@ -347,10 +345,10 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion2(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c) @@ -394,7 +392,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion3(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c) @@ -433,7 +431,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiSinksSplitOnUnion4(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c) @@ -547,7 +545,7 @@ class DagOptimizationTest extends TableTestBase { def testMultiLevelViews(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'") @@ -594,7 +592,7 @@ class DagOptimizationTest extends TableTestBase { def testSharedUnionNode(): Unit = { val stmtSet = util.tableEnv.createStatementSet() util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false)) val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala index 82547355d82..9992bb71a45 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api._ -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.utils.TableTestBase import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy} @@ -889,7 +889,7 @@ class RankTest extends TableTestBase { @Test def testUpdatableRankAfterIntermediateScan(): Unit = { util.tableEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) util.tableEnv.executeSql(""" |CREATE VIEW v1 AS diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala index 13c98427dca..54269c54454 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala @@ -51,7 +51,7 @@ class DistinctAggregateTest( Boolean.box(splitDistinctAggEnabled)) // disable incremental agg util.tableEnv.getConfig - .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(false)) + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(false)) } @TestTemplate diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala index 8293725de25..70f0170ab34 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala @@ -17,8 +17,7 @@ */ package org.apache.flink.table.planner.plan.stream.sql.agg -import org.apache.flink.table.api.config.AggregatePhaseStrategy -import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule +import org.apache.flink.table.api.config.{AggregatePhaseStrategy, OptimizerConfigOptions} import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.junit.jupiter.api.BeforeEach @@ -37,7 +36,7 @@ class IncrementalAggregateTest( super.before() // enable incremental agg util.tableEnv.getConfig - .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(true)) + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(true)) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala index bae6b751ec2..0230587401f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala @@ -17,8 +17,8 @@ */ package org.apache.flink.table.planner.runtime.batch.sql +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory -import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.{createTempFile, createTempFolder} import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row @@ -389,7 +389,7 @@ class TableSourceITCase extends BatchTestBase { @Test def testTableHintWithLogicalTableScanReuse(): Unit = { tEnv.getConfig.set( - RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, + OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) val resultPath = createTempFolder().getAbsolutePath tEnv.executeSql(s"""