你好,主要流程见附件。流程就是使用cdc读取mysql,然后left join维度表,最后写入到mysql。问题是测试的时候,update cdc的源表一条数据,发现结果的数据有时候有、有时候没有。使用connector=print发现有两条数据流,一个是delete,一个是insert。这边怀疑是乱序,导致先insert再delete掉了。把并行度设置为1的时候就是正常的。如果沟通不方便,欢迎加钉钉13269166963。 在 2020-10-26 12:11:59,"史 正超" <shizhengc...@outlook.com> 写道: >Hi, @air23, 你能提供下完整的sql吗?我来复现下这个问题 >________________________________ >发件人: air23 <wangfei23_...@163.com> >发送时间: 2020年10月23日 6:21 >收件人: user-zh@flink.apache.org <user-zh@flink.apache.org> >主题: Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题 > >你好, >这边发现使用cdc读取mysql,然后写入mysql会有乱序问题。 >在上游mysql update一条数据,connector=print是有一条delete和一条insert的数据, >但是jdbc写入mysql时候,发现mysql有时候是正常的,但是有时候会没有,当把并行度改成1的时候是正常的。 >这边怀疑是乱序了,先insert再delete了,所以导致结果表没有这条数据。请问flink sql或者flink cdc怎么保证有序?
package com.flink.large.screen.demand;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import zt.common.util.PropertyUtil;

import java.util.Properties;

/**
 * Flink SQL job that registers six MySQL tables through the {@code mysql-cdc} connector,
 * registers six JDBC-backed dimension tables, left-joins them around {@code gc_wms_orders},
 * and writes the joined rows to the JDBC table {@code gc_wms_qudaorealfenxi_view} and, for
 * debugging, to the print table {@code gc_wms_qudaorealfenxi_view1}.
 *
 * <p>Note from the original author: when the job runs with parallelism 1 the sink result is
 * not affected by out-of-order changelog records; with the default parallelism it can be.
 * The workaround is left commented out in {@link #main(String[])}.
 */
public class FlinkGcWmsOrdersCdc2 {

    /** Classpath location of the JDBC settings used for the dimension tables. */
    private static final String JDBC_PROPERTIES_PATH = "druid_jdbc/zongteng_streaming.properties";

    /**
     * Column list and primary key shared by both sink tables (the JDBC sink and the print
     * sink), so the two definitions cannot drift apart.
     */
    private static final String SINK_COLUMNS =
            " uuid STRING,\n"
            + " datasource_num_id INT,\n"
            + " customer_code STRING,\n"
            + " order_code STRING,\n"
            + " add_time TIMESTAMP,\n"
            + " order_date_release TIMESTAMP,\n"
            + " import_time TIMESTAMP,\n"
            + " process_time TIMESTAMP,\n"
            + " pack_time TIMESTAMP,\n"
            + " ship_time TIMESTAMP,\n"
            + " tracking_number STRING,\n"
            + " warehouse_id INT,\n"
            + " warehouse_code STRING,\n"
            + " warehouse_desc STRING,\n"
            + " timezone INT,\n"
            + " sm_code STRING,\n"
            + " sm_name STRING,\n"
            + " sc_id INT,\n"
            + " server_channel_name STRING,\n"
            + " server_code STRING,\n"
            + " server_name STRING,\n"
            + " tms_order_code STRING,\n"
            + " wp_code STRING,\n"
            + " wp_name STRING,\n"
            + " is_fba INT,\n"
            + " sm_is_tracking_val STRING,\n"
            + " PRIMARY KEY (uuid) NOT ENFORCED\n";

    /**
     * The enrichment query shared by both INSERT statements; it starts with {@code " SELECT "}
     * so it can be appended directly after {@code INSERT INTO <table>}.
     *
     * <p>Primary keys of the joined CDC tables, as declared in {@code createCdcTable}:
     * <pre>
     * gc_wms_orders                  | order_id
     * gc_wms_order_box               | order_code, box_no
     * gc_wms_ship_order              | order_code
     * gc_wms_order_operation_time    | oot_id
     * gc_wms_order_physical_relation | opr_id
     * gc_oms_orders                  | order_id
     * </pre>
     *
     * <p>The first selected expression (the sink's {@code uuid}) concatenates the order id with
     * the keys of the joined rows, substituting a fixed placeholder when a joined side is NULL.
     * The {@code timezone} column picks the summer or winter offset depending on the season
     * type and on whether {@code add_time} lies between the season start and end.
     */
    private static final String ENRICHED_ORDERS_SELECT =
            " SELECT "
            + " CONCAT("
            + " CAST(gwo.order_id AS STRING), '-'\n"
            + " ,IF (gwob.order_code IS NULL, 'gwob_code_1', gwob.order_code), '-'\n"
            + " ,IF (gwob.box_no IS NULL, 'gwob_no_1', gwob.box_no), '-'\n"
            + " ,IF (gwso.order_code IS NULL, 'gwso_code_1', gwso.order_code), '-'\n"
            + " ,IF (opt.oot_id IS NULL, 'opt_id_1', CAST( opt.oot_id AS STRING)), '-'\n"
            + " ,IF (op.order_code IS NULL, 'op_code_1', CAST ( op.order_code AS STRING)), '-'\n"
            + " ,IF (oo.order_id IS NULL, 'oo_id_1', CAST( oo.order_id AS STRING))\n"
            + " )\n"
            + " ,9004 AS datasource_num_id\n"
            + " ,gwo.customer_code AS customer_code\n"
            + " ,gwo.order_code AS order_code\n"
            + " ,CAST (gwo.add_time AS TIMESTAMP) AS add_time"
            + " ,oo.date_release AS order_date_release"
            + " ,opt.import_time AS import_time"
            + " ,opt.process_time AS process_time"
            + " ,opt.pack_time AS pack_time"
            + " ,opt.ship_time AS ship_time"
            + " ,IF (gwob.tracking_number IS NOT NULL, gwob.tracking_number, gwso.tracking_number) AS tracking_number"
            + " ,gwo.warehouse_id AS warehouse_id"
            + " ,ware.warehouse_code AS warehouse_code"
            + " ,ware.warehouse_desc AS warehouse_desc"
            + " ,(case\n"
            + " when (tz.timezone_season_type = 'summer_time') then\n"
            + " (case\n"
            + " when (gwo.add_time between tz.timezone_season_start and tz.timezone_season_end) then tz.timezone_summer_number else tz.timezone_winner_number\n"
            + " end)\n"
            + " when (tz.timezone_season_type = 'winner_time') then\n"
            + " (case\n"
            + " when (gwo.add_time between tz.timezone_season_start and tz.timezone_season_end) then tz.timezone_winner_number else tz.timezone_summer_number\n"
            + " end)\n"
            + " end) AS timezone\n"
            + " ,gwo.sm_code AS sm_code\n"
            + " ,sm.sm_name AS sm_name\n"
            + " ,gwo.sc_id AS sc_id\n"
            + " ,sc.server_channel_name AS server_channel_name\n"
            + " ,s.server_code AS server_code\n"
            + " ,s.server_name AS server_name\n"
            + " ,CONCAT(gwo.order_code, IF (gwob.box_no IS NULL, '', concat('-', gwob.box_no))) AS tms_order_code\n"
            + " ,op.wp_code AS wp_code\n"
            + " ,wp.wp_name AS wp_name\n"
            + " ,CAST (gwo.is_fba AS INT) AS is_fba\n"
            + " ,sm.sm_is_tracking_val AS sm_is_tracking_val"
            + " FROM "
            + "gc_wms_orders"
            + " AS gwo\n"
            + " LEFT JOIN gc_wms_order_box AS gwob ON gwo.order_code = gwob.order_code\n"
            + " LEFT JOIN gc_wms_ship_order AS gwso ON gwo.order_code = gwso.order_code\n"
            + " LEFT JOIN gc_wms_warehouse AS ware ON CAST (gwo.warehouse_id AS INT) = ware.warehouse_id\n"
            + " LEFT JOIN dp_timezone AS tz ON CAST (gwo.warehouse_id AS INT) = tz.warehouse_id AND YEAR(CAST(gwo.add_time AS TIMESTAMP)) = tz.timezone_year\n"
            + " LEFT JOIN (SELECT * FROM dim_shipping_method WHERE datasource_num_id = 9004) AS sm ON gwo.sm_code = sm.sm_code\n"
            + " LEFT JOIN (SELECT * FROM dim_server_channel WHERE datasource_num_id = 9004) AS sc ON CAST (gwo.sc_id AS INT) = sc.server_channel_id\n"
            + " LEFT JOIN (SELECT * FROM dim_server WHERE datasource_num_id = 9004) AS s ON sc.server_channel_server_id = s.server_id\n"
            + " LEFT JOIN gc_wms_order_operation_time opt ON CAST (gwo.order_id AS INT) = opt.order_id\n"
            + " LEFT JOIN ( SELECT "
            + " gwo.order_code \n"
            + " ,min(pr.wp_code) wp_code \n"
            + " FROM "
            + "gc_wms_orders"
            + " AS gwo \n"
            + " LEFT JOIN gc_wms_order_physical_relation AS pr ON gwo.order_code = pr.order_code \n"
            + " WHERE TIMESTAMPDIFF(DAY, gwo.add_time, CURRENT_DATE) <= 10 \n"
            + " GROUP BY gwo.order_code"
            + " ) AS op \n"
            + " ON gwo.order_code = op.order_code\n"
            + " LEFT JOIN gc_wms_warehouse_physical wp ON wp.wp_code = op.wp_code\n"
            + " LEFT JOIN gc_oms_orders oo ON oo.refrence_no_platform = gwo.order_code\n"
            + " WHERE 1 = 1\n"
            + " AND gwo.order_status <> 0\n"
            + " AND gwo.order_platform_type = 'sale'\n"
            + " AND gwo.sm_code <> 'ZITI'\n"
            + " AND gwo.warehouse_id not in (5,6)\n"
            + " AND gwo.customer_code not in ('G403','G440','G441','G448','G452','G1330','215','000010','000016','G1234','192','193','245','G249','000028','000012','G387')\n"
            + " AND TIMESTAMPDIFF(DAY, TO_DATE(gwo.add_time), CURRENT_DATE) <= 10 \n";

    /**
     * Entry point: creates the table environment, registers the CDC source tables and the
     * dimension tables, then submits the two INSERT statements.
     *
     * @param args command-line arguments (not used)
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Original author's note: with parallelism set to 1 the output is not out of order.
        // streamExecutionEnvironment.setParallelism(1);
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment);

        // Register the CDC source tables.
        createCdcTable(tableEnvironment);
        // Register the dimension tables.
        createDimTable(tableEnvironment);
        // Run the business logic (sink DDL + INSERT statements).
        executeBusinessSql(tableEnvironment);
    }

    /**
     * Fills the six {@code %s} placeholders of a CDC table DDL template with the shared
     * connection settings, in this order: connector, hostname, port, username, password,
     * database name.
     *
     * @param message DDL template containing exactly those six placeholders
     * @return the formatted DDL
     */
    private static String confCdcCommonMessage(String message) {
        String connector = "mysql-cdc";
        String hostname = "zongteng75";
        String port = "3306";
        String username = "root";
        String password = "****";
        String databaseName = "zongteng_streaming";
        return String.format(message, connector, hostname, port, username, password, databaseName);
    }

    /**
     * Builds the {@code CREATE TABLE} statement for one mysql-cdc source table. The Flink
     * table name and the MySQL {@code table-name} option are the same value.
     *
     * @param tableName table name used on both sides
     * @param columns column definitions and primary-key clause; must not contain {@code %}
     *     because the result is passed through {@link String#format}
     * @return the complete DDL with connection settings filled in
     */
    private static String cdcTableDdl(String tableName, String columns) {
        return confCdcCommonMessage(
                "CREATE TABLE " + tableName + " (\n"
                + columns
                + ") WITH (\n"
                + " 'connector' = '%s',\n"
                + " 'hostname' = '%s',\n"
                + " 'port' = '%s',\n"
                + " 'username' = '%s',\n"
                + " 'password' = '%s',\n"
                + " 'database-name' = '%s',\n"
                + " 'table-name' = '" + tableName + "'\n"
                + ")");
    }

    /**
     * Registers the six mysql-cdc source tables.
     *
     * @param tableEnvironment table environment in which the tables are created
     */
    private static void createCdcTable(StreamTableEnvironment tableEnvironment) {
        // gc_wms_orders
        String createGcWmsOrders = cdcTableDdl("gc_wms_orders",
                "order_id INT,\n"
                + "order_code STRING,\n"
                + "reference_no STRING,\n"
                + "customer_code STRING,\n"
                + "platform STRING,\n"
                + "order_platform_type STRING,\n"
                + "create_type STRING,\n"
                + "warehouse_id INT,\n"
                + "is_oda INT,\n"
                + "is_insurance INT,\n"
                + "sm_code STRING,\n"
                + "parcel_quantity INT,\n"
                + "order_status INT,\n"
                + "problem_status INT,\n"
                + "underreview_status INT,\n"
                + "upload_express_status INT,\n"
                + "anew_express_status INT,\n"
                + "intercept_status INT,\n"
                + "sync_status INT,\n"
                + "add_time TIMESTAMP,\n"
                + "order_pick_type INT,\n"
                + "sc_id INT,\n"
                + "sync_wms_time TIMESTAMP,\n"
                + "operator_note STRING,\n"
                + "is_fba INT,\n"
                + "outbound_time TIMESTAMP,\n"
                + "is_more_box INT,\n"
                + "o_timestamp STRING,\n"
                + "payment_time TIMESTAMP,\n"
                + "oms_date_create TIMESTAMP,\n"
                + "pre_delivery_time TIMESTAMP,\n"
                + "is_flow_volume INT,\n"
                + "w_insert_dt STRING,\n"
                + "data_flag STRING,"
                + "PRIMARY KEY (order_id) NOT ENFORCED\n");

        // gc_wms_order_box
        String createGcWmsOrderBox = cdcTableDdl("gc_wms_order_box",
                " order_code STRING,\n"
                + " box_no STRING,\n"
                + " tracking_number STRING,\n"
                + " ob_add_time TIMESTAMP,\n"
                + " w_insert_dt STRING,\n"
                + " data_flag STRING,\n"
                + " PRIMARY KEY (order_code, box_no) NOT ENFORCED\n");

        // gc_wms_ship_order
        String createGcWmsShipOrder = cdcTableDdl("gc_wms_ship_order",
                " order_id INT,\n"
                + " order_code STRING,\n"
                + " tracking_number STRING,\n"
                + " so_add_time TIMESTAMP,\n"
                + " w_insert_dt STRING,\n"
                + " data_flag STRING,\n"
                + " PRIMARY KEY (order_code) NOT ENFORCED\n");

        // gc_wms_order_operation_time
        String createGcWmsOrderOperationTime = cdcTableDdl("gc_wms_order_operation_time",
                " oot_id INT,\n"
                + " order_id INT,\n"
                + " process_time TIMESTAMP,\n"
                + " pack_time TIMESTAMP,\n"
                + " ship_time TIMESTAMP,\n"
                + " import_time TIMESTAMP,\n"
                + " w_insert_dt STRING,\n"
                + " data_flag STRING,\n"
                + " PRIMARY KEY (oot_id) NOT ENFORCED\n");

        // gc_wms_order_physical_relation
        String createGcWmsOrderPhysicalRelation = cdcTableDdl("gc_wms_order_physical_relation",
                " opr_id INT,\n"
                + " wp_code STRING,\n"
                + " order_code STRING,\n"
                + " w_insert_dt STRING,\n"
                + " data_flag STRING,\n"
                + " PRIMARY KEY (opr_id) NOT ENFORCED\n");

        // gc_oms_orders
        String createGcOmsOrders = cdcTableDdl("gc_oms_orders",
                " order_id INT,\n"
                + " platform STRING,\n"
                + " customer_id INT,\n"
                + " company_code STRING,\n"
                + " order_type STRING,\n"
                + " create_type STRING,\n"
                + " order_status INT,\n"
                + " sub_status INT,\n"
                + " cancel_status INT,\n"
                + " create_method INT,\n"
                + " shipping_method STRING,\n"
                + " shipping_method_platform STRING,\n"
                + " warehouse_id STRING,\n"
                + " warehouse_code STRING,\n"
                + " shipping_method_no STRING,\n"
                + " is_oda INT,\n"
                + " oda_type INT,\n"
                + " is_signature INT,\n"
                + " is_insurance INT,\n"
                + " insurance_value DECIMAL(10,3),\n"
                + " order_weight FLOAT,\n"
                + " order_desc STRING,\n"
                + " date_create TIMESTAMP,\n"
                + " date_release TIMESTAMP,\n"
                + " date_pickup TIMESTAMP,\n"
                + " date_warehouse_shipping TIMESTAMP,\n"
                + " date_last_modify TIMESTAMP,\n"
                + " refrence_no STRING,\n"
                + " refrence_no_platform STRING,\n"
                + " refrence_no_sys STRING,\n"
                + " refrence_no_warehouse STRING,\n"
                + " shipping_address_id INT,\n"
                + " operator_id STRING,\n"
                + " operator_note STRING,\n"
                + " sync_status STRING,\n"
                + " sync_time TIMESTAMP,\n"
                + " date_create_platform TIMESTAMP,\n"
                + " date_paid_platform TIMESTAMP,\n"
                + " date_paid_int INT,\n"
                + " amountpaid FLOAT,\n"
                + " subtotal FLOAT,\n"
                + " ship_fee FLOAT,\n"
                + " platform_fee FLOAT,\n"
                + " finalvaluefee FLOAT,\n"
                + " delivery_fee DECIMAL(12,3),\n"
                + " currency STRING,\n"
                + " user_account STRING,\n"
                + " buyer_id STRING,\n"
                + " third_part_ship INT,\n"
                + " is_merge INT,\n"
                + " site STRING,\n"
                + " abnormal_type INT,\n"
                + " abnormal_reason STRING,\n"
                + " is_one_piece INT,\n"
                + " product_count INT,\n"
                + " consignee_country STRING,\n"
                + " buyer_name STRING,\n"
                + " buyer_mail STRING,\n"
                + " has_buyer_note INT,\n"
                + " fulfillment_channel STRING,\n"
                + " ship_service_level STRING,\n"
                + " shipment_service_level_category STRING,\n"
                + " leave_comment STRING,\n"
                + " ebay_case_type STRING,\n"
                + " order_refund STRING,\n"
                + " process_again STRING,\n"
                + " has_export INT,\n"
                + " has_pickup INT,\n"
                + " has_print_pickup_label INT,\n"
                + " service_status INT,\n"
                + " service_provider STRING,\n"
                + " ot_id INT,\n"
                + " sys_tips STRING,\n"
                + " consignee_name STRING,\n"
                + " consignee_company STRING,\n"
                + " consignee_street1 STRING,\n"
                + " consignee_street2 STRING,\n"
                + " consignee_street3 STRING,\n"
                + " consignee_district STRING,\n"
                + " consignee_county STRING,\n"
                + " consignee_city STRING,\n"
                + " consignee_state STRING,\n"
                + " consignee_country_code STRING,\n"
                + " consignee_country_name STRING,\n"
                + " consignee_phone STRING,\n"
                + " consignee_email STRING,\n"
                + " consignee_postal_code STRING,\n"
                + " consignee_doorplate STRING,\n"
                + " shared_sign INT,\n"
                + " is_returns INT,\n"
                + " is_shipping_method_not_allow_update INT,\n"
                + " is_fba INT,\n"
                + " consignee_is_residential INT,\n"
                + " is_more_box INT,\n"
                + " is_attachment INT,\n"
                + " so_length DECIMAL(10,2),\n"
                + " o_timestamp STRING,\n"
                + " so_width DECIMAL(10,2),\n"
                + " so_height DECIMAL(10,2),\n"
                + " age_detection INT,\n"
                + " payment_time TIMESTAMP,\n"
                + " is_recommend INT,\n"
                + " is_truck_service INT,\n"
                + " new_order_type INT,\n"
                + " design_batch_status INT,\n"
                + " auto_verify_result INT,\n"
                + " auto_verify_reason STRING,\n"
                + " is_sync_tracking_number INT,\n"
                + " order_sn STRING,\n"
                + " datasource_num_id STRING,\n"
                + " data_flag STRING,\n"
                + " w_insert_dt STRING,\n"
                + " PRIMARY KEY (order_id) NOT ENFORCED \n");

        // Register the CDC tables, in the same order as before.
        tableEnvironment.executeSql(createGcWmsOrders);
        tableEnvironment.executeSql(createGcWmsOrderBox);
        tableEnvironment.executeSql(createGcWmsShipOrder);
        tableEnvironment.executeSql(createGcWmsOrderOperationTime);
        tableEnvironment.executeSql(createGcWmsOrderPhysicalRelation);
        tableEnvironment.executeSql(createGcOmsOrders);
    }

    /**
     * Creates the JDBC sink table and the print (debug) sink table, then submits one INSERT
     * per sink in a single {@link StatementSet}. Both INSERTs use the same SELECT.
     *
     * @param tEnv table environment in which the CDC and dimension tables are already registered
     */
    private static void executeBusinessSql(StreamTableEnvironment tEnv) {
        // NOTE(review): the JDBC URL and credentials are hard-coded here, whereas the
        // dimension tables read theirs from the properties file via confJdbc. Consider
        // moving these out of source control; left unchanged because the properties file
        // contents are not visible from this file.
        String createJdbcSink = "CREATE TABLE gc_wms_qudaorealfenxi_view (\n"
                + SINK_COLUMNS
                + ") WITH (\n"
                + " 'connector' = 'jdbc',\n"
                + " 'url' = 'jdbc:mysql://zongteng75:3306/zongteng_streaming?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8',"
                + " 'table-name' = 'gc_wms_qudaorealfenxi_view',\n"
                + " 'driver' = 'com.mysql.jdbc.Driver',\n"
                + " 'username' = 'root',\n"
                + " 'password' = 'c4RKchNdOg#',\n"
                + " 'sink.buffer-flush.interval' = '1s',\n"
                + " 'sink.buffer-flush.max-rows' ='5'"
                + ")";

        String createPrintSink = "CREATE TABLE gc_wms_qudaorealfenxi_view1 (\n"
                + SINK_COLUMNS
                + ") WITH (\n"
                + " 'connector' = 'print'\n"
                + ")";

        String insertIntoJdbcSink = "INSERT INTO gc_wms_qudaorealfenxi_view" + ENRICHED_ORDERS_SELECT;
        String insertIntoPrintSink = "INSERT INTO gc_wms_qudaorealfenxi_view1" + ENRICHED_ORDERS_SELECT;

        tEnv.executeSql(createJdbcSink);
        tEnv.executeSql(createPrintSink);

        // Debugging aid: the print sink receives the same changelog as the JDBC sink.
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(insertIntoJdbcSink);
        stmtSet.addInsertSql(insertIntoPrintSink);
        // Execute all INSERT statements that were just added.
        stmtSet.execute();
    }

    /**
     * Builds the {@code CREATE TABLE} statement for one JDBC dimension table. The Flink table
     * name and the JDBC {@code table-name} option are the same value.
     *
     * @param tableName table name used on both sides
     * @param columns column definitions and optional primary-key clause; must not contain
     *     {@code %} because the result is passed through {@link String#format}
     * @return the complete DDL with connection settings filled in by {@code confJdbc}
     */
    private static String jdbcTableDdl(String tableName, String columns) {
        return confJdbc(
                "CREATE TABLE " + tableName + " (\n"
                + columns
                + ") WITH (\n"
                + " 'connector' = '%s',\n"
                + " 'url' = '%s',\n"
                + " 'table-name' = '" + tableName + "',\n"
                + " 'driver' = '%s',\n"
                + " 'username' = '%s',\n"
                + " 'password' = '%s',\n"
                + " 'sink.buffer-flush.interval' = '1s',\n"
                + " 'sink.buffer-flush.max-rows' = '5'\n"
                + ")");
    }

    /**
     * Registers the six JDBC dimension tables.
     *
     * @param tEnv table environment in which the tables are created
     */
    private static void createDimTable(StreamTableEnvironment tEnv) {
        // gc_wms_warehouse
        String createGcWmsWarehouse = jdbcTableDdl("gc_wms_warehouse",
                " warehouse_id INT,\n"
                + " warehouse_code STRING,\n"
                + " warehouse_type INT,\n"
                + " warehouse_status INT,\n"
                + " warehouse_virtual INT,\n"
                + " warehouse_transfer INT,\n"
                + " country_id INT,\n"
                + " country_code STRING,\n"
                + " state STRING,\n"
                + " city STRING,\n"
                + " contacter STRING,\n"
                + " company STRING,\n"
                + " phone_no STRING,\n"
                + " street_address1 STRING,\n"
                + " street_address2 STRING,\n"
                + " postcode STRING,\n"
                + " warehouse_desc STRING,\n"
                + " warehouse_add_time TIMESTAMP,\n"
                + " warehouse_update_time TIMESTAMP,\n"
                + " warehouse_proxy_code STRING,\n"
                + " street_number STRING,\n"
                + " timezone INT,\n"
                + " value_added_tax INT,\n"
                + " warehouse_en STRING,\n"
                + " saving_time INT,\n"
                + " arrival_days INT,\n"
                + " currency_code STRING,\n"
                + " `day` STRING,\n"
                + " PRIMARY KEY (warehouse_id) NOT ENFORCED\n");

        // dp_timezone (declared without a primary key)
        String createDpTimezone = jdbcTableDdl("dp_timezone",
                " warehouse_id INT,\n"
                + " warehouse_code STRING,\n"
                + " warehouse_name STRING,\n"
                + " timezone_year INT,\n"
                + " timezone_season_type STRING,\n"
                + " timezone_season_start TIMESTAMP,\n"
                + " timezone_season_end TIMESTAMP,\n"
                + " timezone_summer_number INT,\n"
                + " timezone_winner_number INT,\n"
                + " timezone_summer_time_dif_val INT,\n"
                + " timezone_winner_time_dif_val INT\n");

        // dim_shipping_method
        String createDimShippingMethod = jdbcTableDdl("dim_shipping_method",
                " row_wid STRING,\n"
                + " warehouse_key STRING,\n"
                + " sc_key STRING,\n"
                + " st_key STRING,\n"
                + " sct_key STRING,\n"
                + " sm_transfer_warehouse_key STRING,\n"
                + " sm_id INT,\n"
                + " sm_code STRING,\n"
                + " sm_short_name STRING,\n"
                + " sm_name_cn STRING,\n"
                + " sm_name STRING,\n"
                + " sm_pg_code STRING,\n"
                + " sm_mp_fee DECIMAL(10,2),\n"
                + " sm_reg_fee DECIMAL(10,2),\n"
                + " sm_addons DECIMAL(10,2),\n"
                + " sm_type INT,\n"
                + " sm_type_val STRING,\n"
                + " sm_baf DECIMAL(10,2),\n"
                + " sm_discount DECIMAL(10,2),\n"
                + " sm_delivery_time_min STRING,\n"
                + " sm_delivery_time_max STRING,\n"
                + " sm_delivery_time_avg STRING,\n"
                + " sm_is_volume INT,\n"
                + " sm_is_volume_val STRING,\n"
                + " sm_is_charge INT,\n"
                + " sm_is_charge_val STRING,\n"
                + " sm_vol_rate DECIMAL(15,4),\n"
                + " sm_status INT,\n"
                + " sm_status_val STRING,\n"
                + " sm_class_code STRING,\n"
                + " sm_class_code_val STRING,\n"
                + " sm_logo STRING,\n"
                + " sm_return_recipient STRING,\n"
                + " sm_return_address STRING,\n"
                + " sm_discount_min DECIMAL(10,2),\n"
                + " sm_mp_fee_min DECIMAL(10,2),\n"
                + " sm_reg_fee_min DECIMAL(10,2),\n"
                + " sm_limit_volume INT,\n"
                + " sm_limit_weight DECIMAL(10,3),\n"
                + " sm_sort INT,\n"
                + " sm_is_tracking INT,\n"
                + " sm_is_tracking_val STRING,\n"
                + " sm_is_signature INT,\n"
                + " sm_is_signature_val STRING,\n"
                + " sm_is_insurance INT,\n"
                + " sm_is_insurance_val STRING,\n"
                + " sm_is_validate_remote INT,\n"
                + " sm_is_validate_remote_val STRING,\n"
                + " sm_warehouse_id INT,\n"
                + " sm_calc_type INT,\n"
                + " sm_fee_type INT,\n"
                + " sm_sc_id INT,\n"
                + " sm_st_id INT,\n"
                + " sm_carrier_number STRING,\n"
                + " sm_allow_letter INT,\n"
                + " sm_allow_letter_val STRING,\n"
                + " sm_sct_id INT,\n"
                + " sm_update_time TIMESTAMP,\n"
                + " sm_pre_generate_label INT,\n"
                + " sm_pre_generate_label_val STRING,\n"
                + " sm_code_type INT,\n"
                + " sm_code_type_val STRING,\n"
                + " sm_is_more_box INT,\n"
                + " sm_is_more_box_val STRING,\n"
                + " sm_return_type INT,\n"
                + " sm_attribute STRING,\n"
                + " sm_attribute_val STRING,\n"
                + " sm_is_claim INT,\n"
                + " sm_is_claim_val STRING,\n"
                + " sm_transfer_warehouse_id INT,\n"
                + " sm_age_detection INT,\n"
                + " sm_age_detection_val STRING,\n"
                + " sm_recommend_platform STRING,\n"
                + " sm_volume_unit STRING,\n"
                + " sm_volume_type INT,\n"
                + " sm_volume_type_val STRING,\n"
                + " sm_volume_value DECIMAL(10,2),\n"
                + " sm_weight_unit STRING,\n"
                + " sm_weight_type INT,\n"
                + " sm_weight_type_val STRING,\n"
                + " sm_weight_value DECIMAL(10,2),\n"
                + " sm_claim_desc STRING,\n"
                + " sm_sfp_label_type INT,\n"
                + " sm_sfp_label_type_val STRING,\n"
                + " sm_life_gate INT,\n"
                + " sm_life_gate_val STRING,\n"
                + " sm_truck INT,\n"
                + " sm_truck_val STRING,\n"
                + " datasource_num_id STRING,\n"
                + " data_flag STRING,\n"
                + " w_insert_dt TIMESTAMP,\n"
                + " PRIMARY KEY (row_wid) NOT ENFORCED\n");

        // dim_server_channel
        String createDimServerChannel = jdbcTableDdl("dim_server_channel",
                " row_wid STRING,\n"
                + " server_key STRING,\n"
                + " server_channel_id INT,\n"
                + " server_channel_server_id INT,\n"
                + " server_channel_code STRING,\n"
                + " server_channel_short_name STRING,\n"
                + " server_channel_name STRING,\n"
                + " server_channel_name_en STRING,\n"
                + " server_channel_note STRING,\n"
                + " server_channel_status INT,\n"
                + " server_channel_status_val STRING,\n"
                + " server_channel_is_weighing INT,\n"
                + " server_channel_is_weighing_val STRING,\n"
                + " server_channel_is_tracking INT,\n"
                + " server_channel_is_tracking_val STRING,\n"
                + " server_channel_is_volume INT,\n"
                + " server_channel_is_volume_val STRING,\n"
                + " server_channel_st_id INT,\n"
                + " server_channel_st_auto_delivery INT,\n"
                + " server_channel_st_split_start INT,\n"
                + " server_channel_st_split_value INT,\n"
                + " server_channel_st_split_type STRING,\n"
                + " server_channel_add_time TIMESTAMP,\n"
                + " server_channel_update_time TIMESTAMP,\n"
                + " server_channel_asp_id INT,\n"
                + " server_channel_asp_code STRING,\n"
                + " server_channel_apm_id INT,\n"
                + " server_channel_api_pk_type STRING,\n"
                + " server_channel_wut_code STRING,\n"
                + " server_channel_min_weight DECIMAL(11,3),\n"
                + " server_channel_max_weight DECIMAL(11,3),\n"
                + " server_channel_is_validate_address INT,\n"
                + " server_channel_is_validate_address_val STRING,\n"
                + " server_channel_is_pack_weighing INT,\n"
                + " server_channel_is_pack_weighing_val STRING,\n"
                + " server_channel_sign_type INT,\n"
                + " server_channel_right_trim_length INT,\n"
                + " server_channel_left_trim_length INT,\n"
                + " server_channel_is_mark_weight INT,\n"
                + " server_channel_is_mark_weight_val STRING,\n"
                + " server_channel_push_tms_status INT,\n"
                + " datasource_num_id STRING,\n"
                + " data_flag STRING,\n"
                + " w_insert_dt TIMESTAMP,\n"
                + " PRIMARY KEY (row_wid) NOT ENFORCED\n");

        // dim_server
        String createDimServer = jdbcTableDdl("dim_server",
                " row_wid STRING,\n"
                + " server_id INT,\n"
                + " server_code STRING,\n"
                + " server_name STRING,\n"
                + " server_contact_name STRING,\n"
                + " server_contact_phone STRING,\n"
                + " server_address STRING,\n"
                + " server_settlement_type INT,\n"
                + " server_settlement_type_val STRING,\n"
                + " server_balance DECIMAL(10,3),\n"
                + " server_currency_code STRING,\n"
                + " server_tax DECIMAL(10,3),\n"
                + " server_person_user INT,\n"
                + " server_update_date TIMESTAMP,\n"
                + " server_type INT,\n"
                + " server_is_ignore_express INT,\n"
                + " server_as_id INT,\n"
                + " server_as_name STRING,\n"
                + " server_add_date TIMESTAMP,\n"
                + " server_status INT,\n"
                + " server_status_val STRING,\n"
                + " server_ap_id INT,\n"
                + " server_carrier_name STRING,\n"
                + " server_cancel_label_flag INT,\n"
                + " server_is_cancel_label_flag_val STRING,\n"
                + " datasource_num_id STRING,\n"
                + " data_flag STRING,\n"
                + " w_insert_dt TIMESTAMP,\n"
                + " PRIMARY KEY (row_wid) NOT ENFORCED\n");

        // gc_wms_warehouse_physical
        String createGcWmsWarehousePhysical = jdbcTableDdl("gc_wms_warehouse_physical",
                " wp_id BIGINT,\n"
                + " warehouse_id INT,\n"
                + " wp_name STRING,\n"
                + " wp_code STRING,\n"
                + " wp_status INT,\n"
                + " wp_priority INT,\n"
                + " wp_state STRING,\n"
                + " wp_city STRING,\n"
                + " wp_postcode STRING,\n"
                + " wp_company STRING,\n"
                + " wp_contacter STRING,\n"
                + " wp_phone STRING,\n"
                + " wp_street_address1 STRING,\n"
                + " wp_street_address2 STRING,\n"
                + " wp_street_number STRING,\n"
                + " wp_add_time TIMESTAMP,\n"
                + " wp_update_time TIMESTAMP,\n"
                + " `day` STRING,\n"
                + " PRIMARY KEY (wp_id) NOT ENFORCED\n");

        // Register the dimension tables, in the same order as before.
        tEnv.executeSql(createGcWmsWarehouse);
        tEnv.executeSql(createDpTimezone);
        tEnv.executeSql(createDimShippingMethod);
        tEnv.executeSql(createDimServerChannel);
        tEnv.executeSql(createDimServer);
        tEnv.executeSql(createGcWmsWarehousePhysical);
    }

    /**
     * Fills the five {@code %s} placeholders of a JDBC table DDL template, in this order:
     * connector ({@code jdbc}), url, driver, username, password. All values except the
     * connector are read from the properties file on every call.
     *
     * @param tableSql DDL template containing exactly those five placeholders
     * @return the formatted DDL
     * @throws IllegalStateException if one of the required properties is missing
     */
    private static String confJdbc(String tableSql) {
        Properties properties = PropertyUtil.loadProperties(JDBC_PROPERTIES_PATH);
        String connector = "jdbc";
        String url = requiredProperty(properties, "url");
        String driver = requiredProperty(properties, "driverClassName");
        String username = requiredProperty(properties, "username");
        String password = requiredProperty(properties, "password");
        return String.format(tableSql, connector, url, driver, username, password);
    }

    /**
     * Returns the value of {@code key}, failing fast instead of letting the text
     * {@code "null"} be formatted into a DDL statement.
     *
     * @param properties loaded JDBC settings
     * @param key property name to look up
     * @return the non-null property value
     * @throws IllegalStateException if the property is absent
     */
    private static String requiredProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null) {
            throw new IllegalStateException(
                    "Missing property '" + key + "' in " + JDBC_PROPERTIES_PATH);
        }
        return value;
    }
}