[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709623#comment-17709623 ]

dalongliu commented on FLINK-30989:
-----------------------------------

[~Weijie Guo] Yes, we need to backport it to release-1.16.

> Configuration table.exec.spill-compression.block-size not take effect in batch job
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-30989
>                 URL: https://issues.apache.org/jira/browse/FLINK-30989
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration, Table SQL / Runtime
>    Affects Versions: 1.16.1
>            Reporter: shen
>            Assignee: dalongliu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.2, 1.18.0, 1.17.1
>
>         Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size through the TableEnv in my
> job, but the setting did not take effect. I attached to the TaskManager and found that the
> conf passed to the constructor of
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
> is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple program that reproduces the problem:
> {code:java}
> // App.java
> package test.flink403;
>
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
>
> import java.util.Arrays;
>
> public class App {
>   public static void main(String[] args) throws Exception {
>     // Batch job with spill compression enabled.
>     Configuration config = new Configuration();
>     config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
>     config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
>     config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
>     config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <-- does not take effect
>     config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
>
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(1, config);
>     final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>     tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <-- does not take effect
>
>     final DataStream<Order> orderA =
>         env.fromCollection(
>             Arrays.asList(
>                 new Order(1L, "beer", 3),
>                 new Order(1L, "diaper", 4),
>                 new Order(3L, "rubber", 2)));
>     final Table tableA = tableEnv.fromDataStream(orderA);
>
>     // ORDER BY forces a sort, which is where the spill settings are read.
>     final Table result =
>         tableEnv.sqlQuery(
>             "SELECT * FROM "
>                 + tableA
>                 + " "
>                 + " order by user");
>     tableEnv.toDataStream(result, Order.class).print();
>     env.execute();
>   }
> }
>
> // ---
> // Order.java
> package test.flink403;
>
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>
>   // for POJO detection in DataStream API
>   public Order() {}
>
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
>     this.user = user;
>     this.product = product;
>     this.amount = amount;
>   }
>
>   @Override
>   public String toString() {
>     return "Order{"
>         + "user="
>         + user
>         + ", product='"
>         + product
>         + '\''
>         + ", amount="
>         + amount
>         + '}';
>   }
> }{code}
>
> I think this is because
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
> tries to get the conf from JobConfiguration, which should be set in the JobGraph.
> The following classes use the same method to get their conf from JobConfiguration:
> * BinaryExternalSorter
> ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
> * BinaryHashTable,BaseHyb
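
The suspected cause in the description above (an operator reading from a job-level configuration that was never populated, while the user's value sits in a different config object) can be illustrated with a toy model. This is only a sketch in plain Java, not Flink code: the class `SpillConfigDemo`, the helpers `readBlockSize` and `forward`, and the two maps standing in for the TableEnv config and the JobConfiguration are all hypothetical, and the "64 kb" default is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class SpillConfigDemo {
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";
    // Assumed default, used only for this illustration.
    static final String DEFAULT_BLOCK_SIZE = "64 kb";

    /** Hypothetical operator-side lookup: falls back to the default when the key is absent. */
    static String readBlockSize(Map<String, String> conf) {
        return conf.getOrDefault(BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    }

    /** Hypothetical forwarding step: copies every user setting into the config the operator reads. */
    static Map<String, String> forward(Map<String, String> userConf, Map<String, String> operatorConf) {
        Map<String, String> merged = new HashMap<>(operatorConf);
        merged.putAll(userConf);
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the config the user sets on the table environment.
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put(BLOCK_SIZE_KEY, "32 m");

        // Stand-in for the job-level configuration, which stays empty.
        Map<String, String> jobConfiguration = new HashMap<>();

        // The operator only sees the empty config, so the user's value is silently ignored.
        System.out.println(readBlockSize(jobConfiguration)); // prints "64 kb"

        // Once the user's settings are forwarded, the operator picks them up.
        System.out.println(readBlockSize(forward(tableConfig, jobConfiguration))); // prints "32 m"
    }
}
```

The point of the sketch is that nothing fails loudly: the lookup succeeds with the default, which matches the report that the setting simply does not take effect. How the real fix forwards the settings is not described in this thread, so `forward` should not be read as the actual change.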
[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708930#comment-17708930 ]

Weijie Guo commented on FLINK-30989:
------------------------------------

Summary:

Fixed in the streaming module:
master (1.18) via ccd0fe2d75a26a158ad64ab25bb5063a7031d428.
release-1.17 via 40e9501a5fcd7a71af4a7e79cd1556e190488137.
release-1.16 via 75fab2759a1a2a2664d6b9f4a006a56f1a65d2fe.

Fixed in the Table module:
master (1.18) via b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4.
release-1.17 via 333088113993f4607038dae391863b5c30d0bc95.
release-1.16: waiting for confirmation.
[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708341#comment-17708341 ]

Weijie Guo commented on FLINK-30989:
------------------------------------

[~lsy] Can you confirm whether the changes in the Table part need to be backported to the release-1.16 branch?
[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690209#comment-17690209 ]

Weijie Guo commented on FLINK-30989:
------------------------------------

After an offline discussion: the configuration does not take effect in either the runtime module or the table module. [~lsy] will fix the table module; I will take care of the rest.
[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690161#comment-17690161 ]

dalongliu commented on FLINK-30989:
-----------------------------------

Thanks for reporting it. I will take a look at the table module.
[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690135#comment-17690135 ]

Weijie Guo commented on FLINK-30989:
------------------------------------

cc [~lsy]