[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-07 Thread dalongliu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709623#comment-17709623 ]

dalongliu commented on FLINK-30989:
---

[~Weijie Guo] Yes, we need to backport it to release-1.16.

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to set table.exec.spill-compression.block-size on the TableEnvironment in my 
> job, but it did not take effect. I attached a debugger to the TaskManager and found 
> that the conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple program that reproduces the problem:
> {code:java}
> // App.java
> package test.flink403;
>
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
>
> import java.util.Arrays;
>
> public class App {
>   public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
>     config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
>     config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
>     config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <-- does not take effect
>     config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0.5f);
>
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(1, config);
>     final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>     tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <-- does not take effect
>
>     final DataStream<Order> orderA =
>         env.fromCollection(
>             Arrays.asList(
>                 new Order(1L, "beer", 3),
>                 new Order(1L, "diaper", 4),
>                 new Order(3L, "rubber", 2)));
>     final Table tableA = tableEnv.fromDataStream(orderA);
>     final Table result =
>         tableEnv.sqlQuery("SELECT * FROM " + tableA + " order by user");
>     tableEnv.toDataStream(result, Order.class).print();
>     env.execute();
>   }
> }
>
> // ---
> // Order.java
> package test.flink403;
>
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>
>   // for POJO detection in DataStream API
>   public Order() {}
>
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
>     this.user = user;
>     this.product = product;
>     this.amount = amount;
>   }
>
>   @Override
>   public String toString() {
>     return "Order{"
>         + "user=" + user
>         + ", product='" + product + '\''
>         + ", amount=" + amount
>         + '}';
>   }
> }{code}
>  
> I think this is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to read its conf from the JobConfiguration, which would have to be set in the 
> JobGraph. The following classes use the same method to get their conf from the 
> JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  
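The configuration-scoping bug described above can be sketched with a simplified, hypothetical model. The class, key lookup, and default value below are illustrative stand-ins, not Flink's actual classes: the point is only that an operator which resolves its options exclusively from the job-level configuration never sees a value set on the table-level configuration, so the documented default wins.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink code) of why the table-level setting is lost:
// the operator consults only the job-level configuration, so a value set on
// the table-level configuration never reaches it.
public class ConfigScopeDemo {
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";

    // Stand-in for the job configuration serialized into the JobGraph.
    static final Map<String, String> jobConfig = new HashMap<>();
    // Stand-in for the table-level config (TableConfig) that the user sets.
    static final Map<String, String> tableConfig = new HashMap<>();

    // Mirrors the buggy lookup pattern: only jobConfig is consulted,
    // so the stand-in default is returned even though tableConfig has a value.
    static String resolveBlockSize() {
        return jobConfig.getOrDefault(BLOCK_SIZE_KEY, "64 kb");
    }

    public static void main(String[] args) {
        tableConfig.put(BLOCK_SIZE_KEY, "32 m"); // what the user configured
        System.out.println(resolveBlockSize()); // prints "64 kb": the user's value is ignored
    }
}
```

The fix direction implied by the thread is to make sure the table options are written into the configuration the operator actually reads (the one shipped in the JobGraph), rather than only into the table-level config.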

[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-05 Thread Weijie Guo (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708930#comment-17708930 ]

Weijie Guo commented on FLINK-30989:


Summary:

Fixed in the streaming module:
master (1.18) via ccd0fe2d75a26a158ad64ab25bb5063a7031d428.
release-1.17 via 40e9501a5fcd7a71af4a7e79cd1556e190488137.
release-1.16 via 75fab2759a1a2a2664d6b9f4a006a56f1a65d2fe.

Fixed in the table module:
master (1.18) via b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4.
release-1.17 via 333088113993f4607038dae391863b5c30d0bc95.
release-1.16: waiting for confirmation.


[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-04 Thread Weijie Guo (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708341#comment-17708341 ]

Weijie Guo commented on FLINK-30989:


[~lsy] Can you confirm whether the changes in the table part need to be backported to 
the release-1.16 branch?


[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-02-16 Thread Weijie Guo (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690209#comment-17690209 ]

Weijie Guo commented on FLINK-30989:


After an offline discussion: the configuration does not take effect in either the 
runtime module or the table module. [~lsy] wants to fix the table module; I will take 
care of the rest.


[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-02-16 Thread dalongliu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690161#comment-17690161 ]

dalongliu commented on FLINK-30989:
---

Thanks for reporting it. I will take a look at the table module.


[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-02-16 Thread Weijie Guo (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690135#comment-17690135 ]

Weijie Guo commented on FLINK-30989:


cc [~lsy] 
