This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d43b47c99 [FLINK-30989][table] Fix some options don't take effect in batch mode
b4d43b47c99 is described below

commit b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4
Author: fengli <ldliu...@163.com>
AuthorDate: Sun Feb 26 15:59:33 2023 +0800

    [FLINK-30989][table] Fix some options don't take effect in batch mode
    
    This closes #22024
---
 .../nodes/exec/batch/BatchExecHashAggregate.java   |  9 +++-
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   | 10 ++++
 .../plan/nodes/exec/batch/BatchExecSort.java       | 10 +++-
 .../plan/utils/SorMergeJoinOperatorUtil.java       | 16 ++++++
 .../planner/codegen/LongHashJoinGenerator.scala    |  5 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   | 17 ++++--
 .../codegen/agg/batch/HashAggCodeGenerator.scala   | 11 ++--
 .../codegen/LongAdaptiveHashJoinGeneratorTest.java |  6 +++
 .../agg/batch/HashAggCodeGeneratorTest.scala       |  7 ++-
 .../runtime/hashtable/BaseHybridHashTable.java     | 23 +++-----
 .../table/runtime/hashtable/BinaryHashTable.java   | 11 ++--
 .../table/runtime/hashtable/LongHashPartition.java |  4 +-
 .../runtime/hashtable/LongHybridHashTable.java     | 13 ++---
 .../runtime/operators/join/HashJoinOperator.java   | 13 ++++-
 .../operators/join/SortMergeJoinFunction.java      | 23 ++++++--
 .../sort/AbstractBinaryExternalMerger.java         | 10 ++--
 .../operators/sort/BinaryExternalMerger.java       |  4 +-
 .../operators/sort/BinaryExternalSorter.java       | 46 ++++++++--------
 .../operators/sort/BinaryInMemorySortBuffer.java   |  4 +-
 .../operators/sort/BinaryKVExternalMerger.java     |  4 +-
 .../operators/sort/BinaryKVInMemorySortBuffer.java |  3 +-
 .../operators/sort/BufferedKVExternalSorter.java   | 28 ++++------
 .../table/runtime/operators/sort/SortOperator.java | 21 +++++++-
 .../flink/table/runtime/util/FileChannelUtil.java  |  8 +--
 .../runtime/hashtable/BinaryHashTableTest.java     | 25 ++++-----
 .../table/runtime/hashtable/LongHashTableTest.java | 10 ++--
 .../runtime/operators/aggregate/HashAggTest.java   |  6 ---
 .../aggregate/SumHashAggTestOperator.java          | 16 +++---
 .../join/Int2HashJoinOperatorTestBase.java         | 23 ++++++++
 .../join/Int2SortMergeJoinOperatorTest.java        | 16 ++++++
 .../join/String2HashJoinOperatorTest.java          | 19 +++++++
 .../join/String2SortMergeJoinOperatorTest.java     | 16 ++++++
 .../operators/sort/BinaryExternalSorterTest.java   | 63 +++++++++++++++++++---
 .../sort/BufferedKVExternalSorterTest.java         |  8 ++-
 34 files changed, 367 insertions(+), 141 deletions(-)
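
The options affected are table.exec.sort.max-num-file-handles,
table.exec.spill-compression.enabled, table.exec.spill-compression.block-size
and table.exec.sort.async-merge.enabled: as the diff below shows, the runtime
operators previously read them from the task's job configuration (which does
not contain the planner's table options), and now receive them explicitly from
the planner. A minimal sketch of setting them on a batch TableEnvironment (the
class name and values below are illustrative, not part of the commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Hypothetical example class, not part of this patch.
    public class BatchSpillOptionsExample {
        public static void main(String[] args) {
            // Assumed setup: a batch-mode TableEnvironment.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Maximum fan-in for the external sort's merge phase.
            tEnv.getConfig().set("table.exec.sort.max-num-file-handles", "128");
            // Compress data spilled by the sort/hash-agg/hash-join operators.
            tEnv.getConfig().set("table.exec.spill-compression.enabled", "true");
            tEnv.getConfig().set("table.exec.spill-compression.block-size", "64 kb");
            // Merge spilled sort runs asynchronously while sorting continues.
            tEnv.getConfig().set("table.exec.sort.async-merge.enabled", "true");
        }
    }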

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
index 27d672706ed..49ea262626a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
@@ -139,7 +139,14 @@ public class BatchExecHashAggregate extends ExecNodeBase<RowData>
                             auxGrouping,
                             isMerge,
                             isFinal,
-                            supportAdaptiveLocalHashAgg);
+                            supportAdaptiveLocalHashAgg,
+                            config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                            config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                            (int)
+                                    config.get(
+                                                    ExecutionConfigOptions
+                                                            .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                            .getBytes());
         }
 
         return ExecNodeUtil.createOneInputTransformation(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
index d390292a901..ef178487eff 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
@@ -213,6 +213,12 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
                         condFunc,
                         1.0 * externalBufferMemory / managedMemory);
 
+        boolean compressionEnabled =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+        int compressionBlockSize =
+                (int)
+                        config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                .getBytes();
         if (LongHashJoinGenerator.support(hashJoinType, keyType, joinSpec.getFilterNulls())) {
             operator =
                     LongHashJoinGenerator.gen(
@@ -229,6 +235,8 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
                             reverseJoin,
                             condFunc,
                             leftIsBuild,
+                            compressionEnabled,
+                            compressionBlockSize,
                             sortMergeJoinFunction);
         } else {
             operator =
@@ -236,6 +244,8 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
                             HashJoinOperator.newHashJoinOperator(
                                     hashJoinType,
                                     leftIsBuild,
+                                    compressionEnabled,
+                                    compressionBlockSize,
                                     condFunc,
                                     reverseJoin,
                                     joinSpec.getFilterNulls(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
index a808c86d306..929d6ed60ba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
@@ -81,7 +81,15 @@ public class BatchExecSort extends ExecNodeBase<RowData>
         SortOperator operator =
                 new SortOperator(
                         codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
-                        codeGen.generateRecordComparator("BatchExecSortComparator"));
+                        codeGen.generateRecordComparator("BatchExecSortComparator"),
+                        config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                config.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
         long sortMemory =
                 config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
         return ExecNodeUtil.createOneInputTransformation(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
index 634ceb345dd..891aa185786 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.utils;
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
@@ -48,6 +49,17 @@ public class SorMergeJoinOperatorUtil {
             double externalBufferMemRatio) {
         int[] keyPositions = IntStream.range(0, leftKeys.length).toArray();
 
+        int maxNumFileHandles =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
+        boolean compressionEnabled =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+        int compressionBlockSize =
+                (int)
+                        config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                .getBytes();
+        boolean asyncMergeEnabled =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
+
         SortCodeGenerator leftSortGen =
                 SortUtil.newSortGen(config, classLoader, leftKeys, leftType);
         SortCodeGenerator rightSortGen =
@@ -57,6 +69,10 @@ public class SorMergeJoinOperatorUtil {
                 externalBufferMemRatio,
                 joinType,
                 leftIsSmaller,
+                maxNumFileHandles,
+                compressionEnabled,
+                compressionBlockSize,
+                asyncMergeEnabled,
                 condFunc,
                 ProjectionCodeGenerator.generateProjection(
                         new CodeGeneratorContext(config, classLoader),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index 9706a3aee55..4949b698206 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -111,6 +111,8 @@ object LongHashJoinGenerator {
       reverseJoinFunction: Boolean,
       condFunc: GeneratedJoinCondition,
       leftIsBuild: Boolean,
+      compressionEnabled: Boolean,
+      compressionBlockSize: Int,
      sortMergeJoinFunction: SortMergeJoinFunction): CodeGenOperatorFactory[RowData] = {
 
     val buildSer = new BinaryRowDataSerializer(buildType.getFieldCount)
@@ -185,7 +187,8 @@ object LongHashJoinGenerator {
         |public class $tableTerm extends ${classOf[LongHybridHashTable].getCanonicalName} {
         |
         |  public $tableTerm() {
-         |    super(getContainingTask().getJobConfiguration(), getContainingTask(),
+         |    super(getContainingTask(),
+         |      $compressionEnabled, $compressionBlockSize,
          |      $buildSerTerm, $probeSerTerm,
          |      getContainingTask().getEnvironment().getMemoryManager(),
          |      computeMemorySize(),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index ac9febcfe6e..55b0e58366d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -585,7 +585,10 @@ object HashAggCodeGenHelper {
       outputType: RowType,
       outputResultFromMap: String,
       sorterTerm: String,
-      retryAppend: String): (String, String) = {
+      retryAppend: String,
+      maxNumFileHandles: Int,
+      compressionEnabled: Boolean,
+      compressionBlockSize: Int): (String, String) = {
     val (grouping, auxGrouping) = groupingAndAuxGrouping
     if (isFinal) {
       val logMapSpilling =
@@ -603,7 +606,10 @@ object HashAggCodeGenHelper {
         groupKeyRowType,
         groupKeyTypesTerm,
         aggBufferTypesTerm,
-        sorterTerm)
+        sorterTerm,
+        maxNumFileHandles,
+        compressionEnabled,
+        compressionBlockSize)
       val fallbackToSortAggCode = genFallbackToSortAgg(
         ctx,
         builder,
@@ -711,7 +717,10 @@ object HashAggCodeGenHelper {
       groupKeyRowType: RowType,
       groupKeyTypesTerm: String,
       aggBufferTypesTerm: String,
-      sorterTerm: String): String = {
+      sorterTerm: String,
+      maxNumFileHandles: Int,
+      compressionEnabled: Boolean,
+      compressionBlockSize: Int): String = {
     val keyComputerTerm = CodeGenUtils.newName("keyComputer")
     val recordComparatorTerm = CodeGenUtils.newName("recordComparator")
     val prepareSorterCode =
@@ -727,7 +736,7 @@ object HashAggCodeGenHelper {
        |    new $binaryRowSerializerTypeTerm($aggBufferTypesTerm.length),
       |    $keyComputerTerm, $recordComparatorTerm,
       |    getContainingTask().getEnvironment().getMemoryManager().getPageSize(),
-       |    getContainingTask().getJobConfiguration()
+       |    $maxNumFileHandles, $compressionEnabled, $compressionBlockSize
        |  );
        """.stripMargin
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
index d13fd831522..e9ccb27aef0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
@@ -105,8 +105,10 @@ object HashAggCodeGenerator {
       auxGrouping: Array[Int],
       isMerge: Boolean,
       isFinal: Boolean,
-      supportAdaptiveLocalHashAgg: Boolean)
-      : GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = {
+      supportAdaptiveLocalHashAgg: Boolean,
+      maxNumFileHandles: Int,
+      compressionEnabled: Boolean,
+      compressionBlockSize: Int): GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = {
 
     val aggInfos = aggInfoList.aggInfos
     val functionIdentifiers = AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
@@ -238,7 +240,10 @@ object HashAggCodeGenerator {
       outputType,
       outputResultFromMap,
       sorterTerm,
-      retryAppend
+      retryAppend,
+      maxNumFileHandles,
+      compressionEnabled,
+      compressionBlockSize
     )
 
     HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal) sorterTerm else null)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
index b591cca3b43..85516450220 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.codegen;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
@@ -128,6 +129,11 @@ public class LongAdaptiveHashJoinGeneratorTest extends Int2AdaptiveHashJoinOpera
                 reverseJoinFunction,
                 condFunc,
                 buildLeft,
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue(),
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes(),
                 sortMergeJoinFunction);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
index 78e50368396..6b8df49ffda 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.codegen.agg.batch
 
 import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.LongAvgAggFunction
 import org.apache.flink.table.planner.plan.utils.{AggregateInfo, AggregateInfoList}
@@ -134,7 +135,11 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
       auxGrouping,
       isMerge,
       isFinal,
-      false)
+      false,
+      ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue(),
+      ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue,
+      ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue.getBytes.toInt
+    )
     (new CodeGenOperatorFactory[RowData](genOp), iType, oType)
   }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
index 72dcc28267d..cd71bd9142b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.hashtable;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
@@ -27,7 +26,6 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
 import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
@@ -127,7 +125,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool {
      */
     protected FileIOChannel.Enumerator currentEnumerator;
 
-    protected final boolean compressionEnable;
+    protected final boolean compressionEnabled;
     protected final BlockCompressionFactory compressionCodecFactory;
     protected final int compressionBlockSize;
 
@@ -135,27 +133,22 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool {
     protected transient long spillInBytes;
 
     public BaseHybridHashTable(
-            Configuration conf,
             Object owner,
+            boolean compressionEnabled,
+            int compressionBlockSize,
             MemoryManager memManager,
             long reservedMemorySize,
             IOManager ioManager,
             int avgRecordLen,
             long buildRowCount,
             boolean tryDistinctBuildRow) {
-
-        // TODO: read compression config from configuration
-        this.compressionEnable =
-                conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+        this.compressionEnabled = compressionEnabled;
         this.compressionCodecFactory =
-                this.compressionEnable
+                this.compressionEnabled
                         ? BlockCompressionFactory.createBlockCompressionFactory(
                                 BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
                         : null;
-        this.compressionBlockSize =
-                (int)
-                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
-                                .getBytes();
+        this.compressionBlockSize = compressionBlockSize;
         this.avgRecordLen = avgRecordLen;
         this.buildRowCount = buildRowCount;
         this.tryDistinctBuildRow = tryDistinctBuildRow;
@@ -496,7 +489,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool {
                         ioManager,
                         id,
                         retSegments,
-                        compressionEnable,
+                        compressionEnabled,
                         compressionCodecFactory,
                         compressionBlockSize,
                         segmentSize);
@@ -517,7 +510,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool {
                         ioManager,
                         id,
                         new LinkedBlockingQueue<>(),
-                        compressionEnable,
+                        compressionEnabled,
                         compressionCodecFactory,
                         compressionBlockSize,
                         segmentSize);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
index 26cfb0bcd9a..b21a00f803b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.runtime.hashtable;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
@@ -143,8 +142,9 @@ public class BinaryHashTable extends BaseHybridHashTable {
     BinaryRowData reuseBuildRow;
 
     public BinaryHashTable(
-            Configuration conf,
             Object owner,
+            boolean compressionEnabled,
+            int compressionBlockSize,
             AbstractRowDataSerializer buildSideSerializer,
             AbstractRowDataSerializer probeSideSerializer,
             Projection<RowData, BinaryRowData> buildSideProjection,
@@ -161,8 +161,9 @@ public class BinaryHashTable extends BaseHybridHashTable {
             boolean[] filterNulls,
             boolean tryDistinctBuildRow) {
         super(
-                conf,
                 owner,
+                compressionEnabled,
+                compressionBlockSize,
                 memManager,
                 reservedMemorySize,
                 ioManager,
@@ -447,7 +448,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
                         ioManager,
                         channelWithMeta,
                         new ArrayList<>(),
-                        compressionEnable,
+                        compressionEnabled,
                         compressionCodecFactory,
                         compressionBlockSize,
                         segmentSize);
@@ -614,7 +615,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
                             getNotNullNextBuffer(),
                             this,
                             this.segmentSize,
-                            compressionEnable,
+                            compressionEnabled,
                             compressionCodecFactory,
                             compressionBlockSize);
             area.setPartition(p);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index 306420272bf..a8d9ad9b02a 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -461,7 +461,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
                         ioAccess,
                         targetChannel,
                         bufferReturnQueue,
-                        longTable.compressionEnable(),
+                        longTable.compressionEnabled(),
                         longTable.compressionCodecFactory(),
                         longTable.compressionBlockSize(),
                         segmentSize);
@@ -487,7 +487,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
                     FileChannelUtil.createOutputView(
                             ioAccess,
                             probeChannelEnumerator.next(),
-                            longTable.compressionEnable(),
+                            longTable.compressionEnabled(),
                             longTable.compressionCodecFactory(),
                             longTable.compressionBlockSize(),
                             segmentSize);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
index 75081ea81b7..8ebbb585e47 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.hashtable;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
@@ -70,8 +69,9 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable {
     private LongHashPartition densePartition;
 
     public LongHybridHashTable(
-            Configuration conf,
             Object owner,
+            boolean compressionEnabled,
+            int compressionBlockSize,
             BinaryRowDataSerializer buildSideSerializer,
             BinaryRowDataSerializer probeSideSerializer,
             MemoryManager memManager,
@@ -80,8 +80,9 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable {
             int avgRecordLen,
             long buildRowCount) {
         super(
-                conf,
                 owner,
+                compressionEnabled,
+                compressionBlockSize,
                 memManager,
                 reservedMemorySize,
                 ioManager,
@@ -412,7 +413,7 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable {
                         ioManager,
                         channelWithMeta,
                         new ArrayList<>(),
-                        compressionEnable,
+                        compressionEnabled,
                         compressionCodecFactory,
                         compressionBlockSize,
                         segmentSize);
@@ -658,8 +659,8 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable {
         this.partitionsPendingForSMJ.clear();
     }
 
-    public boolean compressionEnable() {
-        return compressionEnable;
+    public boolean compressionEnabled() {
+        return compressionEnabled;
     }
 
     public BlockCompressionFactory compressionCodecFactory() {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
index 65183a07f03..abdaa22c386 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
@@ -124,8 +124,9 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData>
 
         this.table =
                 new BinaryHashTable(
-                        getContainingTask().getJobConfiguration(),
                         getContainingTask(),
+                        parameter.compressionEnabled,
+                        parameter.compressionBlockSize,
                         buildSerializer,
                         probeSerializer,
                         parameter.buildProjectionCode.newInstance(cl),
@@ -322,6 +323,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData>
     public static HashJoinOperator newHashJoinOperator(
             HashJoinType type,
             boolean leftIsBuild,
+            boolean compressionEnable,
+            int compressionBlockSize,
             GeneratedJoinCondition condFuncCode,
             boolean reverseJoinFunction,
             boolean[] filterNullKeys,
@@ -337,6 +340,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData>
                 new HashJoinParameter(
                         type,
                         leftIsBuild,
+                        compressionEnable,
+                        compressionBlockSize,
                         condFuncCode,
                         reverseJoinFunction,
                         filterNullKeys,
@@ -372,6 +377,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData>
     static class HashJoinParameter implements Serializable {
         HashJoinType type;
         boolean leftIsBuild;
+        boolean compressionEnabled;
+        int compressionBlockSize;
         GeneratedJoinCondition condFuncCode;
         boolean reverseJoinFunction;
         boolean[] filterNullKeys;
@@ -387,6 +394,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData>
         HashJoinParameter(
                 HashJoinType type,
                 boolean leftIsBuild,
+                boolean compressionEnabled,
+                int compressionBlockSize,
                 GeneratedJoinCondition condFuncCode,
                 boolean reverseJoinFunction,
                 boolean[] filterNullKeys,
@@ -400,6 +409,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData>
                 SortMergeJoinFunction sortMergeJoinFunction) {
             this.type = type;
             this.leftIsBuild = leftIsBuild;
+            this.compressionEnabled = compressionEnabled;
+            this.compressionBlockSize = compressionBlockSize;
             this.condFuncCode = condFuncCode;
             this.reverseJoinFunction = reverseJoinFunction;
             this.filterNullKeys = filterNullKeys;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
index 616f9ebb71c..aa5bed09320 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
@@ -59,6 +59,10 @@ public class SortMergeJoinFunction implements Serializable {
     private final FlinkJoinType type;
     private final boolean leftIsSmaller;
     private final boolean[] filterNulls;
+    private final int maxNumFileHandles;
+    private final boolean compressionEnabled;
+    private final int compressionBlockSize;
+    private final boolean asyncMergeEnabled;
 
     // generated code to cook
     private GeneratedJoinCondition condFuncCode;
@@ -93,6 +97,10 @@ public class SortMergeJoinFunction implements Serializable {
             double externalBufferMemRatio,
             FlinkJoinType type,
             boolean leftIsSmaller,
+            int maxNumFileHandles,
+            boolean compressionEnabled,
+            int compressionBlockSize,
+            boolean asyncMergeEnabled,
             GeneratedJoinCondition condFuncCode,
             GeneratedProjection projectionCode1,
             GeneratedProjection projectionCode2,
@@ -105,6 +113,10 @@ public class SortMergeJoinFunction implements Serializable {
         this.externalBufferMemRatio = externalBufferMemRatio;
         this.type = type;
         this.leftIsSmaller = leftIsSmaller;
+        this.maxNumFileHandles = maxNumFileHandles;
+        this.compressionEnabled = compressionEnabled;
+        this.compressionBlockSize = compressionBlockSize;
+        this.asyncMergeEnabled = asyncMergeEnabled;
         this.condFuncCode = condFuncCode;
         this.projectionCode1 = projectionCode1;
         this.projectionCode2 = projectionCode2;
@@ -159,7 +171,6 @@ public class SortMergeJoinFunction implements Serializable {
                             + ", please increase manage memory of task 
manager.");
         }
 
-        Configuration conf = taskContainer.getJobConfiguration();
         // sorter1
         this.sorter1 =
                 new BinaryExternalSorter(
@@ -171,7 +182,10 @@ public class SortMergeJoinFunction implements Serializable {
                         serializer1,
                         computer1.newInstance(cl),
                         comparator1.newInstance(cl),
-                        conf);
+                        maxNumFileHandles,
+                        compressionEnabled,
+                        compressionBlockSize,
+                        asyncMergeEnabled);
         this.sorter1.startThreads();
 
         // sorter2
@@ -185,7 +199,10 @@ public class SortMergeJoinFunction implements Serializable {
                         serializer2,
                         computer2.newInstance(cl),
                         comparator2.newInstance(cl),
-                        conf);
+                        maxNumFileHandles,
+                        compressionEnabled,
+                        compressionBlockSize,
+                        asyncMergeEnabled);
         this.sorter2.startThreads();
 
         keyComparator = genKeyComparator.newInstance(cl);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
index 33faa8bf72b..22185c278b5 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
@@ -51,7 +51,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
 
     private final int maxFanIn;
     private final SpillChannelManager channelManager;
-    private final boolean compressionEnable;
+    private final boolean compressionEnabled;
     private final BlockCompressionFactory compressionCodecFactory;
     private final int compressionBlockSize;
 
@@ -63,14 +63,14 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
             int pageSize,
             int maxFanIn,
             SpillChannelManager channelManager,
-            boolean compressionEnable,
+            boolean compressionEnabled,
             BlockCompressionFactory compressionCodecFactory,
             int compressionBlockSize) {
         this.ioManager = ioManager;
         this.pageSize = pageSize;
         this.maxFanIn = maxFanIn;
         this.channelManager = channelManager;
-        this.compressionEnable = compressionEnable;
+        this.compressionEnabled = compressionEnabled;
         this.compressionCodecFactory = compressionCodecFactory;
         this.compressionBlockSize = compressionBlockSize;
     }
@@ -102,7 +102,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
                             ioManager,
                             channel,
                             openChannels,
-                            compressionEnable,
+                            compressionEnabled,
                             compressionCodecFactory,
                             compressionBlockSize,
                             pageSize);
@@ -185,7 +185,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
                     FileChannelUtil.createOutputView(
                             ioManager,
                             mergedChannelID,
-                            compressionEnable,
+                            compressionEnabled,
                             compressionCodecFactory,
                             compressionBlockSize,
                             pageSize);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
index b68f936cab7..74467c5c761 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
@@ -46,7 +46,7 @@ public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow
             SpillChannelManager channelManager,
             BinaryRowDataSerializer serializer,
             RecordComparator comparator,
-            boolean compressionEnable,
+            boolean compressionEnabled,
             BlockCompressionFactory compressionCodecFactory,
             int compressionBlockSize) {
         super(
@@ -54,7 +54,7 @@ public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow
                 pageSize,
                 maxFanIn,
                 channelManager,
-                compressionEnable,
+                compressionEnabled,
                 compressionCodecFactory,
                 compressionBlockSize);
         this.serializer = serializer;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
index 149025ebd43..09da136143a 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.operators.sort;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.AlgorithmOptions;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.operators.sort.IndexedSorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.runtime.operators.sort.Sorter;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
@@ -152,11 +150,11 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
 
     private final int memorySegmentSize;
 
-    private final boolean compressionEnable;
+    private final boolean compressionEnabled;
     private final BlockCompressionFactory compressionCodecFactory;
     private final int compressionBlockSize;
 
-    private final boolean asyncMergeEnable;
+    private final boolean asyncMergeEnabled;
 
     // ------------------------------------------------------------------------
     //                         Constructor & Shutdown
@@ -178,7 +176,10 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
             BinaryRowDataSerializer serializer,
             NormalizedKeyComputer normalizedKeyComputer,
             RecordComparator comparator,
-            Configuration conf) {
+            int maxNumFileHandles,
+            boolean compressionEnabled,
+            int compressionBlockSize,
+            boolean asyncMergeEnabled) {
         this(
                 owner,
                 memoryManager,
@@ -188,7 +189,10 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
                 serializer,
                 normalizedKeyComputer,
                 comparator,
-                conf,
+                maxNumFileHandles,
+                compressionEnabled,
+                compressionBlockSize,
+                asyncMergeEnabled,
                 AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue());
     }
 
@@ -201,23 +205,19 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
             BinaryRowDataSerializer serializer,
             NormalizedKeyComputer normalizedKeyComputer,
             RecordComparator comparator,
-            Configuration conf,
+            int maxNumFileHandles,
+            boolean compressionEnabled,
+            int compressionBlockSize,
+            boolean asyncMergeEnabled,
             float startSpillingFraction) {
-        int maxNumFileHandles =
-                conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
-        this.compressionEnable =
-                conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+        this.compressionEnabled = compressionEnabled;
         this.compressionCodecFactory =
-                this.compressionEnable
+                this.compressionEnabled
                         ? BlockCompressionFactory.createBlockCompressionFactory(
                                 BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
                         : null;
-        this.compressionBlockSize =
-                (int)
-                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
-                                .getBytes();
-        asyncMergeEnable =
-                conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
+        this.compressionBlockSize = compressionBlockSize;
+        this.asyncMergeEnabled = asyncMergeEnabled;
 
         checkArgument(maxNumFileHandles >= 2);
         checkNotNull(ioManager);
@@ -256,8 +256,8 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
                         + "maxNumFileHandles({}), compressionEnable({}), compressionCodecFactory({}), compressionBlockSize({}).",
                 sortMemPages,
                 maxNumFileHandles,
-                compressionEnable,
-                compressionEnable ? compressionCodecFactory.getClass() : null,
+                compressionEnabled,
+                compressionEnabled ? compressionCodecFactory.getClass() : null,
                 compressionBlockSize);
 
         this.sortBuffers = new ArrayList<>();
@@ -314,7 +314,7 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
                         channelManager,
                         (BinaryRowDataSerializer) serializer.duplicate(),
                         comparator,
-                        compressionEnable,
+                        compressionEnabled,
                         compressionCodecFactory,
                         compressionBlockSize);
 
@@ -1010,7 +1010,7 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
                                 FileChannelUtil.createOutputView(
                                         ioManager,
                                         channel,
-                                        compressionEnable,
+                                        compressionEnabled,
                                         compressionCodecFactory,
                                         compressionBlockSize,
                                         memorySegmentSize);
@@ -1126,7 +1126,7 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> {
                 spillChannelIDs.add(channelID);
                 // if async merge is disabled, we will only do the final merge
                 // otherwise we wait for `maxFanIn` number of channels to begin a merge
-                if (!asyncMergeEnable || spillChannelIDs.size() < maxFanIn) {
+                if (!asyncMergeEnabled || spillChannelIDs.size() < maxFanIn) {
                     continue;
                 }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
index bdd009051af..ca6f65e45fb 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
@@ -40,7 +40,7 @@ public final class BinaryInMemorySortBuffer extends BinaryIndexedSortable {
 
     private static final int MIN_REQUIRED_BUFFERS = 3;
 
-    private AbstractRowDataSerializer<RowData> inputSerializer;
+    private final AbstractRowDataSerializer<RowData> inputSerializer;
     private final ArrayList<MemorySegment> recordBufferSegments;
     private final SimpleCollectingOutputView recordCollector;
     private final int totalNumBuffers;
@@ -185,7 +185,7 @@ public final class BinaryInMemorySortBuffer extends BinaryIndexedSortable {
      *
      * @return An iterator returning the records in their logical order.
      */
-    public final MutableObjectIterator<BinaryRowData> getIterator() {
+    public MutableObjectIterator<BinaryRowData> getIterator() {
         return new MutableObjectIterator<BinaryRowData>() {
             private final int size = size();
             private int current = 0;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
index ed87f8a5048..db98dfd77fe 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
@@ -48,7 +48,7 @@ public class BinaryKVExternalMerger
             BinaryRowDataSerializer keySerializer,
             BinaryRowDataSerializer valueSerializer,
             RecordComparator comparator,
-            boolean compressionEnable,
+            boolean compressionEnabled,
             BlockCompressionFactory compressionCodecFactory,
             int compressionBlockSize) {
         super(
@@ -56,7 +56,7 @@ public class BinaryKVExternalMerger
                 pageSize,
                 maxFanIn,
                 channelManager,
-                compressionEnable,
+                compressionEnabled,
                 compressionCodecFactory,
                 compressionBlockSize);
         this.keySerializer = keySerializer;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
index 4fdf6976d40..38f721e355f 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
@@ -65,8 +65,7 @@ public class BinaryKVInMemorySortBuffer extends BinaryIndexedSortable {
             BinaryRowDataSerializer valueSerializer,
             RecordComparator comparator,
             ArrayList<MemorySegment> recordBufferSegments,
-            MemorySegmentPool memorySegmentPool)
-            throws IOException {
+            MemorySegmentPool memorySegmentPool) {
         super(
                 normalizedKeyComputer,
                 keySerializer,
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
index 02b6aabb8dc..11c252408fe 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
@@ -18,7 +18,6 @@
 package org.apache.flink.table.runtime.operators.sort;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
@@ -26,7 +25,6 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.operators.sort.IndexedSorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
 import org.apache.flink.table.runtime.generated.RecordComparator;
@@ -73,14 +71,14 @@ public class BufferedKVExternalSorter {
     private final List<ChannelWithMeta> channelIDs = new ArrayList<>();
     private final SpillChannelManager channelManager;
 
-    private int pageSize;
+    private final int pageSize;
 
     // metric
     private long numSpillFiles;
     private long spillInBytes;
     private long spillInCompressedBytes;
 
-    private final boolean compressionEnable;
+    private final boolean compressionEnabled;
     private final BlockCompressionFactory compressionCodecFactory;
     private final int compressionBlockSize;
 
@@ -91,27 +89,23 @@ public class BufferedKVExternalSorter {
             NormalizedKeyComputer nKeyComputer,
             RecordComparator comparator,
             int pageSize,
-            Configuration conf)
-            throws IOException {
+            int maxNumFileHandles,
+            boolean compressionEnabled,
+            int compressionBlockSize) {
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
         this.nKeyComputer = nKeyComputer;
         this.comparator = comparator;
         this.pageSize = pageSize;
         this.sorter = new QuickSort();
-        this.maxNumFileHandles =
-                conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
-        this.compressionEnable =
-                conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+        this.maxNumFileHandles = maxNumFileHandles;
+        this.compressionEnabled = compressionEnabled;
         this.compressionCodecFactory =
-                this.compressionEnable
+                this.compressionEnabled
                         ? BlockCompressionFactory.createBlockCompressionFactory(
                                 BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
                         : null;
-        this.compressionBlockSize =
-                (int)
-                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
-                                .getBytes();
+        this.compressionBlockSize = compressionBlockSize;
         this.ioManager = ioManager;
         this.enumerator = this.ioManager.createChannelEnumerator();
         this.channelManager = new SpillChannelManager();
@@ -124,7 +118,7 @@ public class BufferedKVExternalSorter {
                         keySerializer,
                         valueSerializer,
                         comparator,
-                        compressionEnable,
+                        compressionEnabled,
                         compressionCodecFactory,
                         compressionBlockSize);
     }
@@ -176,7 +170,7 @@ public class BufferedKVExternalSorter {
                     FileChannelUtil.createOutputView(
                             ioManager,
                             channel,
-                            compressionEnable,
+                            compressionEnabled,
                             compressionCodecFactory,
                             compressionBlockSize,
                             pageSize);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
index c09db2cb073..927c5d97e15 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
@@ -47,14 +47,28 @@ public class SortOperator extends TableStreamOperator<RowData>
     private GeneratedNormalizedKeyComputer gComputer;
     private GeneratedRecordComparator gComparator;
 
+    private final int maxNumFileHandles;
+    private final boolean compressionEnabled;
+    private final int compressionBlockSize;
+    private final boolean asyncMergeEnabled;
+
     private transient BinaryExternalSorter sorter;
     private transient StreamRecordCollector<RowData> collector;
     private transient BinaryRowDataSerializer binarySerializer;
 
     public SortOperator(
-            GeneratedNormalizedKeyComputer gComputer, GeneratedRecordComparator gComparator) {
+            GeneratedNormalizedKeyComputer gComputer,
+            GeneratedRecordComparator gComparator,
+            int maxNumFileHandles,
+            boolean compressionEnabled,
+            int compressionBlockSize,
+            boolean asyncMergeEnabled) {
         this.gComputer = gComputer;
         this.gComparator = gComparator;
+        this.maxNumFileHandles = maxNumFileHandles;
+        this.compressionEnabled = compressionEnabled;
+        this.compressionBlockSize = compressionBlockSize;
+        this.asyncMergeEnabled = asyncMergeEnabled;
     }
 
     @Override
@@ -85,7 +99,10 @@ public class SortOperator extends TableStreamOperator<RowData>
                         binarySerializer,
                         computer,
                         comparator,
-                        getContainingTask().getJobConfiguration());
+                        maxNumFileHandles,
+                        compressionEnabled,
+                        compressionBlockSize,
+                        asyncMergeEnabled);
         this.sorter.startThreads();
 
         collector = new StreamRecordCollector<>(output);
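
    SortOperator gets the same treatment: the four sort/spill options become
    constructor arguments resolved at planning time instead of being read from
    the job configuration in open(). A sketch of what a planner-side caller
    could look like, assuming the generated computer and comparator are already
    at hand (the factory class below is hypothetical):

        import org.apache.flink.configuration.ReadableConfig;
        import org.apache.flink.table.api.config.ExecutionConfigOptions;
        import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
        import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
        import org.apache.flink.table.runtime.operators.sort.SortOperator;

        public final class SortOperatorFactorySketch {
            // Builds a SortOperator from options resolved against the table config,
            // so per-query settings take effect rather than cluster-wide defaults.
            public static SortOperator create(
                    ReadableConfig tableConf,
                    GeneratedNormalizedKeyComputer computer,
                    GeneratedRecordComparator comparator) {
                return new SortOperator(
                        computer,
                        comparator,
                        tableConf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
                        tableConf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
                        (int)
                                tableConf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
                                        .getBytes(),
                        tableConf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
            }
        }
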
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
index 9d8ee6d07b9..a8e0da893be 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
@@ -49,12 +49,12 @@ public class FileChannelUtil {
             IOManager ioManager,
             ChannelWithMeta channel,
             List<FileIOChannel> channels,
-            boolean compressionEnable,
+            boolean compressionEnabled,
             BlockCompressionFactory compressionCodecFactory,
             int compressionBlockSize,
             int segmentSize)
             throws IOException {
-        if (compressionEnable) {
+        if (compressionEnabled) {
             CompressedHeaderlessChannelReaderInputView in =
                     new CompressedHeaderlessChannelReaderInputView(
                             channel.getChannel(),
@@ -82,12 +82,12 @@ public class FileChannelUtil {
     public static AbstractChannelWriterOutputView createOutputView(
             IOManager ioManager,
             FileIOChannel.ID channel,
-            boolean compressionEnable,
+            boolean compressionEnabled,
             BlockCompressionFactory compressionCodecFactory,
             int compressionBlockSize,
             int segmentSize)
             throws IOException {
-        if (compressionEnable) {
+        if (compressionEnabled) {
             BufferFileWriter bufferWriter = ioManager.createBufferFileWriter(channel);
             return new CompressedHeaderlessChannelWriterOutputView(
                     bufferWriter, compressionCodecFactory, compressionBlockSize);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index b9868059497..0d4d6b2b748 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.hashtable;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -28,7 +27,6 @@ import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
@@ -53,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -66,7 +65,6 @@ public class BinaryHashTableTest {
     private BinaryRowDataSerializer probeSideSerializer;
 
     private boolean useCompress;
-    private Configuration conf;
 
     public BinaryHashTableTest(boolean useCompress) {
         this.useCompress = useCompress;
@@ -84,9 +82,6 @@ public class BinaryHashTableTest {
         this.probeSideSerializer = new BinaryRowDataSerializer(types.length);
 
         this.ioManager = new IOManagerAsync();
-
-        conf = new Configuration();
-        conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, useCompress);
     }
 
     @After
@@ -736,8 +731,9 @@ public class BinaryHashTableTest {
                 MemoryManagerBuilder.newBuilder().setMemorySize(96 * PAGE_SIZE).build();
         final BinaryHashTable table =
                 new BinaryHashTable(
-                        conf,
                         new Object(),
+                        useCompress,
+                        (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                         this.buildSideSerializer,
                         this.probeSideSerializer,
                         new MyProjection(),
@@ -836,8 +832,9 @@ public class BinaryHashTableTest {
 
         final BinaryHashTable table =
                 new BinaryHashTable(
-                        conf,
                         new Object(),
+                        useCompress,
+                        (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                         this.buildSideSerializer,
                         this.probeSideSerializer,
                         new MyProjection(),
@@ -901,8 +898,9 @@ public class BinaryHashTableTest {
         // allocate the memory for the HashTable
         final BinaryHashTable table =
                 new BinaryHashTable(
-                        conf,
                         new Object(),
+                        useCompress,
+                        (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                         this.buildSideSerializer,
                         this.probeSideSerializer,
                         new MyProjection(),
@@ -1006,8 +1004,9 @@ public class BinaryHashTableTest {
 
         final BinaryHashTable table =
                 new BinaryHashTable(
-                        conf,
                         new Object(),
+                        useCompress,
+                        (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                         buildSideSerializer,
                         probeSideSerializer,
                         new MyProjection(),
@@ -1071,8 +1070,9 @@ public class BinaryHashTableTest {
 
         final BinaryHashTable table =
                 new BinaryHashTable(
-                        conf,
                         new Object(),
+                        useCompress,
+                        (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                         buildSideSerializer,
                         probeSideSerializer,
                         new MyProjection(),
@@ -1169,8 +1169,9 @@ public class BinaryHashTableTest {
             long memory,
             IOManager ioManager) {
         return new BinaryHashTable(
-                conf,
                 new Object(),
+                useCompress,
+                (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                 buildSideSerializer,
                 probeSideSerializer,
                 buildSideProjection,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
index 8ec312ee392..660dcaf06ae 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
@@ -20,14 +20,12 @@ package org.apache.flink.table.runtime.hashtable;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
@@ -48,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.fail;
 
@@ -63,7 +62,6 @@ public class LongHashTableTest {
             MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
 
     private boolean useCompress;
-    private Configuration conf;
 
     public LongHashTableTest(boolean useCompress) {
         this.useCompress = useCompress;
@@ -80,17 +78,15 @@ public class LongHashTableTest {
         this.buildSideSerializer = new BinaryRowDataSerializer(types.length);
         this.probeSideSerializer = new BinaryRowDataSerializer(types.length);
         this.ioManager = new IOManagerAsync();
-
-        conf = new Configuration();
-        conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, useCompress);
     }
 
     private class MyHashTable extends LongHybridHashTable {
 
         public MyHashTable(long memorySize) {
             super(
-                    conf,
                     LongHashTableTest.this,
+                    useCompress,
+                    (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                     buildSideSerializer,
                     probeSideSerializer,
                     memManager,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
index a280e28eebb..fab38dfd62a 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.operators.aggregate;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -80,11 +79,6 @@ public class HashAggTest {
                         };
                     }
 
-                    @Override
-                    Configuration getConf() {
-                        return new Configuration();
-                    }
-
                     @Override
                     public IOManager getIOManager() {
                         return ioManager;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
index 88be5ef2f0b..d10b1fdb54d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
@@ -19,12 +19,12 @@
 package org.apache.flink.table.runtime.operators.aggregate;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -119,7 +119,15 @@ public class SumHashAggTestOperator extends AbstractStreamOperator<RowData>
                                     new IntNormalizedKeyComputer(),
                                     new IntRecordComparator(),
                                     getMemoryManager().getPageSize(),
-                                    getConf());
+                                    ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
+                                            .defaultValue(),
+                                    ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
+                                            .defaultValue(),
+                                    (int)
+                                            ExecutionConfigOptions
+                                                    .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                                    .defaultValue()
+                                                    .getBytes());
                 }
                 // sort and spill
                 sorter.sortAndSpill(
@@ -255,10 +263,6 @@ public class SumHashAggTestOperator extends AbstractStreamOperator<RowData>
         return getContainingTask().getEnvironment().getMemoryManager();
     }
 
-    Configuration getConf() {
-        return getContainingTask().getJobConfiguration();
-    }
-
     public IOManager getIOManager() {
         return getContainingTask().getEnvironment().getIOManager();
     }
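
    With getConf() gone, the test operators read defaults straight off the
    ConfigOption definitions, which is the idiom used throughout the test
    updates below. A standalone sketch of that idiom (the class name is
    illustrative, not part of this patch):

        import org.apache.flink.table.api.config.ExecutionConfigOptions;

        public class OptionDefaultsSketch {
            public static void main(String[] args) {
                // Defaults come from the ConfigOption itself; no Configuration needed.
                int maxNumFileHandles =
                        ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
                boolean compressionEnabled =
                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
                int compressionBlockSize =
                        (int)
                                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
                                        .defaultValue()
                                        .getBytes();
                System.out.println(maxNumFileHandles + " " + compressionEnabled
                        + " " + compressionBlockSize);
            }
        }
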
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
index 0988d272dfc..53a8dab724f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
@@ -148,6 +149,18 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
                 };
         boolean[] filterNulls = new boolean[] {true};
 
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnabled =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnabled =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
+
         SortMergeJoinFunction sortMergeJoinFunction;
         if (buildLeft) {
             sortMergeJoinFunction =
@@ -155,6 +168,10 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
                             0,
                             flinkJoinType,
                             buildLeft,
+                            maxNumFileHandles,
+                            compressionEnabled,
+                            compressionBlockSize,
+                            asyncMergeEnabled,
                             condFuncCode,
                             buildProjectionCode,
                             probeProjectionCode,
@@ -170,6 +187,10 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
                             0,
                             flinkJoinType,
                             buildLeft,
+                            maxNumFileHandles,
+                            compressionEnabled,
+                            compressionBlockSize,
+                            asyncMergeEnabled,
                             condFuncCode,
                             probeProjectionCode,
                             buildProjectionCode,
@@ -184,6 +205,8 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
         return HashJoinOperator.newHashJoinOperator(
                 hashJoinType,
                 buildLeft,
+                compressionEnabled,
+                compressionBlockSize,
                 condFuncCode,
                 reverseJoinFunction,
                 filterNulls,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
index 9deb124cb5c..73368b5470c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
@@ -215,10 +216,25 @@ public class Int2SortMergeJoinOperatorTest {
     }
 
     public static SortMergeJoinFunction getJoinFunction(FlinkJoinType type, boolean leftIsSmaller) {
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
         return new SortMergeJoinFunction(
                 0,
                 type,
                 leftIsSmaller,
+                maxNumFileHandles,
+                compressionEnable,
+                compressionBlockSize,
+                asyncMergeEnable,
                 new GeneratedJoinCondition("", "", new Object[0]) {
                     @Override
                     public JoinCondition newInstance(ClassLoader classLoader) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index ed6ff982eba..bb75ffd16e5 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -398,11 +399,27 @@ public class String2HashJoinOperatorTest implements Serializable {
                 };
         boolean[] filterNulls = new boolean[] {true};
 
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
+
         SortMergeJoinFunction sortMergeJoinFunction =
                 new SortMergeJoinFunction(
                         0,
                         flinkJoinType,
                         buildLeft,
+                        maxNumFileHandles,
+                        compressionEnable,
+                        compressionBlockSize,
+                        asyncMergeEnable,
                         condFuncCode,
                         probeProjectionCode,
                         buildProjectionCode,
@@ -416,6 +433,8 @@ public class String2HashJoinOperatorTest implements Serializable {
         return HashJoinOperator.newHashJoinOperator(
                 hashJoinType,
                 buildLeft,
+                compressionEnable,
+                compressionBlockSize,
                 condFuncCode,
                 reverseJoinFunction,
                 filterNulls,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index cb1ea8357ca..faab30bccd4 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
@@ -185,11 +186,26 @@ public class String2SortMergeJoinOperatorTest {
     }
 
     static StreamOperator newOperator(FlinkJoinType type, boolean leftIsSmaller) {
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
         SortMergeJoinFunction sortMergeJoinFunction =
                 new SortMergeJoinFunction(
                         0,
                         type,
                         leftIsSmaller,
+                        maxNumFileHandles,
+                        compressionEnable,
+                        compressionBlockSize,
+                        asyncMergeEnable,
                         new GeneratedJoinCondition("", "", new Object[0]) {
                             @Override
                             public JoinCondition newInstance(ClassLoader classLoader) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
index 052bf244b05..f88caa5696f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
@@ -134,7 +134,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         1f);
         sorter.startThreads();
         sorter.write(reader);
@@ -174,7 +181,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -215,7 +229,14 @@ public class BinaryExternalSorterTest {
                             }
                         },
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(new MockBinaryRowReader(size));
@@ -257,7 +278,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -305,7 +333,14 @@ public class BinaryExternalSorterTest {
                                 return -super.compare(o1, o2);
                             }
                         },
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -351,7 +386,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -389,7 +431,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
index 95f5bf922ab..484310610ad 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
@@ -115,7 +115,13 @@ public class BufferedKVExternalSorterTest {
                         computer,
                         comparator,
                         PAGE_SIZE,
-                        conf);
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes());
         TestMemorySegmentPool pool = new TestMemorySegmentPool(PAGE_SIZE);
         List<Integer> expected = new ArrayList<>();
         for (int i = 0; i < spillNumber; i++) {
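
    One readability note on the sorter tests: the same four-option extraction
    block now appears at every construction site. If that repetition grows, it
    could be collapsed into a small value holder; a hypothetical sketch (this
    helper is not part of the patch):

        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.table.api.config.ExecutionConfigOptions;

        // Bundles the sort/spill options that BinaryExternalSorter and friends
        // now take as separate constructor arguments.
        final class SorterOptions {
            final int maxNumFileHandles;
            final boolean compressionEnabled;
            final int compressionBlockSize;
            final boolean asyncMergeEnabled;

            SorterOptions(Configuration conf) {
                this.maxNumFileHandles =
                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
                this.compressionEnabled =
                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
                this.compressionBlockSize =
                        (int)
                                conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
                                        .getBytes();
                this.asyncMergeEnabled =
                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
            }
        }
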

