This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
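Across the patch below, the fix follows a single pattern: the batch ExecNodes resolve the sort/spill options from ExecutionConfigOptions at planning time and hand them to the runtime operators as plain constructor arguments, instead of the operators reading getContainingTask().getJobConfiguration() at runtime (which is why the options did not take effect in batch mode). A minimal sketch of that resolution step, assuming a hypothetical helper class that is illustrative only; the option keys and the getBytes() cast are taken verbatim from the diff:

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

// Hypothetical helper, not part of the commit: shows how the planner-side code
// now reads the options once and passes them on as constructor arguments.
final class SpillOptions {
    final int maxNumFileHandles;
    final boolean compressionEnabled;
    final int compressionBlockSize;
    final boolean asyncMergeEnabled;

    SpillOptions(ReadableConfig config) {
        this.maxNumFileHandles =
                config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
        this.compressionEnabled =
                config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
        // The block-size option is a MemorySize; the patch casts its byte count to int.
        this.compressionBlockSize =
                (int)
                        config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
                                .getBytes();
        this.asyncMergeEnabled =
                config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
    }
}

In the patch these values reach SortOperator, BinaryExternalSorter, BufferedKVExternalSorter, BinaryHashTable, LongHybridHashTable, and SortMergeJoinFunction through their constructors, which is what makes the table options effective in batch jobs.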
The following commit(s) were added to refs/heads/master by this push:
     new b4d43b47c99  [FLINK-30989][table] Fix some options don't take effect in batch mode
b4d43b47c99 is described below

commit b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4
Author: fengli <ldliu...@163.com>
AuthorDate: Sun Feb 26 15:59:33 2023 +0800

    [FLINK-30989][table] Fix some options don't take effect in batch mode

    This closes #22024
---
 .../nodes/exec/batch/BatchExecHashAggregate.java | 9 +++-
 .../plan/nodes/exec/batch/BatchExecHashJoin.java | 10 ++++
 .../plan/nodes/exec/batch/BatchExecSort.java | 10 +++-
 .../plan/utils/SorMergeJoinOperatorUtil.java | 16 ++++++
 .../planner/codegen/LongHashJoinGenerator.scala | 5 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala | 17 ++++--
 .../codegen/agg/batch/HashAggCodeGenerator.scala | 11 ++--
 .../codegen/LongAdaptiveHashJoinGeneratorTest.java | 6 +++
 .../agg/batch/HashAggCodeGeneratorTest.scala | 7 ++-
 .../runtime/hashtable/BaseHybridHashTable.java | 23 +++-----
 .../table/runtime/hashtable/BinaryHashTable.java | 11 ++--
 .../table/runtime/hashtable/LongHashPartition.java | 4 +-
 .../runtime/hashtable/LongHybridHashTable.java | 13 ++---
 .../runtime/operators/join/HashJoinOperator.java | 13 ++++-
 .../operators/join/SortMergeJoinFunction.java | 23 ++++++--
 .../sort/AbstractBinaryExternalMerger.java | 10 ++--
 .../operators/sort/BinaryExternalMerger.java | 4 +-
 .../operators/sort/BinaryExternalSorter.java | 46 ++++++++--------
 .../operators/sort/BinaryInMemorySortBuffer.java | 4 +-
 .../operators/sort/BinaryKVExternalMerger.java | 4 +-
 .../operators/sort/BinaryKVInMemorySortBuffer.java | 3 +-
 .../operators/sort/BufferedKVExternalSorter.java | 28 ++++------
 .../table/runtime/operators/sort/SortOperator.java | 21 +++++++-
 .../flink/table/runtime/util/FileChannelUtil.java | 8 +--
 .../runtime/hashtable/BinaryHashTableTest.java | 25 ++++-----
 .../table/runtime/hashtable/LongHashTableTest.java | 10 ++--
 .../runtime/operators/aggregate/HashAggTest.java | 6 ---
 .../aggregate/SumHashAggTestOperator.java | 16 +++---
 .../join/Int2HashJoinOperatorTestBase.java | 23 ++++++++
 .../join/Int2SortMergeJoinOperatorTest.java | 16 ++++++
 .../join/String2HashJoinOperatorTest.java | 19 +++++++
 .../join/String2SortMergeJoinOperatorTest.java | 16 ++++++
 .../operators/sort/BinaryExternalSorterTest.java | 63 +++++++++++++++++++---
 .../sort/BufferedKVExternalSorterTest.java | 8 ++-
 34 files changed, 367 insertions(+), 141 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
index 27d672706ed..49ea262626a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
@@ -139,7 +139,14 @@ public class BatchExecHashAggregate extends ExecNodeBase<RowData>
                             auxGrouping,
                             isMerge,
                             isFinal,
-                            supportAdaptiveLocalHashAgg);
+                            supportAdaptiveLocalHashAgg,
+                            config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                            config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                            (int)
+                                    config.get(
+                                                    ExecutionConfigOptions
+                                                            .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                            .getBytes());
         }

         return ExecNodeUtil.createOneInputTransformation(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java index d390292a901..ef178487eff 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java @@ -213,6 +213,12 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData> condFunc, 1.0 * externalBufferMemory / managedMemory); + boolean compressionEnabled = + config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED); + int compressionBlockSize = + (int) + config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE) + .getBytes(); if (LongHashJoinGenerator.support(hashJoinType, keyType, joinSpec.getFilterNulls())) { operator = LongHashJoinGenerator.gen( @@ -229,6 +235,8 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData> reverseJoin, condFunc, leftIsBuild, + compressionEnabled, + compressionBlockSize, sortMergeJoinFunction); } else { operator = @@ -236,6 +244,8 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData> HashJoinOperator.newHashJoinOperator( hashJoinType, leftIsBuild, + compressionEnabled, + compressionBlockSize, condFunc, reverseJoin, joinSpec.getFilterNulls(), diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java index a808c86d306..929d6ed60ba 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java @@ -81,7 +81,15 @@ public class BatchExecSort extends ExecNodeBase<RowData> SortOperator operator = new SortOperator( codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"), - codeGen.generateRecordComparator("BatchExecSortComparator")); + codeGen.generateRecordComparator("BatchExecSortComparator"), + config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES), + config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED), + (int) + config.get( + ExecutionConfigOptions + .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE) + .getBytes(), + config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)); long sortMemory = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes(); return ExecNodeUtil.createOneInputTransformation( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java index 634ceb345dd..891aa185786 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.utils; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; 
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator; import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; @@ -48,6 +49,17 @@ public class SorMergeJoinOperatorUtil { double externalBufferMemRatio) { int[] keyPositions = IntStream.range(0, leftKeys.length).toArray(); + int maxNumFileHandles = + config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES); + boolean compressionEnabled = + config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED); + int compressionBlockSize = + (int) + config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE) + .getBytes(); + boolean asyncMergeEnabled = + config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED); + SortCodeGenerator leftSortGen = SortUtil.newSortGen(config, classLoader, leftKeys, leftType); SortCodeGenerator rightSortGen = @@ -57,6 +69,10 @@ public class SorMergeJoinOperatorUtil { externalBufferMemRatio, joinType, leftIsSmaller, + maxNumFileHandles, + compressionEnabled, + compressionBlockSize, + asyncMergeEnabled, condFunc, ProjectionCodeGenerator.generateProjection( new CodeGeneratorContext(config, classLoader), diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala index 9706a3aee55..4949b698206 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala @@ -111,6 +111,8 @@ object LongHashJoinGenerator { reverseJoinFunction: Boolean, condFunc: GeneratedJoinCondition, leftIsBuild: Boolean, + compressionEnabled: Boolean, + compressionBlockSize: Int, sortMergeJoinFunction: SortMergeJoinFunction): CodeGenOperatorFactory[RowData] = { val buildSer = new BinaryRowDataSerializer(buildType.getFieldCount) @@ -185,7 +187,8 @@ object LongHashJoinGenerator { |public class $tableTerm extends ${classOf[LongHybridHashTable].getCanonicalName} { | | public $tableTerm() { - | super(getContainingTask().getJobConfiguration(), getContainingTask(), + | super(getContainingTask(), + | $compressionEnabled, $compressionBlockSize, | $buildSerTerm, $probeSerTerm, | getContainingTask().getEnvironment().getMemoryManager(), | computeMemorySize(), diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala index ac9febcfe6e..55b0e58366d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala @@ -585,7 +585,10 @@ object HashAggCodeGenHelper { outputType: RowType, outputResultFromMap: String, sorterTerm: String, - retryAppend: String): (String, String) = { + retryAppend: String, + maxNumFileHandles: Int, + compressionEnabled: Boolean, + compressionBlockSize: Int): (String, String) = { val (grouping, auxGrouping) = groupingAndAuxGrouping if (isFinal) { val logMapSpilling = @@ -603,7 +606,10 @@ object HashAggCodeGenHelper { groupKeyRowType, groupKeyTypesTerm, aggBufferTypesTerm, - sorterTerm) + sorterTerm, + 
maxNumFileHandles, + compressionEnabled, + compressionBlockSize) val fallbackToSortAggCode = genFallbackToSortAgg( ctx, builder, @@ -711,7 +717,10 @@ object HashAggCodeGenHelper { groupKeyRowType: RowType, groupKeyTypesTerm: String, aggBufferTypesTerm: String, - sorterTerm: String): String = { + sorterTerm: String, + maxNumFileHandles: Int, + compressionEnabled: Boolean, + compressionBlockSize: Int): String = { val keyComputerTerm = CodeGenUtils.newName("keyComputer") val recordComparatorTerm = CodeGenUtils.newName("recordComparator") val prepareSorterCode = @@ -727,7 +736,7 @@ object HashAggCodeGenHelper { | new $binaryRowSerializerTypeTerm($aggBufferTypesTerm.length), | $keyComputerTerm, $recordComparatorTerm, | getContainingTask().getEnvironment().getMemoryManager().getPageSize(), - | getContainingTask().getJobConfiguration() + | $maxNumFileHandles, $compressionEnabled, $compressionBlockSize | ); """.stripMargin } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala index d13fd831522..e9ccb27aef0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala @@ -105,8 +105,10 @@ object HashAggCodeGenerator { auxGrouping: Array[Int], isMerge: Boolean, isFinal: Boolean, - supportAdaptiveLocalHashAgg: Boolean) - : GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = { + supportAdaptiveLocalHashAgg: Boolean, + maxNumFileHandles: Int, + compressionEnabled: Boolean, + compressionBlockSize: Int): GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = { val aggInfos = aggInfoList.aggInfos val functionIdentifiers = AggCodeGenHelper.getFunctionIdentifiers(aggInfos) @@ -238,7 +240,10 @@ object HashAggCodeGenerator { outputType, outputResultFromMap, sorterTerm, - retryAppend + retryAppend, + maxNumFileHandles, + compressionEnabled, + compressionBlockSize ) HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal) sorterTerm else null) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java index b591cca3b43..85516450220 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.codegen; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; @@ -128,6 +129,11 @@ public class LongAdaptiveHashJoinGeneratorTest extends Int2AdaptiveHashJoinOpera reverseJoinFunction, condFunc, buildLeft, + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue(), + (int) + 
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE + .defaultValue() + .getBytes(), sortMergeJoinFunction); } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala index 78e50368396..6b8df49ffda 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.codegen.agg.batch import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.LongAvgAggFunction import org.apache.flink.table.planner.plan.utils.{AggregateInfo, AggregateInfoList} @@ -134,7 +135,11 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase { auxGrouping, isMerge, isFinal, - false) + false, + ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue(), + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue, + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue.getBytes.toInt + ) (new CodeGenOperatorFactory[RowData](genOp), iType, oType) } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java index 72dcc28267d..cd71bd9142b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java @@ -18,7 +18,6 @@ package org.apache.flink.table.runtime.hashtable; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.compression.BlockCompressionFactory; import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; @@ -27,7 +26,6 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.runtime.util.FileChannelUtil; import org.apache.flink.table.runtime.util.LazyMemorySegmentPool; import org.apache.flink.table.runtime.util.MemorySegmentPool; @@ -127,7 +125,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool { */ protected FileIOChannel.Enumerator currentEnumerator; - protected final boolean compressionEnable; + protected final boolean compressionEnabled; protected final BlockCompressionFactory compressionCodecFactory; protected final int compressionBlockSize; @@ -135,27 +133,22 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool { protected transient long spillInBytes; public BaseHybridHashTable( - Configuration conf, Object owner, + boolean compressionEnabled, + int compressionBlockSize, MemoryManager memManager, long 
reservedMemorySize, IOManager ioManager, int avgRecordLen, long buildRowCount, boolean tryDistinctBuildRow) { - - // TODO: read compression config from configuration - this.compressionEnable = - conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED); + this.compressionEnabled = compressionEnabled; this.compressionCodecFactory = - this.compressionEnable + this.compressionEnabled ? BlockCompressionFactory.createBlockCompressionFactory( BlockCompressionFactory.CompressionFactoryName.LZ4.toString()) : null; - this.compressionBlockSize = - (int) - conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE) - .getBytes(); + this.compressionBlockSize = compressionBlockSize; this.avgRecordLen = avgRecordLen; this.buildRowCount = buildRowCount; this.tryDistinctBuildRow = tryDistinctBuildRow; @@ -496,7 +489,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool { ioManager, id, retSegments, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, segmentSize); @@ -517,7 +510,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool { ioManager, id, new LinkedBlockingQueue<>(), - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, segmentSize); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java index 26cfb0bcd9a..b21a00f803b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.hashtable; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView; @@ -143,8 +142,9 @@ public class BinaryHashTable extends BaseHybridHashTable { BinaryRowData reuseBuildRow; public BinaryHashTable( - Configuration conf, Object owner, + boolean compressionEnabled, + int compressionBlockSize, AbstractRowDataSerializer buildSideSerializer, AbstractRowDataSerializer probeSideSerializer, Projection<RowData, BinaryRowData> buildSideProjection, @@ -161,8 +161,9 @@ public class BinaryHashTable extends BaseHybridHashTable { boolean[] filterNulls, boolean tryDistinctBuildRow) { super( - conf, owner, + compressionEnabled, + compressionBlockSize, memManager, reservedMemorySize, ioManager, @@ -447,7 +448,7 @@ public class BinaryHashTable extends BaseHybridHashTable { ioManager, channelWithMeta, new ArrayList<>(), - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, segmentSize); @@ -614,7 +615,7 @@ public class BinaryHashTable extends BaseHybridHashTable { getNotNullNextBuffer(), this, this.segmentSize, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize); area.setPartition(p); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java index 306420272bf..a8d9ad9b02a 100644 --- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java @@ -461,7 +461,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl ioAccess, targetChannel, bufferReturnQueue, - longTable.compressionEnable(), + longTable.compressionEnabled(), longTable.compressionCodecFactory(), longTable.compressionBlockSize(), segmentSize); @@ -487,7 +487,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl FileChannelUtil.createOutputView( ioAccess, probeChannelEnumerator.next(), - longTable.compressionEnable(), + longTable.compressionEnabled(), longTable.compressionCodecFactory(), longTable.compressionBlockSize(), segmentSize); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java index 75081ea81b7..8ebbb585e47 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java @@ -18,7 +18,6 @@ package org.apache.flink.table.runtime.hashtable; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.compression.BlockCompressionFactory; import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; @@ -70,8 +69,9 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable { private LongHashPartition densePartition; public LongHybridHashTable( - Configuration conf, Object owner, + boolean compressionEnabled, + int compressionBlockSize, BinaryRowDataSerializer buildSideSerializer, BinaryRowDataSerializer probeSideSerializer, MemoryManager memManager, @@ -80,8 +80,9 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable { int avgRecordLen, long buildRowCount) { super( - conf, owner, + compressionEnabled, + compressionBlockSize, memManager, reservedMemorySize, ioManager, @@ -412,7 +413,7 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable { ioManager, channelWithMeta, new ArrayList<>(), - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, segmentSize); @@ -658,8 +659,8 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable { this.partitionsPendingForSMJ.clear(); } - public boolean compressionEnable() { - return compressionEnable; + public boolean compressionEnabled() { + return compressionEnabled; } public BlockCompressionFactory compressionCodecFactory() { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java index 65183a07f03..abdaa22c386 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java @@ -124,8 +124,9 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData> this.table = new BinaryHashTable( - 
getContainingTask().getJobConfiguration(), getContainingTask(), + parameter.compressionEnabled, + parameter.compressionBlockSize, buildSerializer, probeSerializer, parameter.buildProjectionCode.newInstance(cl), @@ -322,6 +323,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData> public static HashJoinOperator newHashJoinOperator( HashJoinType type, boolean leftIsBuild, + boolean compressionEnable, + int compressionBlockSize, GeneratedJoinCondition condFuncCode, boolean reverseJoinFunction, boolean[] filterNullKeys, @@ -337,6 +340,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData> new HashJoinParameter( type, leftIsBuild, + compressionEnable, + compressionBlockSize, condFuncCode, reverseJoinFunction, filterNullKeys, @@ -372,6 +377,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData> static class HashJoinParameter implements Serializable { HashJoinType type; boolean leftIsBuild; + boolean compressionEnabled; + int compressionBlockSize; GeneratedJoinCondition condFuncCode; boolean reverseJoinFunction; boolean[] filterNullKeys; @@ -387,6 +394,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData> HashJoinParameter( HashJoinType type, boolean leftIsBuild, + boolean compressionEnabled, + int compressionBlockSize, GeneratedJoinCondition condFuncCode, boolean reverseJoinFunction, boolean[] filterNullKeys, @@ -400,6 +409,8 @@ public abstract class HashJoinOperator extends TableStreamOperator<RowData> SortMergeJoinFunction sortMergeJoinFunction) { this.type = type; this.leftIsBuild = leftIsBuild; + this.compressionEnabled = compressionEnabled; + this.compressionBlockSize = compressionBlockSize; this.condFuncCode = condFuncCode; this.reverseJoinFunction = reverseJoinFunction; this.filterNullKeys = filterNullKeys; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java index 616f9ebb71c..aa5bed09320 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java @@ -59,6 +59,10 @@ public class SortMergeJoinFunction implements Serializable { private final FlinkJoinType type; private final boolean leftIsSmaller; private final boolean[] filterNulls; + private final int maxNumFileHandles; + private final boolean compressionEnabled; + private final int compressionBlockSize; + private final boolean asyncMergeEnabled; // generated code to cook private GeneratedJoinCondition condFuncCode; @@ -93,6 +97,10 @@ public class SortMergeJoinFunction implements Serializable { double externalBufferMemRatio, FlinkJoinType type, boolean leftIsSmaller, + int maxNumFileHandles, + boolean compressionEnabled, + int compressionBlockSize, + boolean asyncMergeEnabled, GeneratedJoinCondition condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, @@ -105,6 +113,10 @@ public class SortMergeJoinFunction implements Serializable { this.externalBufferMemRatio = externalBufferMemRatio; this.type = type; this.leftIsSmaller = leftIsSmaller; + this.maxNumFileHandles = maxNumFileHandles; + this.compressionEnabled = compressionEnabled; + this.compressionBlockSize = compressionBlockSize; + this.asyncMergeEnabled = 
asyncMergeEnabled; this.condFuncCode = condFuncCode; this.projectionCode1 = projectionCode1; this.projectionCode2 = projectionCode2; @@ -159,7 +171,6 @@ public class SortMergeJoinFunction implements Serializable { + ", please increase manage memory of task manager."); } - Configuration conf = taskContainer.getJobConfiguration(); // sorter1 this.sorter1 = new BinaryExternalSorter( @@ -171,7 +182,10 @@ public class SortMergeJoinFunction implements Serializable { serializer1, computer1.newInstance(cl), comparator1.newInstance(cl), - conf); + maxNumFileHandles, + compressionEnabled, + compressionBlockSize, + asyncMergeEnabled); this.sorter1.startThreads(); // sorter2 @@ -185,7 +199,10 @@ public class SortMergeJoinFunction implements Serializable { serializer2, computer2.newInstance(cl), comparator2.newInstance(cl), - conf); + maxNumFileHandles, + compressionEnabled, + compressionBlockSize, + asyncMergeEnabled); this.sorter2.startThreads(); keyComparator = genKeyComparator.newInstance(cl); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java index 33faa8bf72b..22185c278b5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java @@ -51,7 +51,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable { private final int maxFanIn; private final SpillChannelManager channelManager; - private final boolean compressionEnable; + private final boolean compressionEnabled; private final BlockCompressionFactory compressionCodecFactory; private final int compressionBlockSize; @@ -63,14 +63,14 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable { int pageSize, int maxFanIn, SpillChannelManager channelManager, - boolean compressionEnable, + boolean compressionEnabled, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize) { this.ioManager = ioManager; this.pageSize = pageSize; this.maxFanIn = maxFanIn; this.channelManager = channelManager; - this.compressionEnable = compressionEnable; + this.compressionEnabled = compressionEnabled; this.compressionCodecFactory = compressionCodecFactory; this.compressionBlockSize = compressionBlockSize; } @@ -102,7 +102,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable { ioManager, channel, openChannels, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, pageSize); @@ -185,7 +185,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable { FileChannelUtil.createOutputView( ioManager, mergedChannelID, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, pageSize); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java index b68f936cab7..74467c5c761 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java @@ -46,7 +46,7 @@ public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow SpillChannelManager channelManager, BinaryRowDataSerializer serializer, RecordComparator comparator, - boolean compressionEnable, + boolean compressionEnabled, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize) { super( @@ -54,7 +54,7 @@ public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow pageSize, maxFanIn, channelManager, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize); this.serializer = serializer; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java index 149025ebd43..09da136143a 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java @@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.operators.sort; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.AlgorithmOptions; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.compression.BlockCompressionFactory; import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView; import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; @@ -31,7 +30,6 @@ import org.apache.flink.runtime.operators.sort.IndexedSorter; import org.apache.flink.runtime.operators.sort.QuickSort; import org.apache.flink.runtime.operators.sort.Sorter; import org.apache.flink.runtime.util.EmptyMutableObjectIterator; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; @@ -152,11 +150,11 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { private final int memorySegmentSize; - private final boolean compressionEnable; + private final boolean compressionEnabled; private final BlockCompressionFactory compressionCodecFactory; private final int compressionBlockSize; - private final boolean asyncMergeEnable; + private final boolean asyncMergeEnabled; // ------------------------------------------------------------------------ // Constructor & Shutdown @@ -178,7 +176,10 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { BinaryRowDataSerializer serializer, NormalizedKeyComputer normalizedKeyComputer, RecordComparator comparator, - Configuration conf) { + int maxNumFileHandles, + boolean compressionEnabled, + int compressionBlockSize, + boolean asyncMergeEnabled) { this( owner, memoryManager, @@ -188,7 +189,10 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { serializer, normalizedKeyComputer, comparator, - conf, + maxNumFileHandles, + compressionEnabled, + compressionBlockSize, + asyncMergeEnabled, AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue()); } @@ -201,23 +205,19 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { BinaryRowDataSerializer serializer, NormalizedKeyComputer normalizedKeyComputer, 
RecordComparator comparator, - Configuration conf, + int maxNumFileHandles, + boolean compressionEnabled, + int compressionBlockSize, + boolean asyncMergeEnabled, float startSpillingFraction) { - int maxNumFileHandles = - conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES); - this.compressionEnable = - conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED); + this.compressionEnabled = compressionEnabled; this.compressionCodecFactory = - this.compressionEnable + this.compressionEnabled ? BlockCompressionFactory.createBlockCompressionFactory( BlockCompressionFactory.CompressionFactoryName.LZ4.toString()) : null; - this.compressionBlockSize = - (int) - conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE) - .getBytes(); - asyncMergeEnable = - conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED); + this.compressionBlockSize = compressionBlockSize; + this.asyncMergeEnabled = asyncMergeEnabled; checkArgument(maxNumFileHandles >= 2); checkNotNull(ioManager); @@ -256,8 +256,8 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { + "maxNumFileHandles({}), compressionEnable({}), compressionCodecFactory({}), compressionBlockSize({}).", sortMemPages, maxNumFileHandles, - compressionEnable, - compressionEnable ? compressionCodecFactory.getClass() : null, + compressionEnabled, + compressionEnabled ? compressionCodecFactory.getClass() : null, compressionBlockSize); this.sortBuffers = new ArrayList<>(); @@ -314,7 +314,7 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { channelManager, (BinaryRowDataSerializer) serializer.duplicate(), comparator, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize); @@ -1010,7 +1010,7 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { FileChannelUtil.createOutputView( ioManager, channel, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, memorySegmentSize); @@ -1126,7 +1126,7 @@ public class BinaryExternalSorter implements Sorter<BinaryRowData> { spillChannelIDs.add(channelID); // if async merge is disabled, we will only do the final merge // otherwise we wait for `maxFanIn` number of channels to begin a merge - if (!asyncMergeEnable || spillChannelIDs.size() < maxFanIn) { + if (!asyncMergeEnabled || spillChannelIDs.size() < maxFanIn) { continue; } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java index bdd009051af..ca6f65e45fb 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java @@ -40,7 +40,7 @@ public final class BinaryInMemorySortBuffer extends BinaryIndexedSortable { private static final int MIN_REQUIRED_BUFFERS = 3; - private AbstractRowDataSerializer<RowData> inputSerializer; + private final AbstractRowDataSerializer<RowData> inputSerializer; private final ArrayList<MemorySegment> recordBufferSegments; private final SimpleCollectingOutputView recordCollector; private final int totalNumBuffers; @@ -185,7 +185,7 @@ public final class BinaryInMemorySortBuffer extends BinaryIndexedSortable { * * @return An iterator returning the records 
in their logical order. */ - public final MutableObjectIterator<BinaryRowData> getIterator() { + public MutableObjectIterator<BinaryRowData> getIterator() { return new MutableObjectIterator<BinaryRowData>() { private final int size = size(); private int current = 0; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java index ed87f8a5048..db98dfd77fe 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java @@ -48,7 +48,7 @@ public class BinaryKVExternalMerger BinaryRowDataSerializer keySerializer, BinaryRowDataSerializer valueSerializer, RecordComparator comparator, - boolean compressionEnable, + boolean compressionEnabled, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize) { super( @@ -56,7 +56,7 @@ public class BinaryKVExternalMerger pageSize, maxFanIn, channelManager, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize); this.keySerializer = keySerializer; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java index 4fdf6976d40..38f721e355f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java @@ -65,8 +65,7 @@ public class BinaryKVInMemorySortBuffer extends BinaryIndexedSortable { BinaryRowDataSerializer valueSerializer, RecordComparator comparator, ArrayList<MemorySegment> recordBufferSegments, - MemorySegmentPool memorySegmentPool) - throws IOException { + MemorySegmentPool memorySegmentPool) { super( normalizedKeyComputer, keySerializer, diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java index 02b6aabb8dc..11c252408fe 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java @@ -18,7 +18,6 @@ package org.apache.flink.table.runtime.operators.sort; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.compression.BlockCompressionFactory; import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView; @@ -26,7 +25,6 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.operators.sort.IndexedSorter; import org.apache.flink.runtime.operators.sort.QuickSort; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import 
org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; import org.apache.flink.table.runtime.generated.RecordComparator; @@ -73,14 +71,14 @@ public class BufferedKVExternalSorter { private final List<ChannelWithMeta> channelIDs = new ArrayList<>(); private final SpillChannelManager channelManager; - private int pageSize; + private final int pageSize; // metric private long numSpillFiles; private long spillInBytes; private long spillInCompressedBytes; - private final boolean compressionEnable; + private final boolean compressionEnabled; private final BlockCompressionFactory compressionCodecFactory; private final int compressionBlockSize; @@ -91,27 +89,23 @@ public class BufferedKVExternalSorter { NormalizedKeyComputer nKeyComputer, RecordComparator comparator, int pageSize, - Configuration conf) - throws IOException { + int maxNumFileHandles, + boolean compressionEnabled, + int compressionBlockSize) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; this.nKeyComputer = nKeyComputer; this.comparator = comparator; this.pageSize = pageSize; this.sorter = new QuickSort(); - this.maxNumFileHandles = - conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES); - this.compressionEnable = - conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED); + this.maxNumFileHandles = maxNumFileHandles; + this.compressionEnabled = compressionEnabled; this.compressionCodecFactory = - this.compressionEnable + this.compressionEnabled ? BlockCompressionFactory.createBlockCompressionFactory( BlockCompressionFactory.CompressionFactoryName.LZ4.toString()) : null; - this.compressionBlockSize = - (int) - conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE) - .getBytes(); + this.compressionBlockSize = compressionBlockSize; this.ioManager = ioManager; this.enumerator = this.ioManager.createChannelEnumerator(); this.channelManager = new SpillChannelManager(); @@ -124,7 +118,7 @@ public class BufferedKVExternalSorter { keySerializer, valueSerializer, comparator, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize); } @@ -176,7 +170,7 @@ public class BufferedKVExternalSorter { FileChannelUtil.createOutputView( ioManager, channel, - compressionEnable, + compressionEnabled, compressionCodecFactory, compressionBlockSize, pageSize); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java index c09db2cb073..927c5d97e15 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java @@ -47,14 +47,28 @@ public class SortOperator extends TableStreamOperator<RowData> private GeneratedNormalizedKeyComputer gComputer; private GeneratedRecordComparator gComparator; + private final int maxNumFileHandles; + private final boolean compressionEnabled; + private final int compressionBlockSize; + private final boolean asyncMergeEnabled; + private transient BinaryExternalSorter sorter; private transient StreamRecordCollector<RowData> collector; private transient BinaryRowDataSerializer binarySerializer; public SortOperator( - GeneratedNormalizedKeyComputer gComputer, GeneratedRecordComparator gComparator) { + 
GeneratedNormalizedKeyComputer gComputer, + GeneratedRecordComparator gComparator, + int maxNumFileHandles, + boolean compressionEnabled, + int compressionBlockSize, + boolean asyncMergeEnabled) { this.gComputer = gComputer; this.gComparator = gComparator; + this.maxNumFileHandles = maxNumFileHandles; + this.compressionEnabled = compressionEnabled; + this.compressionBlockSize = compressionBlockSize; + this.asyncMergeEnabled = asyncMergeEnabled; } @Override @@ -85,7 +99,10 @@ public class SortOperator extends TableStreamOperator<RowData> binarySerializer, computer, comparator, - getContainingTask().getJobConfiguration()); + maxNumFileHandles, + compressionEnabled, + compressionBlockSize, + asyncMergeEnabled); this.sorter.startThreads(); collector = new StreamRecordCollector<>(output); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java index 9d8ee6d07b9..a8e0da893be 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java @@ -49,12 +49,12 @@ public class FileChannelUtil { IOManager ioManager, ChannelWithMeta channel, List<FileIOChannel> channels, - boolean compressionEnable, + boolean compressionEnabled, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize, int segmentSize) throws IOException { - if (compressionEnable) { + if (compressionEnabled) { CompressedHeaderlessChannelReaderInputView in = new CompressedHeaderlessChannelReaderInputView( channel.getChannel(), @@ -82,12 +82,12 @@ public class FileChannelUtil { public static AbstractChannelWriterOutputView createOutputView( IOManager ioManager, FileIOChannel.ID channel, - boolean compressionEnable, + boolean compressionEnabled, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize, int segmentSize) throws IOException { - if (compressionEnable) { + if (compressionEnabled) { BufferFileWriter bufferWriter = ioManager.createBufferFileWriter(channel); return new CompressedHeaderlessChannelWriterOutputView( bufferWriter, compressionCodecFactory, compressionBlockSize); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java index b9868059497..0d4d6b2b748 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java @@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.hashtable; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -28,7 +27,6 @@ import org.apache.flink.runtime.memory.MemoryAllocationException; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.memory.MemoryManagerBuilder; import org.apache.flink.runtime.operators.testutils.UnionIterator; -import 
org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.data.writer.BinaryRowWriter; @@ -53,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -66,7 +65,6 @@ public class BinaryHashTableTest { private BinaryRowDataSerializer probeSideSerializer; private boolean useCompress; - private Configuration conf; public BinaryHashTableTest(boolean useCompress) { this.useCompress = useCompress; @@ -84,9 +82,6 @@ public class BinaryHashTableTest { this.probeSideSerializer = new BinaryRowDataSerializer(types.length); this.ioManager = new IOManagerAsync(); - - conf = new Configuration(); - conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, useCompress); } @After @@ -736,8 +731,9 @@ public class BinaryHashTableTest { MemoryManagerBuilder.newBuilder().setMemorySize(96 * PAGE_SIZE).build(); final BinaryHashTable table = new BinaryHashTable( - conf, new Object(), + useCompress, + (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(), this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), @@ -836,8 +832,9 @@ public class BinaryHashTableTest { final BinaryHashTable table = new BinaryHashTable( - conf, new Object(), + useCompress, + (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(), this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), @@ -901,8 +898,9 @@ public class BinaryHashTableTest { // allocate the memory for the HashTable final BinaryHashTable table = new BinaryHashTable( - conf, new Object(), + useCompress, + (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(), this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), @@ -1006,8 +1004,9 @@ public class BinaryHashTableTest { final BinaryHashTable table = new BinaryHashTable( - conf, new Object(), + useCompress, + (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(), buildSideSerializer, probeSideSerializer, new MyProjection(), @@ -1071,8 +1070,9 @@ public class BinaryHashTableTest { final BinaryHashTable table = new BinaryHashTable( - conf, new Object(), + useCompress, + (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(), buildSideSerializer, probeSideSerializer, new MyProjection(), @@ -1169,8 +1169,9 @@ public class BinaryHashTableTest { long memory, IOManager ioManager) { return new BinaryHashTable( - conf, new Object(), + useCompress, + (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(), buildSideSerializer, probeSideSerializer, buildSideProjection, diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java index 8ec312ee392..660dcaf06ae 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java @@ -20,14 +20,12 @@ package org.apache.flink.table.runtime.hashtable; import org.apache.flink.api.common.typeinfo.TypeInformation; 
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
@@ -48,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.fail;
 
@@ -63,7 +62,6 @@ public class LongHashTableTest {
             MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
 
     private boolean useCompress;
-    private Configuration conf;
 
     public LongHashTableTest(boolean useCompress) {
         this.useCompress = useCompress;
@@ -80,17 +78,15 @@ public class LongHashTableTest {
         this.buildSideSerializer = new BinaryRowDataSerializer(types.length);
         this.probeSideSerializer = new BinaryRowDataSerializer(types.length);
         this.ioManager = new IOManagerAsync();
-
-        conf = new Configuration();
-        conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, useCompress);
     }
 
     private class MyHashTable extends LongHybridHashTable {
 
         public MyHashTable(long memorySize) {
             super(
-                    conf,
                     LongHashTableTest.this,
+                    useCompress,
+                    (int) TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                     buildSideSerializer,
                     probeSideSerializer,
                     memManager,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
index a280e28eebb..fab38dfd62a 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.operators.aggregate;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -80,11 +79,6 @@ public class HashAggTest {
                 };
     }
 
-    @Override
-    Configuration getConf() {
-        return new Configuration();
-    }
-
     @Override
     public IOManager getIOManager() {
         return ioManager;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
index 88be5ef2f0b..d10b1fdb54d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
@@ -19,12 +19,12 @@
 package org.apache.flink.table.runtime.operators.aggregate;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -119,7 +119,15 @@ public class SumHashAggTestOperator extends AbstractStreamOperator<RowData>
                             new IntNormalizedKeyComputer(),
                             new IntRecordComparator(),
                             getMemoryManager().getPageSize(),
-                            getConf());
+                            ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
+                                    .defaultValue(),
+                            ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
+                                    .defaultValue(),
+                            (int)
+                                    ExecutionConfigOptions
+                                            .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                            .defaultValue()
+                                            .getBytes());
             }
             // sort and spill
             sorter.sortAndSpill(
@@ -255,10 +263,6 @@ public class SumHashAggTestOperator extends AbstractStreamOperator<RowData>
         return getContainingTask().getEnvironment().getMemoryManager();
     }
 
-    Configuration getConf() {
-        return getContainingTask().getJobConfiguration();
-    }
-
     public IOManager getIOManager() {
         return getContainingTask().getEnvironment().getIOManager();
     }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
index 0988d272dfc..53a8dab724f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
@@ -148,6 +149,18 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
                 };
         boolean[] filterNulls = new boolean[] {true};
 
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnabled =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnabled =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
+
         SortMergeJoinFunction sortMergeJoinFunction;
         if (buildLeft) {
             sortMergeJoinFunction =
@@ -155,6 +168,10 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
                             0,
                             flinkJoinType,
                             buildLeft,
+                            maxNumFileHandles,
+                            compressionEnabled,
+                            compressionBlockSize,
+                            asyncMergeEnabled,
                             condFuncCode,
                             buildProjectionCode,
                             probeProjectionCode,
@@ -170,6 +187,10 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
                             0,
                             flinkJoinType,
                             buildLeft,
+                            maxNumFileHandles,
+                            compressionEnabled,
+                            compressionBlockSize,
+                            asyncMergeEnabled,
                             condFuncCode,
                             probeProjectionCode,
                             buildProjectionCode,
@@ -184,6 +205,8 @@ public abstract class Int2HashJoinOperatorTestBase implements Serializable {
         return HashJoinOperator.newHashJoinOperator(
                 hashJoinType,
                 buildLeft,
+                compressionEnabled,
+                compressionBlockSize,
                 condFuncCode,
                 reverseJoinFunction,
                 filterNulls,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
index 9deb124cb5c..73368b5470c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
@@ -215,10 +216,25 @@ public class Int2SortMergeJoinOperatorTest {
     }
 
     public static SortMergeJoinFunction getJoinFunction(FlinkJoinType type, boolean leftIsSmaller) {
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
         return new SortMergeJoinFunction(
                 0,
                 type,
                 leftIsSmaller,
+                maxNumFileHandles,
+                compressionEnable,
+                compressionBlockSize,
+                asyncMergeEnable,
                 new GeneratedJoinCondition("", "", new Object[0]) {
                     @Override
                     public JoinCondition newInstance(ClassLoader classLoader) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index ed6ff982eba..bb75ffd16e5 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -398,11 +399,27 @@ public class String2HashJoinOperatorTest implements Serializable {
                 };
         boolean[] filterNulls = new boolean[] {true};
 
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
+
         SortMergeJoinFunction sortMergeJoinFunction =
                 new SortMergeJoinFunction(
                         0,
                         flinkJoinType,
                         buildLeft,
+                        maxNumFileHandles,
+                        compressionEnable,
+                        compressionBlockSize,
+                        asyncMergeEnable,
                         condFuncCode,
                         probeProjectionCode,
                         buildProjectionCode,
@@ -416,6 +433,8 @@ public class String2HashJoinOperatorTest implements Serializable {
         return HashJoinOperator.newHashJoinOperator(
                 hashJoinType,
                 buildLeft,
+                compressionEnable,
+                compressionBlockSize,
                 condFuncCode,
                 reverseJoinFunction,
                 filterNulls,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index cb1ea8357ca..faab30bccd4 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
@@ -185,11 +186,26 @@ public class String2SortMergeJoinOperatorTest {
     }
 
     static StreamOperator newOperator(FlinkJoinType type, boolean leftIsSmaller) {
+        int maxNumFileHandles =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+        boolean compressionEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+        int compressionBlockSize =
+                (int)
+                        ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+                                .defaultValue()
+                                .getBytes();
+        boolean asyncMergeEnable =
+                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
         SortMergeJoinFunction sortMergeJoinFunction =
                 new SortMergeJoinFunction(
                         0,
                         type,
                         leftIsSmaller,
+                        maxNumFileHandles,
+                        compressionEnable,
+                        compressionBlockSize,
+                        asyncMergeEnable,
                        new GeneratedJoinCondition("", "", new Object[0]) {
                            @Override
                            public JoinCondition newInstance(ClassLoader classLoader) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
index 052bf244b05..f88caa5696f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
@@ -134,7 +134,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         1f);
         sorter.startThreads();
         sorter.write(reader);
@@ -174,7 +181,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -215,7 +229,14 @@
                             }
                         },
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(new MockBinaryRowReader(size));
@@ -257,7 +278,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -305,7 +333,14 @@
                                 return -super.compare(o1, o2);
                             }
                         },
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -351,7 +386,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
         sorter.write(reader);
@@ -389,7 +431,14 @@ public class BinaryExternalSorterTest {
                         serializer,
                         IntNormalizedKeyComputer.INSTANCE,
                         IntRecordComparator.INSTANCE,
-                        conf,
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes(),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
                         0.7f);
         sorter.startThreads();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
index 95f5bf922ab..484310610ad 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
@@ -115,7 +115,13 @@
                         computer,
                         comparator,
                         PAGE_SIZE,
-                        conf);
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+                        conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+                        (int)
+                                conf.get(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+                                        .getBytes());
         TestMemorySegmentPool pool = new TestMemorySegmentPool(PAGE_SIZE);
         List<Integer> expected = new ArrayList<>();
         for (int i = 0; i < spillNumber; i++) {
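Note (not part of the patch): the common pattern in the test changes above is that operators no longer receive a Configuration; each call site resolves the spill/sort options up front and passes plain values. A minimal, self-contained sketch of that resolution step, using only the ExecutionConfigOptions constants that appear in this diff (the helper class name is hypothetical):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    // Hypothetical helper, for illustration only: reads the batch spill/sort options once
    // so they can be handed to operators as plain constructor arguments.
    final class SpillAndSortOptions {
        final int maxNumFileHandles;
        final boolean compressionEnabled;
        final int compressionBlockSize;
        final boolean asyncMergeEnabled;

        SpillAndSortOptions(Configuration conf) {
            this.maxNumFileHandles =
                    conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
            this.compressionEnabled =
                    conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
            // The block size option is a MemorySize; the operators take it as an int byte count.
            this.compressionBlockSize =
                    (int)
                            conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
                                    .getBytes();
            this.asyncMergeEnabled =
                    conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
        }
    }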