[ 
https://issues.apache.org/jira/browse/FLINK-27922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27922:
-----------------------------------
      Labels: auto-deprioritized-major pull-request-available  (was: 
pull-request-available stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Flink SQL unique key lost when set table.exec.mini-batch.enabled=true
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27922
>                 URL: https://issues.apache.org/jira/browse/FLINK-27922
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2, 1.15.0
>         Environment: Flink1.12.2
>            Reporter: zhangbin
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available
>
> The Flink SQL tables have primary keys, but when 
> table.exec.mini-batch.enabled is set to true, the unique key is lost (the 
> join inputs are reported as NoUniqueKey in the optimized plan).
> {code:java}
> @Test
> public void testJoinUniqueKey() throws Exception {
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);    
> Configuration configuration = tableEnv.getConfig().getConfiguration();
>     configuration.setString("table.exec.mini-batch.enabled", "true");
>     configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
>     configuration.setString("table.exec.mini-batch.size", "5000");    
> StatementSet statementSet = tableEnv.createStatementSet();
>     tableEnv.executeSql("CREATE TABLE `t_apply_sku_test`(`dt` 
> BIGINT,`refund_apply_id` BIGINT,`base_sku_id` BIGINT,`order_id` 
> BIGINT,`user_id` BIGINT,`poi_id` BIGINT,`refund_type` 
> BIGINT,`apply_refund_reason_code` BIGINT,`apply_refund_reason_desc` 
> VARCHAR,`apply_refund_review_status` BIGINT,`apply_refund_review_status_desc` 
> VARCHAR,`apply_refund_reject_reason` VARCHAR,`apply_is_refunded` 
> INTEGER,`apply_pic_url` VARCHAR,`remark` VARCHAR,`refund_apply_originator` 
> BIGINT,`second_reason_code` BIGINT,`second_reason` 
> VARCHAR,`refund_target_account` BIGINT,`after_service_id` 
> BIGINT,`receipt_status` BIGINT,`group_header_goods_status` 
> INTEGER,`apply_operator_mis_name` VARCHAR,`refund_apply_time` 
> BIGINT,`update_time` BIGINT,`base_sku_name` VARCHAR,`apply_refund_num` 
> BIGINT,`view_qty` DECIMAL(38,18),`refund_scale_type` 
> BIGINT,`refund_scale_type_desc` VARCHAR,`refund_scale` 
> DECIMAL(38,18),`apply_refund_amt` DECIMAL(38,18),`refund_scale_user_real_pay` 
> DECIMAL(38,18),`refund_red_packet_price` DECIMAL(38,18),`load_time` 
> VARCHAR,`take_rate_type` BIGINT,`platform_rate` 
> DECIMAL(38,18),`order_sku_type` INTEGER,`second_reason_aggregated_code` 
> INTEGER,`second_reason_aggregated` VARCHAR,`compensation_amount` 
> DECIMAL(38,18),`aftersale_type` INTEGER,`group_header_parallel_status` 
> INTEGER,`grid_parallel_status` INTEGER) WITH ('connector'='blackhole')");
>     tableEnv.executeSql("CREATE TABLE `t_name`(`id` BIGINT,`after_service_id` 
> BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`user_id` BIGINT,`poi_id` 
> BIGINT,`city_id` BIGINT,`refund_type` INTEGER,`first_reason_code` 
> INTEGER,`first_reason` VARCHAR,`second_reason_code` INTEGER,`second_reason` 
> VARCHAR,`pic_url` VARCHAR,`remark` VARCHAR,`refund_price` 
> INTEGER,`refund_red_packet_price` INTEGER,`refund_total_price` 
> INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` 
> INTEGER,`refund_other_price` INTEGER,`user_receipt_status` 
> INTEGER,`collect_status` INTEGER,`refund_target_account` INTEGER,`status` 
> INTEGER,`flow_instance_id` BIGINT,`create_time` BIGINT,`modify_time` BIGINT) 
> WITH ('connector'='datagen')");
>     tableEnv.executeSql("CREATE TABLE `t_item_name`(`id` 
> BIGINT,`refund_fwd_item_id` BIGINT,`after_service_id` BIGINT,`order_id` 
> BIGINT,`order_item_id` BIGINT,`stack_id` BIGINT,`sku_id` BIGINT,`sku_name` 
> VARCHAR,`supplier_id` BIGINT,`refund_quantity` INTEGER,`item_sku_type` 
> INTEGER,`refund_scale_type` INTEGER,`refund_scale` INTEGER,`accurate_refund` 
> INTEGER,`refund_price` INTEGER,`refund_red_packet_price` 
> INTEGER,`refund_price_info` VARCHAR,`refund_total_price` 
> INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` 
> INTEGER,`refund_other_price` INTEGER,`extend_info` VARCHAR,`create_time` 
> BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
>     tableEnv.executeSql("CREATE TABLE `t_progress_name`(`id` 
> BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` 
> BIGINT,`progress_node` VARCHAR,`progress_node_status` INTEGER,`operator` 
> VARCHAR,`parallel` INTEGER,`flow_element_id` VARCHAR,`extend_info` 
> VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH 
> ('connector'='datagen')");
>     tableEnv.executeSql("CREATE TABLE `t_attr_name`(`id` 
> BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` 
> BIGINT,`name` VARCHAR,`value` VARCHAR,`create_time` BIGINT,`modify_time` 
> BIGINT) WITH ('connector'='datagen')");
>     tableEnv.executeSql("CREATE TABLE `t_apply_status_name`(`id` 
> BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` 
> BIGINT,`apply_status` INTEGER,`ai_audit_status` 
> INTEGER,`group_header_confirm_status` INTEGER,`group_header_retrieve_status` 
> INTEGER,`driver_retrieve_status` INTEGER,`group_header_parallel_status` 
> INTEGER,`grid_parallel_status` INTEGER,`create_time` BIGINT,`modify_time` 
> BIGINT) WITH ('connector'='datagen')");
>     tableEnv.executeSql("CREATE VIEW `v_refund_fwd_item_sku` AS SELECT 
> `after_service_id` AS `refund_apply_id`, `order_id`, `sku_id` AS 
> `base_sku_id`, SUM(`if`(`refund_quantity` IS NULL, 0, `refund_quantity`)) AS 
> `apply_refund_num`, SUM(`if`(`refund_quantity` IS NULL, 0, 
> CAST(`refund_quantity` AS DOUBLE))) AS `view_qty`, SUM(`if`(`refund_price` IS 
> NULL, 0, `refund_price`)) AS `apply_refund_amt`, SUM(`if`(`refund_price` IS 
> NULL, 0, `refund_price`)) AS `refund_scale_user_real_pay`, 
> SUM(`if`(`refund_red_packet_price` IS NULL, 0, `refund_red_packet_price`)) AS 
> `refund_red_packet_price`\n"
>             + "FROM `t_item_name`\n"
>             + "GROUP BY `after_service_id`, `order_id`, `sku_id`");    
> statementSet.addInsertSql("INSERT INTO `t_apply_sku_test` SELECT 
> CAST(`FROM_UNIXTIME`(`ord`.`modify_time` / 1000, 'yyyyMMdd') AS BIGINT) AS 
> `dt`, `sku`.`refund_apply_id`,`sku`.`base_sku_id`, `sku`.`order_id`, 
> `ord`.`user_id`, `ord`.`poi_id`, CAST(`ord`.`refund_type` AS BIGINT) AS 
> `refund_type`, CAST(`ord`.`first_reason_code` AS BIGINT) AS 
> `apply_refund_reason_code`, `ord`.`first_reason` AS 
> `apply_refund_reason_desc`, CAST(`stat`.`apply_status` AS BIGINT) AS 
> `apply_refund_review_status`, CASE WHEN `stat`.`apply_status` = 0 OR 
> `stat`.`apply_status` = 13 THEN 'a' WHEN `stat`.`apply_status` = 1 THEN 'b' 
> WHEN `stat`.`apply_status` = 2 THEN 'c' WHEN `stat`.`apply_status` = 3 THEN 
> 'd' WHEN `stat`.`apply_status` = 4 THEN 'e' WHEN `stat`.`apply_status` = 5 
> THEN 'f' WHEN `stat`.`apply_status` = 6 THEN 'g' ELSE 'x' END AS 
> `apply_refund_review_status_desc`, `if`(`progress`.`progress_node_status` = 
> 30, `extend_info`, '') AS `apply_refund_reject_reason`, CASE WHEN 
> `progress`.`progress_node` = 'refund.node' THEN 1 ELSE 0 END AS 
> `apply_is_refunded`, `ord`.`pic_url` AS `apply_pic_url`, `ord`.`remark`, 
> CAST(NULL AS BIGINT) AS `refund_apply_originator`, 
> CAST(`ord`.`second_reason_code` AS BIGINT) AS `second_reason_code`, 
> `ord`.`second_reason`, CAST(`ord`.`refund_target_account` AS BIGINT) AS 
> `refund_target_account`, `ord`.`after_service_id`, 
> CAST(`ord`.`user_receipt_status` AS BIGINT) AS `receipt_status`, CASE WHEN 
> `attr`.`name` = 'fwd_ext' AND `attr`.`value` = '1' THEN 1 WHEN `attr`.`name` 
> = 'fwd_ext' AND `attr`.`value` = '2' THEN 2 ELSE 0 END AS 
> `group_header_goods_status`, `progress`.`operator` AS 
> `apply_operator_mis_name`, `ord`.`create_time` AS `refund_apply_time`, 
> `ord`.`modify_time` AS `update_time`, CAST(NULL AS VARCHAR) AS 
> `base_sku_name`, CAST(`sku`.`apply_refund_num` AS BIGINT) AS 
> `apply_refund_num`, `sku`.`view_qty` AS `view_qty`, CAST(NULL AS BIGINT) AS 
> `refund_scale_type`, CAST(NULL AS VARCHAR) AS `refund_scale_type_desc`, 
> CAST(NULL AS DECIMAL) AS `refund_scale`, `sku`.`apply_refund_amt` AS 
> `apply_refund_amt`, `sku`.`refund_scale_user_real_pay` AS 
> `refund_scale_user_real_pay`, `sku`.`refund_red_packet_price` AS 
> `refund_red_packet_price`, CAST(LOCALTIMESTAMP AS VARCHAR) AS `load_time`, 
> CAST(NULL AS BIGINT) AS `take_rate_type`, CAST(NULL AS DECIMAL) AS 
> `platform_rate`, 1 AS `order_sku_type`, CAST(NULL AS INTEGER) AS 
> `second_reason_aggregated_code`, CAST(NULL AS VARCHAR) AS 
> `second_reason_aggregated`, CAST(NULL AS DECIMAL) AS `compensation_amount`, 1 
> AS `aftersale_type`, CASE WHEN `progress`.`progress_node` = 
> 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 10 
> THEN 1 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND 
> `parallel` = 1 AND `progress_node_status` = 20 THEN 2 WHEN 
> `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND 
> `progress_node_status` = 30 THEN 3 WHEN `progress`.`progress_node` = 
> 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 40 
> THEN 4 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND 
> `parallel` = 1 AND `progress_node_status` = 50 THEN 5 ELSE 0 END AS 
> `group_header_parallel_status`, CASE WHEN `progress`.`progress_node` = 
> 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 10 THEN 1 
> WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND 
> `progress_node_status` = 20 THEN 2 WHEN `progress`.`progress_node` = 
> 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 30 THEN 3 
> WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND 
> `progress_node_status` = 40 THEN 4 WHEN `progress`.`progress_node` = 
> 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 50 THEN 5 
> ELSE 0 END AS `grid_parallel_status`\n"
>             + "FROM `t_name` AS `ord`\n"
>             + "LEFT JOIN `t_progress_name` AS `progress` ON 
> `ord`.`after_service_id` = `progress`.`after_service_id`\n"
>             + "LEFT JOIN `t_attr_name` AS `attr` ON `ord`.`after_service_id` 
> = `attr`.`after_service_id`\n"
>             + "LEFT JOIN `t_apply_status_name` AS `stat` ON 
> `ord`.`after_service_id` = `stat`.`after_service_id`\n"
>             + "LEFT JOIN `v_refund_fwd_item_sku` AS `sku` ON 
> `stat`.`after_service_id` = `sku`.`refund_apply_id`");    
> System.out.println(statementSet.explain());
> }{code}
> == Optimized Logical Plan ==
> {code:java}
> Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, 
> refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, 
> apply_refund_reason_code, apply_refund_reason_desc, 
> apply_refund_review_status, apply_refund_review_status_desc, 
> apply_refund_reject_reason, apply_is_refunded, apply_pic_url, remark, 
> refund_apply_originator, second_reason_code, second_reason, 
> refund_target_account, after_service_id, receipt_status, 
> group_header_goods_status, apply_operator_mis_name, refund_apply_time, 
> update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, 
> refund_scale_type_desc, refund_scale, apply_refund_amt, 
> refund_scale_user_real_pay, refund_red_packet_price, load_time, 
> take_rate_type, platform_rate, order_sku_type, second_reason_aggregated_code, 
> second_reason_aggregated, compensation_amount, aftersale_type, 
> group_header_parallel_status, grid_parallel_status])
> +- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME 
> _UTF-16LE'yyyyMMdd')) AS dt, refund_apply_id, base_sku_id, order_id, user_id, 
> poi_id, CAST(refund_type) AS refund_type, CAST(first_reason_code) AS 
> apply_refund_reason_code, first_reason AS apply_refund_reason_desc, 
> CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH 
> Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE 
> (apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE 
> _UTF-16LE'd' CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 
> 5) CASE _UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE 
> _UTF-16LE'x')) AS apply_refund_review_status_desc, ((progress_node_status = 
> 30) IF extend_info IF _UTF-16LE'') AS apply_refund_reject_reason, 
> CAST(((progress_node = _UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE") CASE 1 CASE 0)) AS apply_is_refunded, pic_url AS 
> apply_pic_url, remark, null:BIGINT AS refund_apply_originator, 
> CAST(second_reason_code) AS second_reason_code, second_reason, 
> CAST(refund_target_account) AS refund_target_account, CAST(after_service_id) 
> AS after_service_id, CAST(user_receipt_status) AS receipt_status, 
> CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, 
> operator AS apply_operator_mis_name, create_time AS refund_apply_time, 
> modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) 
> AS view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS 
> refund_scale, CAST(apply_refund_amt) AS apply_refund_amt, 
> CAST(refund_scale_user_real_pay) AS refund_scale_user_real_pay, 
> CAST(refund_red_packet_price) AS refund_red_packet_price, CAST(CAST(())) AS 
> load_time, null:BIGINT AS take_rate_type, null:DECIMAL(38, 18) AS 
> platform_rate, 1 AS order_sku_type, null:INTEGER AS 
> second_reason_aggregated_code, null:VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS 
> compensation_amount, 1 AS aftersale_type, CAST((((progress_node = 
> _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE 
> ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) 
> CASE 2 CASE ((progress_node = 
> _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE 
> ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) 
> CASE 4 CASE ((progress_node = 
> _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 
> 0)) AS group_header_parallel_status, CAST((((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS 
> grid_parallel_status])
>    +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = 
> refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, 
> first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
> remark, user_receipt_status, refund_target_account, create_time, modify_time, 
> progress_node, progress_node_status, operator, parallel, extend_info, name, 
> value, after_service_id0, apply_status, refund_apply_id, order_id, 
> base_sku_id, apply_refund_num, view_qty, apply_refund_amt, 
> refund_scale_user_real_pay, refund_red_packet_price], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
>       :- Exchange(distribution=[hash[after_service_id0]])
>       :  +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = 
> after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, 
> first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
> remark, user_receipt_status, refund_target_account, create_time, modify_time, 
> progress_node, progress_node_status, operator, parallel, extend_info, name, 
> value, after_service_id0, apply_status], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[NoUniqueKey])
>       :     :- Exchange(distribution=[hash[after_service_id]])
>       :     :  +- Calc(select=[after_service_id, user_id, poi_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time, progress_node, progress_node_status, operator, 
> parallel, extend_info, name, value])
>       :     :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id 
> = after_service_id0)], select=[after_service_id, user_id, poi_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time, progress_node, progress_node_status, operator, 
> parallel, extend_info, after_service_id0, name, value], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>       :     :        :- Exchange(distribution=[hash[after_service_id]])
>       :     :        :  +- Calc(select=[after_service_id, user_id, poi_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time, progress_node, progress_node_status, operator, 
> parallel, extend_info])
>       :     :        :     +- Join(joinType=[LeftOuterJoin], 
> where=[(after_service_id = after_service_id0)], select=[after_service_id, 
> user_id, poi_id, refund_type, first_reason_code, first_reason, 
> second_reason_code, second_reason, pic_url, remark, user_receipt_status, 
> refund_target_account, create_time, modify_time, after_service_id0, 
> progress_node, progress_node_status, operator, parallel, extend_info], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>       :     :        :        :- 
> Exchange(distribution=[hash[after_service_id]])
>       :     :        :        :  +- Calc(select=[after_service_id, user_id, 
> poi_id, refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time])
>       :     :        :        :     +- DropUpdateBefore
>       :     :        :        :        +- 
> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
>       :     :        :        :           +- 
> TableSourceScan(table=[[default_catalog, default_database, t_name]], 
> fields=[id, after_service_id, order_id, user_id, poi_id, city_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, refund_price, refund_red_packet_price, 
> refund_total_price, refund_promotion_price, refund_coupon_price, 
> refund_other_price, user_receipt_status, collect_status, 
> refund_target_account, status, flow_instance_id, create_time, modify_time])
>       :     :        :        +- 
> Exchange(distribution=[hash[after_service_id]])
>       :     :        :           +- Calc(select=[after_service_id, 
> progress_node, progress_node_status, operator, parallel, extend_info])
>       :     :        :              +- DropUpdateBefore
>       :     :        :                 +- 
> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
>       :     :        :                    +- 
> TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], 
> fields=[id, after_service_id, order_id, progress_node, progress_node_status, 
> operator, parallel, flow_element_id, extend_info, create_time, modify_time])
>       :     :        +- Exchange(distribution=[hash[after_service_id]])
>       :     :           +- Calc(select=[after_service_id, name, value])
>       :     :              +- DropUpdateBefore
>       :     :                 +- MiniBatchAssigner(interval=[5000ms], 
> mode=[ProcTime])
>       :     :                    +- TableSourceScan(table=[[default_catalog, 
> default_database, t_attr_name]], fields=[id, after_service_id, order_id, 
> name, value, create_time, modify_time])
>       :     +- Exchange(distribution=[hash[after_service_id]])
>       :        +- Calc(select=[after_service_id, apply_status])
>       :           +- DropUpdateBefore
>       :              +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
>       :                 +- TableSourceScan(table=[[default_catalog, 
> default_database, t_apply_status_name]], fields=[id, after_service_id, 
> order_id, apply_status, ai_audit_status, group_header_confirm_status, 
> group_header_retrieve_status, driver_retrieve_status, 
> group_header_parallel_status, grid_parallel_status, create_time, modify_time])
>       +- Exchange(distribution=[hash[refund_apply_id]])
>          +- Calc(select=[refund_apply_id, order_id, base_sku_id, 
> apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, 
> refund_scale_user_real_pay, refund_red_packet_price])
>             +- GlobalGroupAggregate(groupBy=[refund_apply_id, order_id, 
> base_sku_id], select=[refund_apply_id, order_id, base_sku_id, 
> SUM_RETRACT((sum$0, count$1)) AS apply_refund_num, SUM_RETRACT((sum$2, 
> count$3)) AS view_qty, SUM_RETRACT((sum$4, count$5)) AS 
> refund_scale_user_real_pay, SUM_RETRACT((sum$6, count$7)) AS 
> refund_red_packet_price])
>                +- Exchange(distribution=[hash[refund_apply_id, order_id, 
> base_sku_id]])
>                   +- LocalGroupAggregate(groupBy=[refund_apply_id, order_id, 
> base_sku_id], select=[refund_apply_id, order_id, base_sku_id, 
> SUM_RETRACT($f3) AS (sum$0, count$1), SUM_RETRACT($f4) AS (sum$2, count$3), 
> SUM_RETRACT($f5) AS (sum$4, count$5), SUM_RETRACT($f6) AS (sum$6, count$7), 
> COUNT_RETRACT(*) AS count1$8])
>                      +- Calc(select=[after_service_id AS refund_apply_id, 
> order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF 
> refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF 
> CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS 
> $f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS 
> $f6])
>                         +- MiniBatchAssigner(interval=[5000ms], 
> mode=[ProcTime])
>                            +- TableSourceScan(table=[[default_catalog, 
> default_database, t_item_name]], fields=[id, refund_fwd_item_id, 
> after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, 
> supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, 
> accurate_refund, refund_price, refund_red_packet_price, refund_price_info, 
> refund_total_price, refund_promotion_price, refund_coupon_price, 
> refund_other_price, extend_info, create_time, modify_time]) {code}
> For comparison, the plan when table.exec.mini-batch.enabled=false is set:
> == Optimized Logical Plan ==
> {code:java}
> Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, 
> refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, 
> apply_refund_reason_code, apply_refund_reason_desc, 
> apply_refund_review_status, apply_refund_review_status_desc, 
> apply_refund_reject_reason, apply_is_refunded, apply_pic_url, remark, 
> refund_apply_originator, second_reason_code, second_reason, 
> refund_target_account, after_service_id, receipt_status, 
> group_header_goods_status, apply_operator_mis_name, refund_apply_time, 
> update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, 
> refund_scale_type_desc, refund_scale, apply_refund_amt, 
> refund_scale_user_real_pay, refund_red_packet_price, load_time, 
> take_rate_type, platform_rate, order_sku_type, second_reason_aggregated_code, 
> second_reason_aggregated, compensation_amount, aftersale_type, 
> group_header_parallel_status, grid_parallel_status])
> +- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME 
> _UTF-16LE'yyyyMMdd')) AS dt, refund_apply_id, base_sku_id, order_id, user_id, 
> poi_id, CAST(refund_type) AS refund_type, CAST(first_reason_code) AS 
> apply_refund_reason_code, first_reason AS apply_refund_reason_desc, 
> CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH 
> Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE 
> (apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE 
> _UTF-16LE'd' CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 
> 5) CASE _UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE 
> _UTF-16LE'x')) AS apply_refund_review_status_desc, ((progress_node_status = 
> 30) IF extend_info IF _UTF-16LE'') AS apply_refund_reject_reason, 
> CAST(((progress_node = _UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE") CASE 1 CASE 0)) AS apply_is_refunded, pic_url AS 
> apply_pic_url, remark, null:BIGINT AS refund_apply_originator, 
> CAST(second_reason_code) AS second_reason_code, second_reason, 
> CAST(refund_target_account) AS refund_target_account, CAST(after_service_id) 
> AS after_service_id, CAST(user_receipt_status) AS receipt_status, 
> CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, 
> operator AS apply_operator_mis_name, create_time AS refund_apply_time, 
> modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) 
> AS view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS 
> refund_scale, CAST(apply_refund_amt) AS apply_refund_amt, 
> CAST(refund_scale_user_real_pay) AS refund_scale_user_real_pay, 
> CAST(refund_red_packet_price) AS refund_red_packet_price, CAST(CAST(())) AS 
> load_time, null:BIGINT AS take_rate_type, null:DECIMAL(38, 18) AS 
> platform_rate, 1 AS order_sku_type, null:INTEGER AS 
> second_reason_aggregated_code, null:VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS 
> compensation_amount, 1 AS aftersale_type, CAST((((progress_node = 
> _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE 
> ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) 
> CASE 2 CASE ((progress_node = 
> _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE 
> ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) 
> CASE 4 CASE ((progress_node = 
> _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 
> 0)) AS group_header_parallel_status, CAST((((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = 
> _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS 
> grid_parallel_status])
>    +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = 
> refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, 
> first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
> remark, user_receipt_status, refund_target_account, create_time, modify_time, 
> progress_node, progress_node_status, operator, parallel, extend_info, name, 
> value, after_service_id0, apply_status, refund_apply_id, order_id, 
> base_sku_id, apply_refund_num, view_qty, apply_refund_amt, 
> refund_scale_user_real_pay, refund_red_packet_price], 
> leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
>       :- Exchange(distribution=[hash[after_service_id0]])
>       :  +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = 
> after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, 
> first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
> remark, user_receipt_status, refund_target_account, create_time, modify_time, 
> progress_node, progress_node_status, operator, parallel, extend_info, name, 
> value, after_service_id0, apply_status], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey])
>       :     :- Exchange(distribution=[hash[after_service_id]])
>       :     :  +- Calc(select=[after_service_id, user_id, poi_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time, progress_node, progress_node_status, operator, 
> parallel, extend_info, name, value])
>       :     :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id 
> = after_service_id0)], select=[after_service_id, user_id, poi_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time, progress_node, progress_node_status, operator, 
> parallel, extend_info, after_service_id0, name, value], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey])
>       :     :        :- Exchange(distribution=[hash[after_service_id]])
>       :     :        :  +- Calc(select=[after_service_id, user_id, poi_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time, progress_node, progress_node_status, operator, 
> parallel, extend_info])
>       :     :        :     +- Join(joinType=[LeftOuterJoin], 
> where=[(after_service_id = after_service_id0)], select=[after_service_id, 
> user_id, poi_id, refund_type, first_reason_code, first_reason, 
> second_reason_code, second_reason, pic_url, remark, user_receipt_status, 
> refund_target_account, create_time, modify_time, after_service_id0, 
> progress_node, progress_node_status, operator, parallel, extend_info], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey])
>       :     :        :        :- 
> Exchange(distribution=[hash[after_service_id]])
>       :     :        :        :  +- Calc(select=[after_service_id, user_id, 
> poi_id, refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
> create_time, modify_time])
>       :     :        :        :     +- DropUpdateBefore
>       :     :        :        :        +- 
> TableSourceScan(table=[[default_catalog, default_database, t_name]], 
> fields=[id, after_service_id, order_id, user_id, poi_id, city_id, 
> refund_type, first_reason_code, first_reason, second_reason_code, 
> second_reason, pic_url, remark, refund_price, refund_red_packet_price, 
> refund_total_price, refund_promotion_price, refund_coupon_price, 
> refund_other_price, user_receipt_status, collect_status, 
> refund_target_account, status, flow_instance_id, create_time, modify_time])
>       :     :        :        +- 
> Exchange(distribution=[hash[after_service_id]])
>       :     :        :           +- Calc(select=[after_service_id, 
> progress_node, progress_node_status, operator, parallel, extend_info])
>       :     :        :              +- DropUpdateBefore
>       :     :        :                 +- 
> TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], 
> fields=[id, after_service_id, order_id, progress_node, progress_node_status, 
> operator, parallel, flow_element_id, extend_info, create_time, modify_time])
>       :     :        +- Exchange(distribution=[hash[after_service_id]])
>       :     :           +- Calc(select=[after_service_id, name, value])
>       :     :              +- DropUpdateBefore
>       :     :                 +- TableSourceScan(table=[[default_catalog, 
> default_database, t_attr_name]], fields=[id, after_service_id, order_id, 
> name, value, create_time, modify_time])
>       :     +- Exchange(distribution=[hash[after_service_id]])
>       :        +- Calc(select=[after_service_id, apply_status])
>       :           +- DropUpdateBefore
>       :              +- TableSourceScan(table=[[default_catalog, 
> default_database, t_apply_status_name]], fields=[id, after_service_id, 
> order_id, apply_status, ai_audit_status, group_header_confirm_status, 
> group_header_retrieve_status, driver_retrieve_status, 
> group_header_parallel_status, grid_parallel_status, create_time, modify_time])
>       +- Exchange(distribution=[hash[refund_apply_id]])
>          +- Calc(select=[refund_apply_id, order_id, base_sku_id, 
> apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, 
> refund_scale_user_real_pay, refund_red_packet_price])
>             +- GroupAggregate(groupBy=[refund_apply_id, order_id, 
> base_sku_id], select=[refund_apply_id, order_id, base_sku_id, 
> SUM_RETRACT($f3) AS apply_refund_num, SUM_RETRACT($f4) AS view_qty, 
> SUM_RETRACT($f5) AS refund_scale_user_real_pay, SUM_RETRACT($f6) AS 
> refund_red_packet_price])
>                +- Exchange(distribution=[hash[refund_apply_id, order_id, 
> base_sku_id]])
>                   +- Calc(select=[after_service_id AS refund_apply_id, 
> order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF 
> refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF 
> CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS 
> $f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS 
> $f6])
>                      +- TableSourceScan(table=[[default_catalog, 
> default_database, t_item_name]], fields=[id, refund_fwd_item_id, 
> after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, 
> supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, 
> accurate_refund, refund_price, refund_red_packet_price, refund_price_info, 
> refund_total_price, refund_promotion_price, refund_coupon_price, 
> refund_other_price, extend_info, create_time, modify_time]) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to