zhangbin created FLINK-27922:
--------------------------------

             Summary: Flink SQL unique key lost when set 
table.exec.mini-batch.enabled=true
                 Key: FLINK-27922
                 URL: https://issues.apache.org/jira/browse/FLINK-27922
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.2
         Environment: Flink1.12.2
            Reporter: zhangbin


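The Flink SQL source tables declare primary keys, but when 
table.exec.mini-batch.enabled is set to true, the unique key is lost: the joins 
in the optimized plan report leftInputSpec/rightInputSpec=[NoUniqueKey] instead 
of [JoinKeyContainsUniqueKey].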
{code:java}
@Test
public void testJoinUniqueKey() throws Exception {
    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);    
Configuration configuration = tableEnv.getConfig().getConfiguration();
    configuration.setString("table.exec.mini-batch.enabled", "true");
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");    
StatementSet statementSet = tableEnv.createStatementSet();
    tableEnv.executeSql("CREATE TABLE `t_apply_sku_test`(`dt` 
BIGINT,`refund_apply_id` BIGINT,`base_sku_id` BIGINT,`order_id` 
BIGINT,`user_id` BIGINT,`poi_id` BIGINT,`refund_type` 
BIGINT,`apply_refund_reason_code` BIGINT,`apply_refund_reason_desc` 
VARCHAR,`apply_refund_review_status` BIGINT,`apply_refund_review_status_desc` 
VARCHAR,`apply_refund_reject_reason` VARCHAR,`apply_is_refunded` 
INTEGER,`apply_pic_url` VARCHAR,`remark` VARCHAR,`refund_apply_originator` 
BIGINT,`second_reason_code` BIGINT,`second_reason` 
VARCHAR,`refund_target_account` BIGINT,`after_service_id` 
BIGINT,`receipt_status` BIGINT,`group_header_goods_status` 
INTEGER,`apply_operator_mis_name` VARCHAR,`refund_apply_time` 
BIGINT,`update_time` BIGINT,`base_sku_name` VARCHAR,`apply_refund_num` 
BIGINT,`view_qty` DECIMAL(38,18),`refund_scale_type` 
BIGINT,`refund_scale_type_desc` VARCHAR,`refund_scale` 
DECIMAL(38,18),`apply_refund_amt` DECIMAL(38,18),`refund_scale_user_real_pay` 
DECIMAL(38,18),`refund_red_packet_price` DECIMAL(38,18),`load_time` 
VARCHAR,`take_rate_type` BIGINT,`platform_rate` DECIMAL(38,18),`order_sku_type` 
INTEGER,`second_reason_aggregated_code` INTEGER,`second_reason_aggregated` 
VARCHAR,`compensation_amount` DECIMAL(38,18),`aftersale_type` 
INTEGER,`group_header_parallel_status` INTEGER,`grid_parallel_status` INTEGER) 
WITH ('connector'='blackhole')");
    tableEnv.executeSql("CREATE TABLE `t_name`(`id` BIGINT,`after_service_id` 
BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`user_id` BIGINT,`poi_id` 
BIGINT,`city_id` BIGINT,`refund_type` INTEGER,`first_reason_code` 
INTEGER,`first_reason` VARCHAR,`second_reason_code` INTEGER,`second_reason` 
VARCHAR,`pic_url` VARCHAR,`remark` VARCHAR,`refund_price` 
INTEGER,`refund_red_packet_price` INTEGER,`refund_total_price` 
INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` 
INTEGER,`refund_other_price` INTEGER,`user_receipt_status` 
INTEGER,`collect_status` INTEGER,`refund_target_account` INTEGER,`status` 
INTEGER,`flow_instance_id` BIGINT,`create_time` BIGINT,`modify_time` BIGINT) 
WITH ('connector'='datagen')");
    tableEnv.executeSql("CREATE TABLE `t_item_name`(`id` 
BIGINT,`refund_fwd_item_id` BIGINT,`after_service_id` BIGINT,`order_id` 
BIGINT,`order_item_id` BIGINT,`stack_id` BIGINT,`sku_id` BIGINT,`sku_name` 
VARCHAR,`supplier_id` BIGINT,`refund_quantity` INTEGER,`item_sku_type` 
INTEGER,`refund_scale_type` INTEGER,`refund_scale` INTEGER,`accurate_refund` 
INTEGER,`refund_price` INTEGER,`refund_red_packet_price` 
INTEGER,`refund_price_info` VARCHAR,`refund_total_price` 
INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` 
INTEGER,`refund_other_price` INTEGER,`extend_info` VARCHAR,`create_time` 
BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
    tableEnv.executeSql("CREATE TABLE `t_progress_name`(`id` 
BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` 
BIGINT,`progress_node` VARCHAR,`progress_node_status` INTEGER,`operator` 
VARCHAR,`parallel` INTEGER,`flow_element_id` VARCHAR,`extend_info` 
VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH 
('connector'='datagen')");
    tableEnv.executeSql("CREATE TABLE `t_attr_name`(`id` 
BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` 
BIGINT,`name` VARCHAR,`value` VARCHAR,`create_time` BIGINT,`modify_time` 
BIGINT) WITH ('connector'='datagen')");
    tableEnv.executeSql("CREATE TABLE `t_apply_status_name`(`id` 
BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` 
BIGINT,`apply_status` INTEGER,`ai_audit_status` 
INTEGER,`group_header_confirm_status` INTEGER,`group_header_retrieve_status` 
INTEGER,`driver_retrieve_status` INTEGER,`group_header_parallel_status` 
INTEGER,`grid_parallel_status` INTEGER,`create_time` BIGINT,`modify_time` 
BIGINT) WITH ('connector'='datagen')");
    tableEnv.executeSql("CREATE VIEW `v_refund_fwd_item_sku` AS SELECT 
`after_service_id` AS `refund_apply_id`, `order_id`, `sku_id` AS `base_sku_id`, 
SUM(`if`(`refund_quantity` IS NULL, 0, `refund_quantity`)) AS 
`apply_refund_num`, SUM(`if`(`refund_quantity` IS NULL, 0, 
CAST(`refund_quantity` AS DOUBLE))) AS `view_qty`, SUM(`if`(`refund_price` IS 
NULL, 0, `refund_price`)) AS `apply_refund_amt`, SUM(`if`(`refund_price` IS 
NULL, 0, `refund_price`)) AS `refund_scale_user_real_pay`, 
SUM(`if`(`refund_red_packet_price` IS NULL, 0, `refund_red_packet_price`)) AS 
`refund_red_packet_price`\n"
            + "FROM `t_item_name`\n"
            + "GROUP BY `after_service_id`, `order_id`, `sku_id`");    
statementSet.addInsertSql("INSERT INTO `t_apply_sku_test` SELECT 
CAST(`FROM_UNIXTIME`(`ord`.`modify_time` / 1000, 'yyyyMMdd') AS BIGINT) AS 
`dt`, `sku`.`refund_apply_id`,`sku`.`base_sku_id`, `sku`.`order_id`, 
`ord`.`user_id`, `ord`.`poi_id`, CAST(`ord`.`refund_type` AS BIGINT) AS 
`refund_type`, CAST(`ord`.`first_reason_code` AS BIGINT) AS 
`apply_refund_reason_code`, `ord`.`first_reason` AS `apply_refund_reason_desc`, 
CAST(`stat`.`apply_status` AS BIGINT) AS `apply_refund_review_status`, CASE 
WHEN `stat`.`apply_status` = 0 OR `stat`.`apply_status` = 13 THEN 'a' WHEN 
`stat`.`apply_status` = 1 THEN 'b' WHEN `stat`.`apply_status` = 2 THEN 'c' WHEN 
`stat`.`apply_status` = 3 THEN 'd' WHEN `stat`.`apply_status` = 4 THEN 'e' WHEN 
`stat`.`apply_status` = 5 THEN 'f' WHEN `stat`.`apply_status` = 6 THEN 'g' ELSE 
'x' END AS `apply_refund_review_status_desc`, 
`if`(`progress`.`progress_node_status` = 30, `extend_info`, '') AS 
`apply_refund_reject_reason`, CASE WHEN `progress`.`progress_node` = 
'refund.node' THEN 1 ELSE 0 END AS `apply_is_refunded`, `ord`.`pic_url` AS 
`apply_pic_url`, `ord`.`remark`, CAST(NULL AS BIGINT) AS 
`refund_apply_originator`, CAST(`ord`.`second_reason_code` AS BIGINT) AS 
`second_reason_code`, `ord`.`second_reason`, CAST(`ord`.`refund_target_account` 
AS BIGINT) AS `refund_target_account`, `ord`.`after_service_id`, 
CAST(`ord`.`user_receipt_status` AS BIGINT) AS `receipt_status`, CASE WHEN 
`attr`.`name` = 'fwd_ext' AND `attr`.`value` = '1' THEN 1 WHEN `attr`.`name` = 
'fwd_ext' AND `attr`.`value` = '2' THEN 2 ELSE 0 END AS 
`group_header_goods_status`, `progress`.`operator` AS 
`apply_operator_mis_name`, `ord`.`create_time` AS `refund_apply_time`, 
`ord`.`modify_time` AS `update_time`, CAST(NULL AS VARCHAR) AS `base_sku_name`, 
CAST(`sku`.`apply_refund_num` AS BIGINT) AS `apply_refund_num`, 
`sku`.`view_qty` AS `view_qty`, CAST(NULL AS BIGINT) AS `refund_scale_type`, 
CAST(NULL AS VARCHAR) AS `refund_scale_type_desc`, CAST(NULL AS DECIMAL) AS 
`refund_scale`, `sku`.`apply_refund_amt` AS `apply_refund_amt`, 
`sku`.`refund_scale_user_real_pay` AS `refund_scale_user_real_pay`, 
`sku`.`refund_red_packet_price` AS `refund_red_packet_price`, 
CAST(LOCALTIMESTAMP AS VARCHAR) AS `load_time`, CAST(NULL AS BIGINT) AS 
`take_rate_type`, CAST(NULL AS DECIMAL) AS `platform_rate`, 1 AS 
`order_sku_type`, CAST(NULL AS INTEGER) AS `second_reason_aggregated_code`, 
CAST(NULL AS VARCHAR) AS `second_reason_aggregated`, CAST(NULL AS DECIMAL) AS 
`compensation_amount`, 1 AS `aftersale_type`, CASE WHEN 
`progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND 
`progress_node_status` = 10 THEN 1 WHEN `progress`.`progress_node` = 
'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 20 
THEN 2 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND 
`parallel` = 1 AND `progress_node_status` = 30 THEN 3 WHEN 
`progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND 
`progress_node_status` = 40 THEN 4 WHEN `progress`.`progress_node` = 
'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 50 
THEN 5 ELSE 0 END AS `group_header_parallel_status`, CASE WHEN 
`progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND 
`progress_node_status` = 10 THEN 1 WHEN `progress`.`progress_node` = 
'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 20 THEN 2 
WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND 
`progress_node_status` = 30 THEN 3 WHEN `progress`.`progress_node` = 
'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 40 THEN 4 
WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND 
`progress_node_status` = 50 THEN 5 ELSE 0 END AS `grid_parallel_status`\n"
            + "FROM `t_name` AS `ord`\n"
            + "LEFT JOIN `t_progress_name` AS `progress` ON 
`ord`.`after_service_id` = `progress`.`after_service_id`\n"
            + "LEFT JOIN `t_attr_name` AS `attr` ON `ord`.`after_service_id` = 
`attr`.`after_service_id`\n"
            + "LEFT JOIN `t_apply_status_name` AS `stat` ON 
`ord`.`after_service_id` = `stat`.`after_service_id`\n"
            + "LEFT JOIN `v_refund_fwd_item_sku` AS `sku` ON 
`stat`.`after_service_id` = `sku`.`refund_apply_id`");    
System.out.println(statementSet.explain());
}{code}
== Optimized Logical Plan ==
{code:java}
Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, 
refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, 
apply_refund_reason_code, apply_refund_reason_desc, apply_refund_review_status, 
apply_refund_review_status_desc, apply_refund_reject_reason, apply_is_refunded, 
apply_pic_url, remark, refund_apply_originator, second_reason_code, 
second_reason, refund_target_account, after_service_id, receipt_status, 
group_header_goods_status, apply_operator_mis_name, refund_apply_time, 
update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, 
refund_scale_type_desc, refund_scale, apply_refund_amt, 
refund_scale_user_real_pay, refund_red_packet_price, load_time, take_rate_type, 
platform_rate, order_sku_type, second_reason_aggregated_code, 
second_reason_aggregated, compensation_amount, aftersale_type, 
group_header_parallel_status, grid_parallel_status])
+- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd')) 
AS dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id, 
CAST(refund_type) AS refund_type, CAST(first_reason_code) AS 
apply_refund_reason_code, first_reason AS apply_refund_reason_desc, 
CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH 
Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE 
(apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE _UTF-16LE'd' 
CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 5) CASE 
_UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE _UTF-16LE'x')) AS 
apply_refund_review_status_desc, ((progress_node_status = 30) IF extend_info IF 
_UTF-16LE'') AS apply_refund_reject_reason, CAST(((progress_node = 
_UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE 1 
CASE 0)) AS apply_is_refunded, pic_url AS apply_pic_url, remark, null:BIGINT AS 
refund_apply_originator, CAST(second_reason_code) AS second_reason_code, 
second_reason, CAST(refund_target_account) AS refund_target_account, 
CAST(after_service_id) AS after_service_id, CAST(user_receipt_status) AS 
receipt_status, CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, 
operator AS apply_operator_mis_name, create_time AS refund_apply_time, 
modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) AS 
view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS refund_scale, 
CAST(apply_refund_amt) AS apply_refund_amt, CAST(refund_scale_user_real_pay) AS 
refund_scale_user_real_pay, CAST(refund_red_packet_price) AS 
refund_red_packet_price, CAST(CAST(())) AS load_time, null:BIGINT AS 
take_rate_type, null:DECIMAL(38, 18) AS platform_rate, 1 AS order_sku_type, 
null:INTEGER AS second_reason_aggregated_code, null:VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS 
compensation_amount, 1 AS aftersale_type, CAST((((progress_node = 
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE 
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) 
CASE 2 CASE ((progress_node = 
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE 
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) 
CASE 4 CASE ((progress_node = 
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) 
AS group_header_parallel_status, CAST((((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS 
grid_parallel_status])
   +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = 
refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, name, 
value, after_service_id0, apply_status, refund_apply_id, order_id, base_sku_id, 
apply_refund_num, view_qty, apply_refund_amt, refund_scale_user_real_pay, 
refund_red_packet_price], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[HasUniqueKey])
      :- Exchange(distribution=[hash[after_service_id0]])
      :  +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = 
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, name, 
value, after_service_id0, apply_status], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
      :     :- Exchange(distribution=[hash[after_service_id]])
      :     :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, name, 
value])
      :     :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = 
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, 
after_service_id0, name, value], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
      :     :        :- Exchange(distribution=[hash[after_service_id]])
      :     :        :  +- Calc(select=[after_service_id, user_id, poi_id, 
refund_type, first_reason_code, first_reason, second_reason_code, 
second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
create_time, modify_time, progress_node, progress_node_status, operator, 
parallel, extend_info])
      :     :        :     +- Join(joinType=[LeftOuterJoin], 
where=[(after_service_id = after_service_id0)], select=[after_service_id, 
user_id, poi_id, refund_type, first_reason_code, first_reason, 
second_reason_code, second_reason, pic_url, remark, user_receipt_status, 
refund_target_account, create_time, modify_time, after_service_id0, 
progress_node, progress_node_status, operator, parallel, extend_info], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :     :        :        :- Exchange(distribution=[hash[after_service_id]])
      :     :        :        :  +- Calc(select=[after_service_id, user_id, 
poi_id, refund_type, first_reason_code, first_reason, second_reason_code, 
second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
create_time, modify_time])
      :     :        :        :     +- DropUpdateBefore
      :     :        :        :        +- MiniBatchAssigner(interval=[5000ms], 
mode=[ProcTime])
      :     :        :        :           +- 
TableSourceScan(table=[[default_catalog, default_database, t_name]], 
fields=[id, after_service_id, order_id, user_id, poi_id, city_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, refund_price, refund_red_packet_price, refund_total_price, 
refund_promotion_price, refund_coupon_price, refund_other_price, 
user_receipt_status, collect_status, refund_target_account, status, 
flow_instance_id, create_time, modify_time])
      :     :        :        +- Exchange(distribution=[hash[after_service_id]])
      :     :        :           +- Calc(select=[after_service_id, 
progress_node, progress_node_status, operator, parallel, extend_info])
      :     :        :              +- DropUpdateBefore
      :     :        :                 +- MiniBatchAssigner(interval=[5000ms], 
mode=[ProcTime])
      :     :        :                    +- 
TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], 
fields=[id, after_service_id, order_id, progress_node, progress_node_status, 
operator, parallel, flow_element_id, extend_info, create_time, modify_time])
      :     :        +- Exchange(distribution=[hash[after_service_id]])
      :     :           +- Calc(select=[after_service_id, name, value])
      :     :              +- DropUpdateBefore
      :     :                 +- MiniBatchAssigner(interval=[5000ms], 
mode=[ProcTime])
      :     :                    +- TableSourceScan(table=[[default_catalog, 
default_database, t_attr_name]], fields=[id, after_service_id, order_id, name, 
value, create_time, modify_time])
      :     +- Exchange(distribution=[hash[after_service_id]])
      :        +- Calc(select=[after_service_id, apply_status])
      :           +- DropUpdateBefore
      :              +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
      :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t_apply_status_name]], fields=[id, after_service_id, 
order_id, apply_status, ai_audit_status, group_header_confirm_status, 
group_header_retrieve_status, driver_retrieve_status, 
group_header_parallel_status, grid_parallel_status, create_time, modify_time])
      +- Exchange(distribution=[hash[refund_apply_id]])
         +- Calc(select=[refund_apply_id, order_id, base_sku_id, 
apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, 
refund_scale_user_real_pay, refund_red_packet_price])
            +- GlobalGroupAggregate(groupBy=[refund_apply_id, order_id, 
base_sku_id], select=[refund_apply_id, order_id, base_sku_id, 
SUM_RETRACT((sum$0, count$1)) AS apply_refund_num, SUM_RETRACT((sum$2, 
count$3)) AS view_qty, SUM_RETRACT((sum$4, count$5)) AS 
refund_scale_user_real_pay, SUM_RETRACT((sum$6, count$7)) AS 
refund_red_packet_price])
               +- Exchange(distribution=[hash[refund_apply_id, order_id, 
base_sku_id]])
                  +- LocalGroupAggregate(groupBy=[refund_apply_id, order_id, 
base_sku_id], select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT($f3) 
AS (sum$0, count$1), SUM_RETRACT($f4) AS (sum$2, count$3), SUM_RETRACT($f5) AS 
(sum$4, count$5), SUM_RETRACT($f6) AS (sum$6, count$7), COUNT_RETRACT(*) AS 
count1$8])
                     +- Calc(select=[after_service_id AS refund_apply_id, 
order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF 
refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF 
CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS 
$f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS $f6])
                        +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, t_item_name]], fields=[id, refund_fwd_item_id, 
after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, 
supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, 
accurate_refund, refund_price, refund_red_packet_price, refund_price_info, 
refund_total_price, refund_promotion_price, refund_coupon_price, 
refund_other_price, extend_info, create_time, modify_time]) {code}
When table.exec.mini-batch.enabled is set to false, the unique keys are 
preserved and the joins report JoinKeyContainsUniqueKey / HasUniqueKey:

== Optimized Logical Plan ==
{code:java}
Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, 
refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, 
apply_refund_reason_code, apply_refund_reason_desc, apply_refund_review_status, 
apply_refund_review_status_desc, apply_refund_reject_reason, apply_is_refunded, 
apply_pic_url, remark, refund_apply_originator, second_reason_code, 
second_reason, refund_target_account, after_service_id, receipt_status, 
group_header_goods_status, apply_operator_mis_name, refund_apply_time, 
update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, 
refund_scale_type_desc, refund_scale, apply_refund_amt, 
refund_scale_user_real_pay, refund_red_packet_price, load_time, take_rate_type, 
platform_rate, order_sku_type, second_reason_aggregated_code, 
second_reason_aggregated, compensation_amount, aftersale_type, 
group_header_parallel_status, grid_parallel_status])
+- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd')) 
AS dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id, 
CAST(refund_type) AS refund_type, CAST(first_reason_code) AS 
apply_refund_reason_code, first_reason AS apply_refund_reason_desc, 
CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH 
Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE 
(apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE _UTF-16LE'd' 
CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 5) CASE 
_UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE _UTF-16LE'x')) AS 
apply_refund_review_status_desc, ((progress_node_status = 30) IF extend_info IF 
_UTF-16LE'') AS apply_refund_reject_reason, CAST(((progress_node = 
_UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE 1 
CASE 0)) AS apply_is_refunded, pic_url AS apply_pic_url, remark, null:BIGINT AS 
refund_apply_originator, CAST(second_reason_code) AS second_reason_code, 
second_reason, CAST(refund_target_account) AS refund_target_account, 
CAST(after_service_id) AS after_service_id, CAST(user_receipt_status) AS 
receipt_status, CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, 
operator AS apply_operator_mis_name, create_time AS refund_apply_time, 
modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) AS 
view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS refund_scale, 
CAST(apply_refund_amt) AS apply_refund_amt, CAST(refund_scale_user_real_pay) AS 
refund_scale_user_real_pay, CAST(refund_red_packet_price) AS 
refund_red_packet_price, CAST(CAST(())) AS load_time, null:BIGINT AS 
take_rate_type, null:DECIMAL(38, 18) AS platform_rate, 1 AS order_sku_type, 
null:INTEGER AS second_reason_aggregated_code, null:VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS 
compensation_amount, 1 AS aftersale_type, CAST((((progress_node = 
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE 
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) 
CASE 2 CASE ((progress_node = 
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE 
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) 
CASE 4 CASE ((progress_node = 
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) 
AS group_header_parallel_status, CAST((((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = 
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS 
grid_parallel_status])
   +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = 
refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, name, 
value, after_service_id0, apply_status, refund_apply_id, order_id, base_sku_id, 
apply_refund_num, view_qty, apply_refund_amt, refund_scale_user_real_pay, 
refund_red_packet_price], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[HasUniqueKey])
      :- Exchange(distribution=[hash[after_service_id0]])
      :  +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = 
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, name, 
value, after_service_id0, apply_status], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
      :     :- Exchange(distribution=[hash[after_service_id]])
      :     :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, name, 
value])
      :     :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = 
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, user_receipt_status, refund_target_account, create_time, modify_time, 
progress_node, progress_node_status, operator, parallel, extend_info, 
after_service_id0, name, value], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
      :     :        :- Exchange(distribution=[hash[after_service_id]])
      :     :        :  +- Calc(select=[after_service_id, user_id, poi_id, 
refund_type, first_reason_code, first_reason, second_reason_code, 
second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
create_time, modify_time, progress_node, progress_node_status, operator, 
parallel, extend_info])
      :     :        :     +- Join(joinType=[LeftOuterJoin], 
where=[(after_service_id = after_service_id0)], select=[after_service_id, 
user_id, poi_id, refund_type, first_reason_code, first_reason, 
second_reason_code, second_reason, pic_url, remark, user_receipt_status, 
refund_target_account, create_time, modify_time, after_service_id0, 
progress_node, progress_node_status, operator, parallel, extend_info], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
      :     :        :        :- Exchange(distribution=[hash[after_service_id]])
      :     :        :        :  +- Calc(select=[after_service_id, user_id, 
poi_id, refund_type, first_reason_code, first_reason, second_reason_code, 
second_reason, pic_url, remark, user_receipt_status, refund_target_account, 
create_time, modify_time])
      :     :        :        :     +- DropUpdateBefore
      :     :        :        :        +- 
TableSourceScan(table=[[default_catalog, default_database, t_name]], 
fields=[id, after_service_id, order_id, user_id, poi_id, city_id, refund_type, 
first_reason_code, first_reason, second_reason_code, second_reason, pic_url, 
remark, refund_price, refund_red_packet_price, refund_total_price, 
refund_promotion_price, refund_coupon_price, refund_other_price, 
user_receipt_status, collect_status, refund_target_account, status, 
flow_instance_id, create_time, modify_time])
      :     :        :        +- Exchange(distribution=[hash[after_service_id]])
      :     :        :           +- Calc(select=[after_service_id, 
progress_node, progress_node_status, operator, parallel, extend_info])
      :     :        :              +- DropUpdateBefore
      :     :        :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], 
fields=[id, after_service_id, order_id, progress_node, progress_node_status, 
operator, parallel, flow_element_id, extend_info, create_time, modify_time])
      :     :        +- Exchange(distribution=[hash[after_service_id]])
      :     :           +- Calc(select=[after_service_id, name, value])
      :     :              +- DropUpdateBefore
      :     :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t_attr_name]], fields=[id, after_service_id, order_id, name, 
value, create_time, modify_time])
      :     +- Exchange(distribution=[hash[after_service_id]])
      :        +- Calc(select=[after_service_id, apply_status])
      :           +- DropUpdateBefore
      :              +- TableSourceScan(table=[[default_catalog, 
default_database, t_apply_status_name]], fields=[id, after_service_id, 
order_id, apply_status, ai_audit_status, group_header_confirm_status, 
group_header_retrieve_status, driver_retrieve_status, 
group_header_parallel_status, grid_parallel_status, create_time, modify_time])
      +- Exchange(distribution=[hash[refund_apply_id]])
         +- Calc(select=[refund_apply_id, order_id, base_sku_id, 
apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, 
refund_scale_user_real_pay, refund_red_packet_price])
            +- GroupAggregate(groupBy=[refund_apply_id, order_id, base_sku_id], 
select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT($f3) AS 
apply_refund_num, SUM_RETRACT($f4) AS view_qty, SUM_RETRACT($f5) AS 
refund_scale_user_real_pay, SUM_RETRACT($f6) AS refund_red_packet_price])
               +- Exchange(distribution=[hash[refund_apply_id, order_id, 
base_sku_id]])
                  +- Calc(select=[after_service_id AS refund_apply_id, 
order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF 
refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF 
CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS 
$f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS $f6])
                     +- TableSourceScan(table=[[default_catalog, 
default_database, t_item_name]], fields=[id, refund_fwd_item_id, 
after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, 
supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, 
accurate_refund, refund_price, refund_red_packet_price, refund_price_info, 
refund_total_price, refund_promotion_price, refund_coupon_price, 
refund_other_price, extend_info, create_time, modify_time]) {code}
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to