This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93d49ff6eb9f61cb2450d0b25732f4d8923b840d
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Jun 4 17:32:15 2024 +0800

    [FLINK-35473][table-api][table-planner] Move adaptive local hash agg option to ExecutionConfigOptions
    
    This closes #24889
---
 .../generated/execution_config_configuration.html  | 18 ++++++++
 .../table/api/config/ExecutionConfigOptions.java   | 48 ++++++++++++++++++++++
 .../codegen/agg/batch/HashAggCodeGenerator.scala   | 40 ++++++++++++++++--
 .../fusion/spec/HashAggFusionCodegenSpec.scala     |  2 +-
 .../batch/sql/OperatorFusionCodegenITCase.scala    |  2 +-
 .../runtime/batch/sql/agg/HashAggITCase.scala      |  5 +--
 6 files changed, 106 insertions(+), 9 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 1ce188e0729..078b434fade 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -88,6 +88,24 @@ By default no operator is disabled.</td>
             <td><p>Enum</p></td>
             <td>Determines whether CAST will operate following the legacy 
behaviour or the new one that introduces various fixes and improvements.<br 
/><br />Possible values:<ul><li>"ENABLED": CAST will operate following the 
legacy behaviour.</li><li>"DISABLED": CAST will operate following the new 
correct behaviour.</li></ul></td>
         </tr>
+        <tr>
+            
<td><h5>table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold</h5><br>
 <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">0.5</td>
+            <td>Double</td>
+            <td>The distinct value rate can be defined as the number of local aggregation results for the sampled data divided by the sampling threshold (see table.exec.local-hash-agg.adaptive.sampling-threshold). If the computed result is lower than the given configuration value, the remaining input records proceed to do local aggregation, otherwise the remaining input records are subjected to simple projection which calculation cost is less than local aggregation. The default value is 0.5.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.local-hash-agg.adaptive.enabled</h5><br> <span 
class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable adaptive local hash aggregation. Adaptive local hash aggregation is an optimization of local hash aggregation, which can adaptively determine whether to continue to do local hash aggregation according to the distinct value rate of sampling data. If distinct value rate bigger than defined threshold (see parameter: table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold), we will stop aggregating and just send the input data to the downstream after a simple projection. Otherwise, we will continue to do aggregation. Adaptive local hash aggregation only works in batch mode. Default value of this parameter is true.</td>
+        </tr>
+        <tr>
+            
<td><h5>table.exec.local-hash-agg.adaptive.sampling-threshold</h5><br> <span 
class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">500000</td>
+            <td>Long</td>
+            <td>If adaptive local hash aggregation is enabled, this value 
defines how many records will be used as sampled data to calculate distinct 
value rate (see parameter: 
table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold) for the local 
aggregate. The higher the sampling threshold, the more accurate the distinct 
value rate is. But as the sampling threshold increases, local aggregation is 
meaningless when the distinct values rate is low. The default value is 
500000.</td>
+        </tr>
         <tr>
             <td><h5>table.exec.mini-batch.allow-latency</h5><br> <span 
class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">0 ms</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index e211f9c4607..f6537579ab8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -315,6 +315,54 @@ public class ExecutionConfigOptions {
                     .withDescription(
                             "Sets the window elements buffer size limit used 
in group window agg operator.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED =
+            ConfigOptions.key("table.exec.local-hash-agg.adaptive.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enable adaptive local hash 
aggregation. Adaptive local hash "
+                                    + "aggregation is an optimization of local 
hash aggregation, which can adaptively "
+                                    + "determine whether to continue to do 
local hash aggregation according to the distinct "
+                                    + "value rate of sampling data. If 
distinct value rate bigger than defined threshold "
+                                    + "(see parameter: 
table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold), "
+                                    + "we will stop aggregating and just send 
the input data to the downstream after a simple "
+                                    + "projection. Otherwise, we will continue 
to do aggregation. Adaptive local hash aggregation "
+                                    + "only works in batch mode. Default value 
of this parameter is true.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Long> 
TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD =
+            
ConfigOptions.key("table.exec.local-hash-agg.adaptive.sampling-threshold")
+                    .longType()
+                    .defaultValue(500000L)
+                    .withDescription(
+                            "If adaptive local hash aggregation is enabled, 
this value defines how "
+                                    + "many records will be used as sampled 
data to calculate distinct value rate "
+                                    + "(see parameter: 
table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold) "
+                                    + "for the local aggregate. The higher the 
sampling threshold, the more accurate "
+                                    + "the distinct value rate is. But as the 
sampling threshold increases, local "
+                                    + "aggregation is meaningless when the 
distinct values rate is low. "
+                                    + "The default value is 500000.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Double>
+            TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD =
+                    ConfigOptions.key(
+                                    
"table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold")
+                            .doubleType()
+                            .defaultValue(0.5d)
+                            .withDescription(
+                                    "The distinct value rate can be defined as 
the number of local "
+                                            + "aggregation results for the 
sampled data divided by the sampling "
+                                            + "threshold (see "
+                                            + 
TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD
+                                                    .key()
+                                            + "). "
+                                            + "If the computed result is lower 
than the given configuration value, "
+                                            + "the remaining input records 
proceed to do local aggregation, otherwise "
+                                            + "the remaining input records are 
subjected to simple projection which "
+                                            + "calculation cost is less than 
local aggregation. The default value is 0.5.");
+
     // ------------------------------------------------------------------------
     //  Async Lookup Options
     // ------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
index 1a8368c9b89..7ebf2df3286 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.utils.JoinedRowData
@@ -45,7 +46,15 @@ import org.apache.calcite.tools.RelBuilder
  */
 object HashAggCodeGenerator {
 
-  // It is a experimental config, will may be removed later.
+  /**
+   * Whether to enable adaptive local hash aggregation.
+   * @deprecated
+   *   This configuration has been deprecated as part of FLIP-457 and will be 
removed in Flink 2.0.
+   *   Please use
+   *   
[[org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED]]
+   *   instead.
+   */
+  @Deprecated
   @Experimental
   val TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED: ConfigOption[JBoolean] =
     key("table.exec.local-hash-agg.adaptive.enabled")
@@ -63,6 +72,16 @@ object HashAggCodeGenerator {
            |only works in batch mode. Default value of this parameter is true.
            |""".stripMargin)
 
+  /**
+   * The value to define how many records as sample data when adaptive local 
hash aggregation is
+   * enabled.
+   * @deprecated
+   *   This configuration has been deprecated as part of FLIP-457 and will be 
removed in Flink 2.0.
+   *   Please use
+   *   
[[org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD]]
+   *   instead.
+   */
+  @Deprecated
   @Experimental
   val TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD: 
ConfigOption[JLong] =
     key("table.exec.local-hash-agg.adaptive.sampling-threshold")
@@ -79,6 +98,17 @@ object HashAggCodeGenerator {
            |The default value is 500000.
            |""".stripMargin)
 
+  /**
+   * The value to define the ratio between local aggregation result number and 
sampling threshold
+   * when adaptive local hash aggregation is enabled.
+   *
+   * @deprecated
+   *   This configuration has been deprecated as part of FLIP-457 and will be 
removed in Flink 2.0.
+   *   Please use
+   *   
[[org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD]]
+   *   instead.
+   */
+  @Deprecated
   @Experimental
   val TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD: 
ConfigOption[JDouble] =
     key("table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold")
@@ -276,7 +306,7 @@ object HashAggCodeGenerator {
       // from these conditions we know that it must be a distinct operation
       if (
         !isFinal &&
-        ctx.tableConfig.get(TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED) &&
+        
ctx.tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED)
 &&
         supportAdaptiveLocalHashAgg
       ) {
         val adaptiveDistinctCountTerm = CodeGenUtils.newName(ctx, 
"distinctCount")
@@ -285,9 +315,11 @@ object HashAggCodeGenerator {
         ctx.addReusableMember(s"private transient long $adaptiveTotalCountTerm 
= 0;")
 
         val samplingThreshold =
-          
ctx.tableConfig.get(TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD)
+          ctx.tableConfig.get(
+            
ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD)
         val distinctValueRateThreshold =
-          
ctx.tableConfig.get(TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD)
+          ctx.tableConfig.get(
+            
ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD)
 
         (
           s"$adaptiveDistinctCountTerm++;",
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
index 52b4602cd8f..109ea1e6191 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.plan.fusion.spec
 
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.{TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD,
 TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED, 
TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.utils.JoinedRowData
@@ -26,7 +27,6 @@ import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{getReuseRowFieldExpr
 import 
org.apache.flink.table.planner.codegen.ProjectionCodeGenerator.genAdaptiveLocalHashAggValueProjectionExpr
 import org.apache.flink.table.planner.codegen.agg.batch.{AggCodeGenHelper, 
HashAggCodeGenHelper}
 import 
org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping,
 genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, 
genGetValueFromFlatAggregateBuffer, genInitFlatAggregateBuffer}
-import 
org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator.{TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_DISTINCT_VALUE_RATE_THRESHOLD,
 TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED, 
TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD}
 import 
org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenHelper.{buildAggregateAggBuffMapping,
 genAggregate, genCreateFallbackSorter, genHashAggValueExpr, 
genRetryAppendToMap, genReusableEmptyAggBuffer, prepareFallbackSorter}
 import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, 
OpFusionContext}
 import 
org.apache.flink.table.planner.plan.fusion.FusionCodegenUtil.{constructDoConsumeCode,
 constructDoConsumeFunction, evaluateVariables}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
index cbe21854c49..dfd3cf53981 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
@@ -339,7 +339,7 @@ class OperatorFusionCodegenITCase extends BatchTestBase {
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
         AggregatePhaseStrategy.TWO_PHASE)
     tEnv.getConfig.set(
-      
HashAggCodeGenerator.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD,
+      
ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD,
       Long.box(1L))
     checkOpFusionCodegenResult(
       """
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
index 7b951041e1c..e1424f7ccf5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.runtime.batch.sql.agg
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.config.{AggregatePhaseStrategy, 
ExecutionConfigOptions, OptimizerConfigOptions}
-import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData.{data1, 
nullablesOfData1, type1}
@@ -55,10 +54,10 @@ class HashAggITCase extends 
AggregateITCaseBase("HashAggregate") {
           OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
           AggregatePhaseStrategy.TWO_PHASE)
       tEnv.getConfig.set(
-        HashAggCodeGenerator.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED,
+        ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_ENABLED,
         Boolean.box(true))
       tEnv.getConfig.set(
-        
HashAggCodeGenerator.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD,
+        
ExecutionConfigOptions.TABLE_EXEC_LOCAL_HASH_AGG_ADAPTIVE_SAMPLING_THRESHOLD,
         Long.box(3L))
     }
   }

Reply via email to