[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Godfrey He updated FLINK-30989:
-------------------------------
    Fix Version/s: 1.17.0
                   1.18.0

> Configuration table.exec.spill-compression.block-size not take effect in batch job
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-30989
>                 URL: https://issues.apache.org/jira/browse/FLINK-30989
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration, Table SQL / Runtime
>    Affects Versions: 1.16.1
>            Reporter: shen
>            Assignee: dalongliu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.18.0
>
>         Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to set table.exec.spill-compression.block-size through the TableEnv in my
> job, but it did not take effect. I attached to the TaskManager and found that the
> conf passed to the constructor of
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
> is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple program that reproduces the problem:
> {code:java}
> // App.java
> package test.flink403;
>
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
>
> import java.util.Arrays;
>
> public class App {
>   public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
>     config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
>     config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
>     // Set on the environment configuration: does not take effect
>     config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m");
>     config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(1, config);
>     final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>     // Set on the table configuration: does not take effect either
>     tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m");
>     final DataStream<Order> orderA =
>         env.fromCollection(
>             Arrays.asList(
>                 new Order(1L, "beer", 3),
>                 new Order(1L, "diaper", 4),
>                 new Order(3L, "rubber", 2)));
>     final Table tableA = tableEnv.fromDataStream(orderA);
>     // The ORDER BY makes the batch job go through the sort operator
>     final Table result =
>         tableEnv.sqlQuery(
>             "SELECT * FROM "
>                 + tableA
>                 + " "
>                 + " order by user");
>     tableEnv.toDataStream(result, Order.class).print();
>     env.execute();
>   }
> }
> // ---------------------------------------------------------------
> // Order.java
> package test.flink403;
>
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>
>   // for POJO detection in DataStream API
>   public Order() {}
>
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
>     this.user = user;
>     this.product = product;
>     this.amount = amount;
>   }
>
>   @Override
>   public String toString() {
>     return "Order{"
>         + "user="
>         + user
>         + ", product='"
>         + product
>         + '\''
>         + ", amount="
>         + amount
>         + '}';
>   }
> }{code}
>
> I think this is because
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
> tries to get the conf from the JobConfiguration, which should be set in the JobGraph.
> The following classes use the same method to get their conf from the
> JobConfiguration:
> * BinaryExternalSorter
> ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
> * BinaryHashTable,BaseHybridHashTable
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
> * SortDataInput
> ** AlgorithmOptions.SORT_SPILLING_THRESHOLD
> ** AlgorithmOptions.SPILLING_MAX_FAN
> ** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
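
The cause suspected in the report (a value set on one configuration object while the operator reads another, empty one) can be illustrated with a small toy model. This is only a sketch of the hypothesis: it uses plain `java.util.Map` objects rather than Flink classes, `SpillConfigSketch` and `blockSizeSeenByOperator` are hypothetical names, and the `64 kb` fallback is an assumed default rather than a value taken from the issue.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: plain maps stand in for the table config and the job configuration.
class SpillConfigSketch {
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";
    // Assumed fallback value, used only for illustration.
    static final String DEFAULT_BLOCK_SIZE = "64 kb";

    // Hypothetical stand-in for an operator that looks only at the job-level configuration.
    static String blockSizeSeenByOperator(Map<String, String> jobConf) {
        return jobConf.getOrDefault(BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    }

    public static void main(String[] args) {
        Map<String, String> tableConf = new HashMap<>();
        Map<String, String> jobConf = new HashMap<>(); // empty, as observed in the report

        // The user sets the option on the table-level config only.
        tableConf.put(BLOCK_SIZE_KEY, "32 m");

        // The operator never sees it and falls back to the default.
        System.out.println(blockSizeSeenByOperator(jobConf));

        // What the lookup would return if the value had reached the job configuration.
        jobConf.putAll(tableConf);
        System.out.println(blockSizeSeenByOperator(jobConf));
    }
}
```

The second lookup only shows the effect of the value being present in the configuration the operator reads; it is not a description of the change made for this issue.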