[ https://issues.apache.org/jira/browse/FLINK-27922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-27922: ----------------------------------- Labels: auto-deprioritized-major pull-request-available (was: pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Flink SQL unique key lost when set table.exec.mini-batch.enabled=true > --------------------------------------------------------------------- > > Key: FLINK-27922 > URL: https://issues.apache.org/jira/browse/FLINK-27922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.12.2, 1.15.0 > Environment: Flink1.12.2 > Reporter: zhangbin > Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > Flink SQL table has primary keys, but when set table.exec.mini-batch.enabled > =true, the unique key is lost. > {code:java} > @Test > public void testJoinUniqueKey() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > Configuration configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.mini-batch.enabled", "true"); > configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); > configuration.setString("table.exec.mini-batch.size", "5000"); > StatementSet statementSet = tableEnv.createStatementSet(); > tableEnv.executeSql("CREATE TABLE `t_apply_sku_test`(`dt` > BIGINT,`refund_apply_id` BIGINT,`base_sku_id` BIGINT,`order_id` > BIGINT,`user_id` BIGINT,`poi_id` BIGINT,`refund_type` > BIGINT,`apply_refund_reason_code` BIGINT,`apply_refund_reason_desc` > VARCHAR,`apply_refund_review_status` BIGINT,`apply_refund_review_status_desc` > VARCHAR,`apply_refund_reject_reason` VARCHAR,`apply_is_refunded` > INTEGER,`apply_pic_url` VARCHAR,`remark` VARCHAR,`refund_apply_originator` > BIGINT,`second_reason_code` BIGINT,`second_reason` > VARCHAR,`refund_target_account` BIGINT,`after_service_id` > BIGINT,`receipt_status` BIGINT,`group_header_goods_status` > INTEGER,`apply_operator_mis_name` VARCHAR,`refund_apply_time` > BIGINT,`update_time` BIGINT,`base_sku_name` VARCHAR,`apply_refund_num` > BIGINT,`view_qty` DECIMAL(38,18),`refund_scale_type` > BIGINT,`refund_scale_type_desc` VARCHAR,`refund_scale` > DECIMAL(38,18),`apply_refund_amt` DECIMAL(38,18),`refund_scale_user_real_pay` > DECIMAL(38,18),`refund_red_packet_price` DECIMAL(38,18),`load_time` > VARCHAR,`take_rate_type` BIGINT,`platform_rate` > DECIMAL(38,18),`order_sku_type` INTEGER,`second_reason_aggregated_code` > INTEGER,`second_reason_aggregated` VARCHAR,`compensation_amount` > DECIMAL(38,18),`aftersale_type` INTEGER,`group_header_parallel_status` > INTEGER,`grid_parallel_status` INTEGER) WITH ('connector'='blackhole')"); > tableEnv.executeSql("CREATE TABLE `t_name`(`id` BIGINT,`after_service_id` > BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`user_id` BIGINT,`poi_id` > BIGINT,`city_id` BIGINT,`refund_type` INTEGER,`first_reason_code` > INTEGER,`first_reason` VARCHAR,`second_reason_code` INTEGER,`second_reason` > VARCHAR,`pic_url` VARCHAR,`remark` VARCHAR,`refund_price` > INTEGER,`refund_red_packet_price` INTEGER,`refund_total_price` > INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` > INTEGER,`refund_other_price` INTEGER,`user_receipt_status` > INTEGER,`collect_status` INTEGER,`refund_target_account` INTEGER,`status` > INTEGER,`flow_instance_id` BIGINT,`create_time` BIGINT,`modify_time` BIGINT) > WITH ('connector'='datagen')"); > tableEnv.executeSql("CREATE TABLE `t_item_name`(`id` > BIGINT,`refund_fwd_item_id` BIGINT,`after_service_id` BIGINT,`order_id` > BIGINT,`order_item_id` BIGINT,`stack_id` BIGINT,`sku_id` BIGINT,`sku_name` > VARCHAR,`supplier_id` BIGINT,`refund_quantity` INTEGER,`item_sku_type` > INTEGER,`refund_scale_type` INTEGER,`refund_scale` INTEGER,`accurate_refund` > INTEGER,`refund_price` INTEGER,`refund_red_packet_price` > INTEGER,`refund_price_info` VARCHAR,`refund_total_price` > INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` > INTEGER,`refund_other_price` INTEGER,`extend_info` VARCHAR,`create_time` > BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')"); > tableEnv.executeSql("CREATE TABLE `t_progress_name`(`id` > BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` > BIGINT,`progress_node` VARCHAR,`progress_node_status` INTEGER,`operator` > VARCHAR,`parallel` INTEGER,`flow_element_id` VARCHAR,`extend_info` > VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH > ('connector'='datagen')"); > tableEnv.executeSql("CREATE TABLE `t_attr_name`(`id` > BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` > BIGINT,`name` VARCHAR,`value` VARCHAR,`create_time` BIGINT,`modify_time` > BIGINT) WITH ('connector'='datagen')"); > tableEnv.executeSql("CREATE TABLE `t_apply_status_name`(`id` > BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` > BIGINT,`apply_status` INTEGER,`ai_audit_status` > INTEGER,`group_header_confirm_status` INTEGER,`group_header_retrieve_status` > INTEGER,`driver_retrieve_status` INTEGER,`group_header_parallel_status` > INTEGER,`grid_parallel_status` INTEGER,`create_time` BIGINT,`modify_time` > BIGINT) WITH ('connector'='datagen')"); > tableEnv.executeSql("CREATE VIEW `v_refund_fwd_item_sku` AS SELECT > `after_service_id` AS `refund_apply_id`, `order_id`, `sku_id` AS > `base_sku_id`, SUM(`if`(`refund_quantity` IS NULL, 0, `refund_quantity`)) AS > `apply_refund_num`, SUM(`if`(`refund_quantity` IS NULL, 0, > CAST(`refund_quantity` AS DOUBLE))) AS `view_qty`, SUM(`if`(`refund_price` IS > NULL, 0, `refund_price`)) AS `apply_refund_amt`, SUM(`if`(`refund_price` IS > NULL, 0, `refund_price`)) AS `refund_scale_user_real_pay`, > SUM(`if`(`refund_red_packet_price` IS NULL, 0, `refund_red_packet_price`)) AS > `refund_red_packet_price`\n" > + "FROM `t_item_name`\n" > + "GROUP BY `after_service_id`, `order_id`, `sku_id`"); > statementSet.addInsertSql("INSERT INTO `t_apply_sku_test` SELECT > CAST(`FROM_UNIXTIME`(`ord`.`modify_time` / 1000, 'yyyyMMdd') AS BIGINT) AS > `dt`, `sku`.`refund_apply_id`,`sku`.`base_sku_id`, `sku`.`order_id`, > `ord`.`user_id`, `ord`.`poi_id`, CAST(`ord`.`refund_type` AS BIGINT) AS > `refund_type`, CAST(`ord`.`first_reason_code` AS BIGINT) AS > `apply_refund_reason_code`, `ord`.`first_reason` AS > `apply_refund_reason_desc`, CAST(`stat`.`apply_status` AS BIGINT) AS > `apply_refund_review_status`, CASE WHEN `stat`.`apply_status` = 0 OR > `stat`.`apply_status` = 13 THEN 'a' WHEN `stat`.`apply_status` = 1 THEN 'b' > WHEN `stat`.`apply_status` = 2 THEN 'c' WHEN `stat`.`apply_status` = 3 THEN > 'd' WHEN `stat`.`apply_status` = 4 THEN 'e' WHEN `stat`.`apply_status` = 5 > THEN 'f' WHEN `stat`.`apply_status` = 6 THEN 'g' ELSE 'x' END AS > `apply_refund_review_status_desc`, `if`(`progress`.`progress_node_status` = > 30, `extend_info`, '') AS `apply_refund_reject_reason`, CASE WHEN > `progress`.`progress_node` = 'refund.node' THEN 1 ELSE 0 END AS > `apply_is_refunded`, `ord`.`pic_url` AS `apply_pic_url`, `ord`.`remark`, > CAST(NULL AS BIGINT) AS `refund_apply_originator`, > CAST(`ord`.`second_reason_code` AS BIGINT) AS `second_reason_code`, > `ord`.`second_reason`, CAST(`ord`.`refund_target_account` AS BIGINT) AS > `refund_target_account`, `ord`.`after_service_id`, > CAST(`ord`.`user_receipt_status` AS BIGINT) AS `receipt_status`, CASE WHEN > `attr`.`name` = 'fwd_ext' AND `attr`.`value` = '1' THEN 1 WHEN `attr`.`name` > = 'fwd_ext' AND `attr`.`value` = '2' THEN 2 ELSE 0 END AS > `group_header_goods_status`, `progress`.`operator` AS > `apply_operator_mis_name`, `ord`.`create_time` AS `refund_apply_time`, > `ord`.`modify_time` AS `update_time`, CAST(NULL AS VARCHAR) AS > `base_sku_name`, CAST(`sku`.`apply_refund_num` AS BIGINT) AS > `apply_refund_num`, `sku`.`view_qty` AS `view_qty`, CAST(NULL AS BIGINT) AS > `refund_scale_type`, CAST(NULL AS VARCHAR) AS `refund_scale_type_desc`, > CAST(NULL AS DECIMAL) AS `refund_scale`, `sku`.`apply_refund_amt` AS > `apply_refund_amt`, `sku`.`refund_scale_user_real_pay` AS > `refund_scale_user_real_pay`, `sku`.`refund_red_packet_price` AS > `refund_red_packet_price`, CAST(LOCALTIMESTAMP AS VARCHAR) AS `load_time`, > CAST(NULL AS BIGINT) AS `take_rate_type`, CAST(NULL AS DECIMAL) AS > `platform_rate`, 1 AS `order_sku_type`, CAST(NULL AS INTEGER) AS > `second_reason_aggregated_code`, CAST(NULL AS VARCHAR) AS > `second_reason_aggregated`, CAST(NULL AS DECIMAL) AS `compensation_amount`, 1 > AS `aftersale_type`, CASE WHEN `progress`.`progress_node` = > 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 10 > THEN 1 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND > `parallel` = 1 AND `progress_node_status` = 20 THEN 2 WHEN > `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND > `progress_node_status` = 30 THEN 3 WHEN `progress`.`progress_node` = > 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 40 > THEN 4 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND > `parallel` = 1 AND `progress_node_status` = 50 THEN 5 ELSE 0 END AS > `group_header_parallel_status`, CASE WHEN `progress`.`progress_node` = > 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 10 THEN 1 > WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND > `progress_node_status` = 20 THEN 2 WHEN `progress`.`progress_node` = > 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 30 THEN 3 > WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND > `progress_node_status` = 40 THEN 4 WHEN `progress`.`progress_node` = > 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 50 THEN 5 > ELSE 0 END AS `grid_parallel_status`\n" > + "FROM `t_name` AS `ord`\n" > + "LEFT JOIN `t_progress_name` AS `progress` ON > `ord`.`after_service_id` = `progress`.`after_service_id`\n" > + "LEFT JOIN `t_attr_name` AS `attr` ON `ord`.`after_service_id` > = `attr`.`after_service_id`\n" > + "LEFT JOIN `t_apply_status_name` AS `stat` ON > `ord`.`after_service_id` = `stat`.`after_service_id`\n" > + "LEFT JOIN `v_refund_fwd_item_sku` AS `sku` ON > `stat`.`after_service_id` = `sku`.`refund_apply_id`"); > System.out.println(statementSet.explain()); > }{code} > == Optimized Logical Plan == > {code:java} > Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, > refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, > apply_refund_reason_code, apply_refund_reason_desc, > apply_refund_review_status, apply_refund_review_status_desc, > apply_refund_reject_reason, apply_is_refunded, apply_pic_url, remark, > refund_apply_originator, second_reason_code, second_reason, > refund_target_account, after_service_id, receipt_status, > group_header_goods_status, apply_operator_mis_name, refund_apply_time, > update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, > refund_scale_type_desc, refund_scale, apply_refund_amt, > refund_scale_user_real_pay, refund_red_packet_price, load_time, > take_rate_type, platform_rate, order_sku_type, second_reason_aggregated_code, > second_reason_aggregated, compensation_amount, aftersale_type, > group_header_parallel_status, grid_parallel_status]) > +- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME > _UTF-16LE'yyyyMMdd')) AS dt, refund_apply_id, base_sku_id, order_id, user_id, > poi_id, CAST(refund_type) AS refund_type, CAST(first_reason_code) AS > apply_refund_reason_code, first_reason AS apply_refund_reason_desc, > CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH > Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE > (apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE > _UTF-16LE'd' CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = > 5) CASE _UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE > _UTF-16LE'x')) AS apply_refund_review_status_desc, ((progress_node_status = > 30) IF extend_info IF _UTF-16LE'') AS apply_refund_reject_reason, > CAST(((progress_node = _UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER > SET "UTF-16LE") CASE 1 CASE 0)) AS apply_is_refunded, pic_url AS > apply_pic_url, remark, null:BIGINT AS refund_apply_originator, > CAST(second_reason_code) AS second_reason_code, second_reason, > CAST(refund_target_account) AS refund_target_account, CAST(after_service_id) > AS after_service_id, CAST(user_receipt_status) AS receipt_status, > CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, > operator AS apply_operator_mis_name, create_time AS refund_apply_time, > modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) > AS view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS > refund_scale, CAST(apply_refund_amt) AS apply_refund_amt, > CAST(refund_scale_user_real_pay) AS refund_scale_user_real_pay, > CAST(refund_red_packet_price) AS refund_red_packet_price, CAST(CAST(())) AS > load_time, null:BIGINT AS take_rate_type, null:DECIMAL(38, 18) AS > platform_rate, 1 AS order_sku_type, null:INTEGER AS > second_reason_aggregated_code, null:VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS > compensation_amount, 1 AS aftersale_type, CAST((((progress_node = > _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE > ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) > CASE 2 CASE ((progress_node = > _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE > ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) > CASE 4 CASE ((progress_node = > _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE > 0)) AS group_header_parallel_status, CAST((((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS > grid_parallel_status]) > +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = > refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, > first_reason_code, first_reason, second_reason_code, second_reason, pic_url, > remark, user_receipt_status, refund_target_account, create_time, modify_time, > progress_node, progress_node_status, operator, parallel, extend_info, name, > value, after_service_id0, apply_status, refund_apply_id, order_id, > base_sku_id, apply_refund_num, view_qty, apply_refund_amt, > refund_scale_user_real_pay, refund_red_packet_price], > leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey]) > :- Exchange(distribution=[hash[after_service_id0]]) > : +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = > after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, > first_reason_code, first_reason, second_reason_code, second_reason, pic_url, > remark, user_receipt_status, refund_target_account, create_time, modify_time, > progress_node, progress_node_status, operator, parallel, extend_info, name, > value, after_service_id0, apply_status], leftInputSpec=[NoUniqueKey], > rightInputSpec=[NoUniqueKey]) > : :- Exchange(distribution=[hash[after_service_id]]) > : : +- Calc(select=[after_service_id, user_id, poi_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time, progress_node, progress_node_status, operator, > parallel, extend_info, name, value]) > : : +- Join(joinType=[LeftOuterJoin], where=[(after_service_id > = after_service_id0)], select=[after_service_id, user_id, poi_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time, progress_node, progress_node_status, operator, > parallel, extend_info, after_service_id0, name, value], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > : : :- Exchange(distribution=[hash[after_service_id]]) > : : : +- Calc(select=[after_service_id, user_id, poi_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time, progress_node, progress_node_status, operator, > parallel, extend_info]) > : : : +- Join(joinType=[LeftOuterJoin], > where=[(after_service_id = after_service_id0)], select=[after_service_id, > user_id, poi_id, refund_type, first_reason_code, first_reason, > second_reason_code, second_reason, pic_url, remark, user_receipt_status, > refund_target_account, create_time, modify_time, after_service_id0, > progress_node, progress_node_status, operator, parallel, extend_info], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > : : : :- > Exchange(distribution=[hash[after_service_id]]) > : : : : +- Calc(select=[after_service_id, user_id, > poi_id, refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time]) > : : : : +- DropUpdateBefore > : : : : +- > MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) > : : : : +- > TableSourceScan(table=[[default_catalog, default_database, t_name]], > fields=[id, after_service_id, order_id, user_id, poi_id, city_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, refund_price, refund_red_packet_price, > refund_total_price, refund_promotion_price, refund_coupon_price, > refund_other_price, user_receipt_status, collect_status, > refund_target_account, status, flow_instance_id, create_time, modify_time]) > : : : +- > Exchange(distribution=[hash[after_service_id]]) > : : : +- Calc(select=[after_service_id, > progress_node, progress_node_status, operator, parallel, extend_info]) > : : : +- DropUpdateBefore > : : : +- > MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) > : : : +- > TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], > fields=[id, after_service_id, order_id, progress_node, progress_node_status, > operator, parallel, flow_element_id, extend_info, create_time, modify_time]) > : : +- Exchange(distribution=[hash[after_service_id]]) > : : +- Calc(select=[after_service_id, name, value]) > : : +- DropUpdateBefore > : : +- MiniBatchAssigner(interval=[5000ms], > mode=[ProcTime]) > : : +- TableSourceScan(table=[[default_catalog, > default_database, t_attr_name]], fields=[id, after_service_id, order_id, > name, value, create_time, modify_time]) > : +- Exchange(distribution=[hash[after_service_id]]) > : +- Calc(select=[after_service_id, apply_status]) > : +- DropUpdateBefore > : +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) > : +- TableSourceScan(table=[[default_catalog, > default_database, t_apply_status_name]], fields=[id, after_service_id, > order_id, apply_status, ai_audit_status, group_header_confirm_status, > group_header_retrieve_status, driver_retrieve_status, > group_header_parallel_status, grid_parallel_status, create_time, modify_time]) > +- Exchange(distribution=[hash[refund_apply_id]]) > +- Calc(select=[refund_apply_id, order_id, base_sku_id, > apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, > refund_scale_user_real_pay, refund_red_packet_price]) > +- GlobalGroupAggregate(groupBy=[refund_apply_id, order_id, > base_sku_id], select=[refund_apply_id, order_id, base_sku_id, > SUM_RETRACT((sum$0, count$1)) AS apply_refund_num, SUM_RETRACT((sum$2, > count$3)) AS view_qty, SUM_RETRACT((sum$4, count$5)) AS > refund_scale_user_real_pay, SUM_RETRACT((sum$6, count$7)) AS > refund_red_packet_price]) > +- Exchange(distribution=[hash[refund_apply_id, order_id, > base_sku_id]]) > +- LocalGroupAggregate(groupBy=[refund_apply_id, order_id, > base_sku_id], select=[refund_apply_id, order_id, base_sku_id, > SUM_RETRACT($f3) AS (sum$0, count$1), SUM_RETRACT($f4) AS (sum$2, count$3), > SUM_RETRACT($f5) AS (sum$4, count$5), SUM_RETRACT($f6) AS (sum$6, count$7), > COUNT_RETRACT(*) AS count1$8]) > +- Calc(select=[after_service_id AS refund_apply_id, > order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF > refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF > CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS > $f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS > $f6]) > +- MiniBatchAssigner(interval=[5000ms], > mode=[ProcTime]) > +- TableSourceScan(table=[[default_catalog, > default_database, t_item_name]], fields=[id, refund_fwd_item_id, > after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, > supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, > accurate_refund, refund_price, refund_red_packet_price, refund_price_info, > refund_total_price, refund_promotion_price, refund_coupon_price, > refund_other_price, extend_info, create_time, modify_time]) {code} > when set table.exec.mini-batch.enabled=false > == Optimized Logical Plan == > {code:java} > Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, > refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, > apply_refund_reason_code, apply_refund_reason_desc, > apply_refund_review_status, apply_refund_review_status_desc, > apply_refund_reject_reason, apply_is_refunded, apply_pic_url, remark, > refund_apply_originator, second_reason_code, second_reason, > refund_target_account, after_service_id, receipt_status, > group_header_goods_status, apply_operator_mis_name, refund_apply_time, > update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, > refund_scale_type_desc, refund_scale, apply_refund_amt, > refund_scale_user_real_pay, refund_red_packet_price, load_time, > take_rate_type, platform_rate, order_sku_type, second_reason_aggregated_code, > second_reason_aggregated, compensation_amount, aftersale_type, > group_header_parallel_status, grid_parallel_status]) > +- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME > _UTF-16LE'yyyyMMdd')) AS dt, refund_apply_id, base_sku_id, order_id, user_id, > poi_id, CAST(refund_type) AS refund_type, CAST(first_reason_code) AS > apply_refund_reason_code, first_reason AS apply_refund_reason_desc, > CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH > Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE > (apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE > _UTF-16LE'd' CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = > 5) CASE _UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE > _UTF-16LE'x')) AS apply_refund_review_status_desc, ((progress_node_status = > 30) IF extend_info IF _UTF-16LE'') AS apply_refund_reject_reason, > CAST(((progress_node = _UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER > SET "UTF-16LE") CASE 1 CASE 0)) AS apply_is_refunded, pic_url AS > apply_pic_url, remark, null:BIGINT AS refund_apply_originator, > CAST(second_reason_code) AS second_reason_code, second_reason, > CAST(refund_target_account) AS refund_target_account, CAST(after_service_id) > AS after_service_id, CAST(user_receipt_status) AS receipt_status, > CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, > operator AS apply_operator_mis_name, create_time AS refund_apply_time, > modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) > AS view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) > CHARACTER SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS > refund_scale, CAST(apply_refund_amt) AS apply_refund_amt, > CAST(refund_scale_user_real_pay) AS refund_scale_user_real_pay, > CAST(refund_red_packet_price) AS refund_red_packet_price, CAST(CAST(())) AS > load_time, null:BIGINT AS take_rate_type, null:DECIMAL(38, 18) AS > platform_rate, 1 AS order_sku_type, null:INTEGER AS > second_reason_aggregated_code, null:VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS > compensation_amount, 1 AS aftersale_type, CAST((((progress_node = > _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE > ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) > CASE 2 CASE ((progress_node = > _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE > ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) > CASE 4 CASE ((progress_node = > _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE > 0)) AS group_header_parallel_status, CAST((((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = > _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS > grid_parallel_status]) > +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = > refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, > first_reason_code, first_reason, second_reason_code, second_reason, pic_url, > remark, user_receipt_status, refund_target_account, create_time, modify_time, > progress_node, progress_node_status, operator, parallel, extend_info, name, > value, after_service_id0, apply_status, refund_apply_id, order_id, > base_sku_id, apply_refund_num, view_qty, apply_refund_amt, > refund_scale_user_real_pay, refund_red_packet_price], > leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) > :- Exchange(distribution=[hash[after_service_id0]]) > : +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = > after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, > first_reason_code, first_reason, second_reason_code, second_reason, pic_url, > remark, user_receipt_status, refund_target_account, create_time, modify_time, > progress_node, progress_node_status, operator, parallel, extend_info, name, > value, after_service_id0, apply_status], > leftInputSpec=[JoinKeyContainsUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey]) > : :- Exchange(distribution=[hash[after_service_id]]) > : : +- Calc(select=[after_service_id, user_id, poi_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time, progress_node, progress_node_status, operator, > parallel, extend_info, name, value]) > : : +- Join(joinType=[LeftOuterJoin], where=[(after_service_id > = after_service_id0)], select=[after_service_id, user_id, poi_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time, progress_node, progress_node_status, operator, > parallel, extend_info, after_service_id0, name, value], > leftInputSpec=[JoinKeyContainsUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey]) > : : :- Exchange(distribution=[hash[after_service_id]]) > : : : +- Calc(select=[after_service_id, user_id, poi_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time, progress_node, progress_node_status, operator, > parallel, extend_info]) > : : : +- Join(joinType=[LeftOuterJoin], > where=[(after_service_id = after_service_id0)], select=[after_service_id, > user_id, poi_id, refund_type, first_reason_code, first_reason, > second_reason_code, second_reason, pic_url, remark, user_receipt_status, > refund_target_account, create_time, modify_time, after_service_id0, > progress_node, progress_node_status, operator, parallel, extend_info], > leftInputSpec=[JoinKeyContainsUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey]) > : : : :- > Exchange(distribution=[hash[after_service_id]]) > : : : : +- Calc(select=[after_service_id, user_id, > poi_id, refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, user_receipt_status, refund_target_account, > create_time, modify_time]) > : : : : +- DropUpdateBefore > : : : : +- > TableSourceScan(table=[[default_catalog, default_database, t_name]], > fields=[id, after_service_id, order_id, user_id, poi_id, city_id, > refund_type, first_reason_code, first_reason, second_reason_code, > second_reason, pic_url, remark, refund_price, refund_red_packet_price, > refund_total_price, refund_promotion_price, refund_coupon_price, > refund_other_price, user_receipt_status, collect_status, > refund_target_account, status, flow_instance_id, create_time, modify_time]) > : : : +- > Exchange(distribution=[hash[after_service_id]]) > : : : +- Calc(select=[after_service_id, > progress_node, progress_node_status, operator, parallel, extend_info]) > : : : +- DropUpdateBefore > : : : +- > TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], > fields=[id, after_service_id, order_id, progress_node, progress_node_status, > operator, parallel, flow_element_id, extend_info, create_time, modify_time]) > : : +- Exchange(distribution=[hash[after_service_id]]) > : : +- Calc(select=[after_service_id, name, value]) > : : +- DropUpdateBefore > : : +- TableSourceScan(table=[[default_catalog, > default_database, t_attr_name]], fields=[id, after_service_id, order_id, > name, value, create_time, modify_time]) > : +- Exchange(distribution=[hash[after_service_id]]) > : +- Calc(select=[after_service_id, apply_status]) > : +- DropUpdateBefore > : +- TableSourceScan(table=[[default_catalog, > default_database, t_apply_status_name]], fields=[id, after_service_id, > order_id, apply_status, ai_audit_status, group_header_confirm_status, > group_header_retrieve_status, driver_retrieve_status, > group_header_parallel_status, grid_parallel_status, create_time, modify_time]) > +- Exchange(distribution=[hash[refund_apply_id]]) > +- Calc(select=[refund_apply_id, order_id, base_sku_id, > apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, > refund_scale_user_real_pay, refund_red_packet_price]) > +- GroupAggregate(groupBy=[refund_apply_id, order_id, > base_sku_id], select=[refund_apply_id, order_id, base_sku_id, > SUM_RETRACT($f3) AS apply_refund_num, SUM_RETRACT($f4) AS view_qty, > SUM_RETRACT($f5) AS refund_scale_user_real_pay, SUM_RETRACT($f6) AS > refund_red_packet_price]) > +- Exchange(distribution=[hash[refund_apply_id, order_id, > base_sku_id]]) > +- Calc(select=[after_service_id AS refund_apply_id, > order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF > refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF > CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS > $f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS > $f6]) > +- TableSourceScan(table=[[default_catalog, > default_database, t_item_name]], fields=[id, refund_fwd_item_id, > after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, > supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, > accurate_refund, refund_price, refund_red_packet_price, refund_price_info, > refund_total_price, refund_promotion_price, refund_coupon_price, > refund_other_price, extend_info, create_time, modify_time]) {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)