你好 主要流程 见附件


流程就是使用cdc 读取mysql。然后left join 维度表 ,最后写入到mysql


问题是测试的时候。update cdc的源表一条数据。发现结果的数据 有时候有 有时候没有,
使用connect=print 发现两条数据流。一个是delete 一个是insert,这边怀疑是乱序 导致先insert 在delete掉了,
把并行度设置为1的时候。就是正常的。如果沟通不方便 欢迎加钉钉13269166963。











在 2020-10-26 12:11:59,"史 正超" <shizhengc...@outlook.com> 写道:
>Hi, @air23, 你能提供下完整的sql吗?,我来复现下这个问题
>________________________________
>发件人: air23 <wangfei23_...@163.com>
>发送时间: 2020年10月23日 6:21
>收件人: user-zh@flink.apache.org <user-zh@flink.apache.org>
>主题: Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题
>
>你好,
>这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题
>在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据,
>但是jdbc写入mysql时候 发现mysql有时候是正常的,但是有时候会没有,当把并行度改成1的时候是正常的。
>这边怀疑是乱序了,先insert 再delete了。所以导致结果表 没有这条数据,请问flink sql 或者flink cdc 怎么保证有序。
package com.flink.large.screen.demand;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import zt.common.util.PropertyUtil;

import java.util.Properties;


public class FlinkGcWmsOrdersCdc2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();

        //如果并行度设置1 就不会乱序
        // streamExecutionEnvironment.setParallelism(1);
//        streamExecutionEnvironment.set

        StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecutionEnvironment);

        

        // 创建cdc表
        createCdcTable(tableEnvironment);

        // 创建维度表
        createDimTable(tableEnvironment);

        // 执行业务逻辑
        executeBusinessSql(tableEnvironment);
    }

    private static String confCdcCommonMessage(String message) {
        String connector = "mysql-cdc";
        String hostname = "zongteng75";
        String port = "3306";
        String username = "root";
        String password = "****";
        String database_name = "zongteng_streaming";

        return String.format(message, connector, hostname, port, username, 
password, database_name);
    }

    /**
     * 创建cdc表
     * @param tableEnvironment
     */
    private static void createCdcTable(StreamTableEnvironment tableEnvironment) 
{

//        // print_table
//        String create_table_print_table = "CREATE TABLE print_table (" +
//                "  f0 STRING" +
//                ") with (" +
//                " 'connector' = 'print' " +
//                ")";
//        tableEnvironment.executeSql(create_table_print_table);

        // gc_wms_orders
        String create_table_gc_wms_orders = confCdcCommonMessage("CREATE TABLE 
gc_wms_orders (\n" +
                    "order_id INT,\n" +
                    "order_code STRING,\n" +
                    "reference_no STRING,\n" +
                    "customer_code STRING,\n" +
                    "platform STRING,\n" +
                    "order_platform_type STRING,\n" +
                    "create_type STRING,\n" +
                    "warehouse_id INT,\n" +
                    "is_oda INT,\n" +
                    "is_insurance INT,\n" +
                    "sm_code STRING,\n" +
                    "parcel_quantity INT,\n" +
                    "order_status INT,\n" +
                    "problem_status INT,\n" +
                    "underreview_status INT,\n" +
                    "upload_express_status INT,\n" +
                    "anew_express_status INT,\n" +
                    "intercept_status INT,\n" +
                    "sync_status INT,\n" +
                    "add_time TIMESTAMP,\n" +
                    "order_pick_type INT,\n" +
                    "sc_id INT,\n" +
                    "sync_wms_time TIMESTAMP,\n" +
                    "operator_note STRING,\n" +
                    "is_fba INT,\n" +
                    "outbound_time TIMESTAMP,\n" +
                    "is_more_box INT,\n" +
                    "o_timestamp STRING,\n" +
                    "payment_time TIMESTAMP,\n" +
                    "oms_date_create TIMESTAMP,\n" +
                    "pre_delivery_time TIMESTAMP,\n" +
                    "is_flow_volume INT,\n" +
                    "w_insert_dt STRING,\n" +
                    "data_flag STRING," +
                    "PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "  'connector' = '%s',\n" +
                "  'hostname' = '%s',\n" +
                "  'port' = '%s',\n" +
                "  'username' = '%s',\n" +
                "  'password' = '%s',\n" +
                "  'database-name' = '%s',\n" +
                "  'table-name' = 'gc_wms_orders'\n" +
                ")");

        // gc_wms_order_box
        String create_table_gc_wms_order_box = confCdcCommonMessage("CREATE 
TABLE gc_wms_order_box (\n" +
                "  order_code STRING,\n" +
                "  box_no STRING,\n" +
                "  tracking_number STRING,\n" +
                "  ob_add_time TIMESTAMP,\n" +
                "  w_insert_dt STRING,\n" +
                "  data_flag STRING,\n" +
                "  PRIMARY KEY (order_code, box_no) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = '%s',\n" +
                "  'hostname' = '%s',\n" +
                "  'port' = '%s',\n" +
                "  'username' = '%s',\n" +
                "  'password' = '%s',\n" +
                "  'database-name' = '%s',\n" +
                "  'table-name' = 'gc_wms_order_box'\n" +
                ")");

        // gc_wms_ship_order
        String create_table_gc_wms_ship_order = confCdcCommonMessage("CREATE 
TABLE gc_wms_ship_order (\n" +
                "  order_id INT,\n" +
                "  order_code STRING,\n" +
                "  tracking_number STRING,\n" +
                "  so_add_time TIMESTAMP,\n" +
                "  w_insert_dt STRING,\n" +
                "  data_flag STRING,\n" +
                "  PRIMARY KEY (order_code) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'hostname' = '%s',\n" +
                " 'port' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'database-name' = '%s',\n" +
                " 'table-name' = 'gc_wms_ship_order'\n" +
                ")");

        // gc_wms_order_operation_time
        String create_table_gc_wms_order_operation_time = 
confCdcCommonMessage("CREATE TABLE gc_wms_order_operation_time (\n" +
                "  oot_id INT,\n" +
                "  order_id INT,\n" +
                "  process_time TIMESTAMP,\n" +
                "  pack_time TIMESTAMP,\n" +
                "  ship_time TIMESTAMP,\n" +
                "  import_time TIMESTAMP,\n" +
                "  w_insert_dt STRING,\n" +
                "  data_flag STRING,\n" +
                "  PRIMARY KEY (oot_id) NOT ENFORCED\n" +
                ")  WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'hostname' = '%s',\n" +
                " 'port' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'database-name' = '%s',\n" +
                " 'table-name' = 'gc_wms_order_operation_time'\n" +
                ")");

        // gc_wms_order_physical_relation
        String create_table_gc_wms_order_physical_relation = 
confCdcCommonMessage("CREATE TABLE gc_wms_order_physical_relation (\n" +
                "  opr_id INT,\n" +
                "  wp_code STRING,\n" +
                "  order_code STRING,\n" +
                "  w_insert_dt STRING,\n" +
                "  data_flag STRING,\n" +
                "  PRIMARY KEY (opr_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'hostname' = '%s',\n" +
                " 'port' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'database-name' = '%s',\n" +
                " 'table-name' = 'gc_wms_order_physical_relation'\n" +
                ")");

        // gc_oms_orders
        String create_table_gc_oms_orders = confCdcCommonMessage("CREATE TABLE 
gc_oms_orders (\n" +
                "  order_id INT,\n" +
                "  platform STRING,\n" +
                "  customer_id INT,\n" +
                "  company_code STRING,\n" +
                "  order_type STRING,\n" +
                "  create_type STRING,\n" +
                "  order_status INT,\n" +
                "  sub_status INT,\n" +
                "  cancel_status INT,\n" +
                "  create_method INT,\n" +
                "  shipping_method STRING,\n" +
                "  shipping_method_platform STRING,\n" +
                "  warehouse_id STRING,\n" +
                "  warehouse_code STRING,\n" +
                "  shipping_method_no STRING,\n" +
                "  is_oda INT,\n" +
                "  oda_type INT,\n" +
                "  is_signature INT,\n" +
                "  is_insurance INT,\n" +
                "  insurance_value DECIMAL(10,3),\n" +
                "  order_weight FLOAT,\n" +
                "  order_desc STRING,\n" +
                "  date_create TIMESTAMP,\n" +
                "  date_release TIMESTAMP,\n" +
                "  date_pickup TIMESTAMP,\n" +
                "  date_warehouse_shipping TIMESTAMP,\n" +
                "  date_last_modify TIMESTAMP,\n" +
                "  refrence_no STRING,\n" +
                "  refrence_no_platform STRING,\n" +
                "  refrence_no_sys STRING,\n" +
                "  refrence_no_warehouse STRING,\n" +
                "  shipping_address_id INT,\n" +
                "  operator_id STRING,\n" +
                "  operator_note STRING,\n" +
                "  sync_status STRING,\n" +
                "  sync_time TIMESTAMP,\n" +
                "  date_create_platform TIMESTAMP,\n" +
                "  date_paid_platform TIMESTAMP,\n" +
                "  date_paid_int INT,\n" +
                "  amountpaid FLOAT,\n" +
                "  subtotal FLOAT,\n" +
                "  ship_fee FLOAT,\n" +
                "  platform_fee FLOAT,\n" +
                "  finalvaluefee FLOAT,\n" +
                "  delivery_fee DECIMAL(12,3),\n" +
                "  currency STRING,\n" +
                "  user_account STRING,\n" +
                "  buyer_id STRING,\n" +
                "  third_part_ship INT,\n" +
                "  is_merge INT,\n" +
                "  site STRING,\n" +
                "  abnormal_type INT,\n" +
                "  abnormal_reason STRING,\n" +
                "  is_one_piece INT,\n" +
                "  product_count INT,\n" +
                "  consignee_country STRING,\n" +
                "  buyer_name STRING,\n" +
                "  buyer_mail STRING,\n" +
                "  has_buyer_note INT,\n" +
                "  fulfillment_channel STRING,\n" +
                "  ship_service_level STRING,\n" +
                "  shipment_service_level_category STRING,\n" +
                "  leave_comment STRING,\n" +
                "  ebay_case_type STRING,\n" +
                "  order_refund STRING,\n" +
                "  process_again STRING,\n" +
                "  has_export INT,\n" +
                "  has_pickup INT,\n" +
                "  has_print_pickup_label INT,\n" +
                "  service_status INT,\n" +
                "  service_provider STRING,\n" +
                "  ot_id INT,\n" +
                "  sys_tips STRING,\n" +
                "  consignee_name STRING,\n" +
                "  consignee_company STRING,\n" +
                "  consignee_street1 STRING,\n" +
                "  consignee_street2 STRING,\n" +
                "  consignee_street3 STRING,\n" +
                "  consignee_district STRING,\n" +
                "  consignee_county STRING,\n" +
                "  consignee_city STRING,\n" +
                "  consignee_state STRING,\n" +
                "  consignee_country_code STRING,\n" +
                "  consignee_country_name STRING,\n" +
                "  consignee_phone STRING,\n" +
                "  consignee_email STRING,\n" +
                "  consignee_postal_code STRING,\n" +
                "  consignee_doorplate STRING,\n" +
                "  shared_sign INT,\n" +
                "  is_returns INT,\n" +
                "  is_shipping_method_not_allow_update INT,\n" +
                "  is_fba INT,\n" +
                "  consignee_is_residential INT,\n" +
                "  is_more_box INT,\n" +
                "  is_attachment INT,\n" +
                "  so_length DECIMAL(10,2),\n" +
                "  o_timestamp STRING,\n" +
                "  so_width DECIMAL(10,2),\n" +
                "  so_height DECIMAL(10,2),\n" +
                "  age_detection INT,\n" +
                "  payment_time TIMESTAMP,\n" +
                "  is_recommend INT,\n" +
                "  is_truck_service INT,\n" +
                "  new_order_type INT,\n" +
                "  design_batch_status INT,\n" +
                "  auto_verify_result INT,\n" +
                "  auto_verify_reason STRING,\n" +
                "  is_sync_tracking_number INT,\n" +
                "  order_sn STRING,\n" +
                "  datasource_num_id STRING,\n" +
                "  data_flag STRING,\n" +
                "  w_insert_dt STRING,\n" +
                "  PRIMARY KEY (order_id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'hostname' = '%s',\n" +
                " 'port' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'database-name' = '%s',\n" +
                " 'table-name' = 'gc_oms_orders'\n" +
                ")");

        // cdc
        tableEnvironment.executeSql(create_table_gc_wms_orders);
        tableEnvironment.executeSql(create_table_gc_wms_order_box);
        tableEnvironment.executeSql(create_table_gc_wms_ship_order);
        tableEnvironment.executeSql(create_table_gc_wms_order_operation_time);
        
tableEnvironment.executeSql(create_table_gc_wms_order_physical_relation);
        tableEnvironment.executeSql(create_table_gc_oms_orders);
    }

    /**
     * 执行业务sql
     */
    private static void executeBusinessSql(StreamTableEnvironment tEnv) {

        String create_table_gc_wms_qudaorealfenxi_view =
                "CREATE TABLE gc_wms_qudaorealfenxi_view (\n" +
                        "  uuid STRING,\n" +
                        "  datasource_num_id INT,\n" +
                        "  customer_code STRING,\n" +
                        "  order_code STRING,\n" +
                        "  add_time TIMESTAMP,\n" +
                        "  order_date_release TIMESTAMP,\n" +
                        "  import_time TIMESTAMP,\n" +
                        "  process_time TIMESTAMP,\n" +
                        "  pack_time TIMESTAMP,\n" +
                        "  ship_time TIMESTAMP,\n" +
                        "  tracking_number STRING,\n" +
                        "  warehouse_id INT,\n" +
                        "  warehouse_code STRING,\n" +
                        "  warehouse_desc STRING,\n" +
                        "  timezone INT,\n" +
                        "  sm_code STRING,\n" +
                        "  sm_name STRING,\n" +
                        "  sc_id INT,\n" +
                        "  server_channel_name STRING,\n" +
                        "  server_code STRING,\n" +
                        "  server_name STRING,\n" +
                        "  tms_order_code STRING,\n" +
                        "  wp_code STRING,\n" +
                        "  wp_name STRING,\n" +
                        "  is_fba INT,\n" +
                        "  sm_is_tracking_val STRING,\n" +
                        "  PRIMARY KEY (uuid) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = 
'jdbc:mysql://zongteng75:3306/zongteng_streaming?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8',"
 +
                        " 'table-name' = 'gc_wms_qudaorealfenxi_view',\n" +
                        " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = 'c4RKchNdOg#',\n" +
                        " 'sink.buffer-flush.interval' = '1s',\n" +
                        " 'sink.buffer-flush.max-rows' ='5'" +
                        ")";

        String create_table_gc_wms_qudaorealfenxi_view_print =
                "CREATE TABLE gc_wms_qudaorealfenxi_view1 (\n" +
                        "  uuid STRING,\n" +
                        "  datasource_num_id INT,\n" +
                        "  customer_code STRING,\n" +
                        "  order_code STRING,\n" +
                        "  add_time TIMESTAMP,\n" +
                        "  order_date_release TIMESTAMP,\n" +
                        "  import_time TIMESTAMP,\n" +
                        "  process_time TIMESTAMP,\n" +
                        "  pack_time TIMESTAMP,\n" +
                        "  ship_time TIMESTAMP,\n" +
                        "  tracking_number STRING,\n" +
                        "  warehouse_id INT,\n" +
                        "  warehouse_code STRING,\n" +
                        "  warehouse_desc STRING,\n" +
                        "  timezone INT,\n" +
                        "  sm_code STRING,\n" +
                        "  sm_name STRING,\n" +
                        "  sc_id INT,\n" +
                        "  server_channel_name STRING,\n" +
                        "  server_code STRING,\n" +
                        "  server_name STRING,\n" +
                        "  tms_order_code STRING,\n" +
                        "  wp_code STRING,\n" +
                        "  wp_name STRING,\n" +
                        "  is_fba INT,\n" +
                        "  sm_is_tracking_val STRING,\n" +
                        "  PRIMARY KEY (uuid) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'print'\n" +
                        ")";
//        gc_wms_orders | order_id
//        gc_wms_order_box | order_code box_no
//        gc_wms_ship_order | order_code
//        gc_wms_order_operation_time | oot_id
//        gc_wms_order_physical_relation | opr_id
//        gc_oms_orders | order_id

        String sql = "INSERT INTO gc_wms_qudaorealfenxi_view" +
                " SELECT " +
                "    CONCAT(" +
                "       CAST(gwo.order_id AS STRING), '-'\n" +
                "      ,IF (gwob.order_code IS NULL, 'gwob_code_1', 
gwob.order_code), '-'\n" +
                "      ,IF (gwob.box_no IS NULL, 'gwob_no_1', gwob.box_no), 
'-'\n" +
                "      ,IF (gwso.order_code IS NULL, 'gwso_code_1', 
gwso.order_code), '-'\n" +
                "      ,IF (opt.oot_id IS NULL, 'opt_id_1', CAST( opt.oot_id AS 
STRING)), '-'\n" +
                "      ,IF (op.order_code IS NULL, 'op_code_1', CAST ( 
op.order_code AS STRING)), '-'\n" +
                "      ,IF (oo.order_id IS NULL, 'oo_id_1', CAST( oo.order_id 
AS STRING))\n" +
                "    )\n" +
                "   ,9004 AS datasource_num_id\n" +
                "   ,gwo.customer_code AS customer_code\n" +
                "   ,gwo.order_code AS order_code\n" +
                "   ,CAST (gwo.add_time AS TIMESTAMP) AS add_time" +
                "   ,oo.date_release AS order_date_release" +
                "   ,opt.import_time AS import_time" +
                "   ,opt.process_time AS process_time" +
                "   ,opt.pack_time AS pack_time" +
                "   ,opt.ship_time AS ship_time" +
                "   ,IF (gwob.tracking_number IS NOT NULL, 
gwob.tracking_number, gwso.tracking_number) AS tracking_number" +
                "   ,gwo.warehouse_id AS warehouse_id" +
                "   ,ware.warehouse_code AS warehouse_code" +
                "   ,ware.warehouse_desc AS warehouse_desc" +
                "   ,(case\n" +
                "        when (tz.timezone_season_type = 'summer_time') then\n" 
+
                "        (case\n" +
                "            when (gwo.add_time between 
tz.timezone_season_start and tz.timezone_season_end) then 
tz.timezone_summer_number else tz.timezone_winner_number\n" +
                "        end)\n" +
                "        when (tz.timezone_season_type = 'winner_time') then\n" 
+
                "        (case\n" +
                "            when (gwo.add_time between 
tz.timezone_season_start and tz.timezone_season_end) then 
tz.timezone_winner_number else tz.timezone_summer_number\n" +
                "        end)\n" +
                "    end) AS timezone\n" +
                "    ,gwo.sm_code AS sm_code\n" +
                "    ,sm.sm_name AS sm_name\n" +
                "    ,gwo.sc_id AS sc_id\n" +
                "    ,sc.server_channel_name AS server_channel_name\n" +
                "    ,s.server_code AS server_code\n" +
                "    ,s.server_name AS server_name\n" +
                "    ,CONCAT(gwo.order_code, IF (gwob.box_no IS NULL, '', 
concat('-', gwob.box_no))) AS tms_order_code\n" +
                "    ,op.wp_code AS wp_code\n" +
                "    ,wp.wp_name AS wp_name\n" +
                "    ,CAST (gwo.is_fba AS INT) AS is_fba\n" +
                "    ,sm.sm_is_tracking_val AS sm_is_tracking_val" +
                " FROM " + "gc_wms_orders" + " AS gwo\n" +
                " LEFT JOIN gc_wms_order_box AS gwob ON gwo.order_code = 
gwob.order_code\n" +
                " LEFT JOIN gc_wms_ship_order AS gwso ON gwo.order_code = 
gwso.order_code\n" +
                " LEFT JOIN gc_wms_warehouse AS ware ON CAST (gwo.warehouse_id 
AS INT) = ware.warehouse_id\n" +
                " LEFT JOIN dp_timezone AS tz ON CAST (gwo.warehouse_id AS INT) 
= tz.warehouse_id AND YEAR(CAST(gwo.add_time AS TIMESTAMP)) = 
tz.timezone_year\n" +
                " LEFT JOIN (SELECT * FROM dim_shipping_method WHERE 
datasource_num_id = 9004) AS sm ON gwo.sm_code = sm.sm_code\n" +
                " LEFT JOIN (SELECT * FROM dim_server_channel WHERE 
datasource_num_id = 9004) AS sc ON CAST (gwo.sc_id AS INT) = 
sc.server_channel_id\n" +
                " LEFT JOIN (SELECT * FROM dim_server WHERE datasource_num_id = 
9004) AS s ON sc.server_channel_server_id = s.server_id\n" +
                " LEFT JOIN gc_wms_order_operation_time opt ON CAST 
(gwo.order_id AS INT) = opt.order_id\n" +
                " LEFT JOIN ( SELECT " +
                "      gwo.order_code \n" +
                "     ,min(pr.wp_code) wp_code \n" +
                "    FROM " + "gc_wms_orders" + " AS gwo \n" +
                "    LEFT JOIN gc_wms_order_physical_relation AS pr ON 
gwo.order_code = pr.order_code \n" +
                "    WHERE TIMESTAMPDIFF(DAY, gwo.add_time, CURRENT_DATE) <= 10 
\n" +
                "    GROUP BY gwo.order_code" +
                " ) AS op \n" +
                " ON gwo.order_code = op.order_code\n" +
                " LEFT JOIN gc_wms_warehouse_physical wp ON wp.wp_code = 
op.wp_code\n" +
                " LEFT JOIN gc_oms_orders oo ON oo.refrence_no_platform = 
gwo.order_code\n" +
                " WHERE 1 = 1\n" +
                " AND gwo.order_status <> 0\n" +
                " AND gwo.order_platform_type = 'sale'\n" +
                " AND gwo.sm_code <> 'ZITI'\n" +
                " AND gwo.warehouse_id not in (5,6)\n" +
                " AND gwo.customer_code not in 
('G403','G440','G441','G448','G452','G1330','215','000010','000016','G1234','192','193','245','G249','000028','000012','G387')\n"
 +
                " AND TIMESTAMPDIFF(DAY, TO_DATE(gwo.add_time), CURRENT_DATE) 
<= 10 \n";

        String print_sql = "INSERT INTO gc_wms_qudaorealfenxi_view1" +
                " SELECT " +
                "    CONCAT(" +
                "       CAST(gwo.order_id AS STRING), '-'\n" +
                "      ,IF (gwob.order_code IS NULL, 'gwob_code_1', 
gwob.order_code), '-'\n" +
                "      ,IF (gwob.box_no IS NULL, 'gwob_no_1', gwob.box_no), 
'-'\n" +
                "      ,IF (gwso.order_code IS NULL, 'gwso_code_1', 
gwso.order_code), '-'\n" +
                "      ,IF (opt.oot_id IS NULL, 'opt_id_1', CAST( opt.oot_id AS 
STRING)), '-'\n" +
                "      ,IF (op.order_code IS NULL, 'op_code_1', CAST ( 
op.order_code AS STRING)), '-'\n" +
                "      ,IF (oo.order_id IS NULL, 'oo_id_1', CAST( oo.order_id 
AS STRING))\n" +
                "    )\n" +
                "   ,9004 AS datasource_num_id\n" +
                "   ,gwo.customer_code AS customer_code\n" +
                "   ,gwo.order_code AS order_code\n" +
                "   ,CAST (gwo.add_time AS TIMESTAMP) AS add_time" +
                "   ,oo.date_release AS order_date_release" +
                "   ,opt.import_time AS import_time" +
                "   ,opt.process_time AS process_time" +
                "   ,opt.pack_time AS pack_time" +
                "   ,opt.ship_time AS ship_time" +
                "   ,IF (gwob.tracking_number IS NOT NULL, 
gwob.tracking_number, gwso.tracking_number) AS tracking_number" +
                "   ,gwo.warehouse_id AS warehouse_id" +
                "   ,ware.warehouse_code AS warehouse_code" +
                "   ,ware.warehouse_desc AS warehouse_desc" +
                "   ,(case\n" +
                "        when (tz.timezone_season_type = 'summer_time') then\n" 
+
                "        (case\n" +
                "            when (gwo.add_time between 
tz.timezone_season_start and tz.timezone_season_end) then 
tz.timezone_summer_number else tz.timezone_winner_number\n" +
                "        end)\n" +
                "        when (tz.timezone_season_type = 'winner_time') then\n" 
+
                "        (case\n" +
                "            when (gwo.add_time between 
tz.timezone_season_start and tz.timezone_season_end) then 
tz.timezone_winner_number else tz.timezone_summer_number\n" +
                "        end)\n" +
                "    end) AS timezone\n" +
                "    ,gwo.sm_code AS sm_code\n" +
                "    ,sm.sm_name AS sm_name\n" +
                "    ,gwo.sc_id AS sc_id\n" +
                "    ,sc.server_channel_name AS server_channel_name\n" +
                "    ,s.server_code AS server_code\n" +
                "    ,s.server_name AS server_name\n" +
                "    ,CONCAT(gwo.order_code, IF (gwob.box_no IS NULL, '', 
concat('-', gwob.box_no))) AS tms_order_code\n" +
                "    ,op.wp_code AS wp_code\n" +
                "    ,wp.wp_name AS wp_name\n" +
                "    ,CAST (gwo.is_fba AS INT) AS is_fba\n" +
                "    ,sm.sm_is_tracking_val AS sm_is_tracking_val" +
                " FROM " + "gc_wms_orders" + " AS gwo\n" +
                " LEFT JOIN gc_wms_order_box AS gwob ON gwo.order_code = 
gwob.order_code\n" +
                " LEFT JOIN gc_wms_ship_order AS gwso ON gwo.order_code = 
gwso.order_code\n" +
                " LEFT JOIN gc_wms_warehouse AS ware ON CAST (gwo.warehouse_id 
AS INT) = ware.warehouse_id\n" +
                " LEFT JOIN dp_timezone AS tz ON CAST (gwo.warehouse_id AS INT) 
= tz.warehouse_id AND YEAR(CAST(gwo.add_time AS TIMESTAMP)) = 
tz.timezone_year\n" +
                " LEFT JOIN (SELECT * FROM dim_shipping_method WHERE 
datasource_num_id = 9004) AS sm ON gwo.sm_code = sm.sm_code\n" +
                " LEFT JOIN (SELECT * FROM dim_server_channel WHERE 
datasource_num_id = 9004) AS sc ON CAST (gwo.sc_id AS INT) = 
sc.server_channel_id\n" +
                " LEFT JOIN (SELECT * FROM dim_server WHERE datasource_num_id = 
9004) AS s ON sc.server_channel_server_id = s.server_id\n" +
                " LEFT JOIN gc_wms_order_operation_time opt ON CAST 
(gwo.order_id AS INT) = opt.order_id\n" +
                " LEFT JOIN ( SELECT " +
                "      gwo.order_code \n" +
                "     ,min(pr.wp_code) wp_code \n" +
                "    FROM " + "gc_wms_orders" + " AS gwo \n" +
                "    LEFT JOIN gc_wms_order_physical_relation AS pr ON 
gwo.order_code = pr.order_code \n" +
                "    WHERE TIMESTAMPDIFF(DAY, gwo.add_time, CURRENT_DATE) <= 10 
\n" +
                "    GROUP BY gwo.order_code" +
                " ) AS op \n" +
                " ON gwo.order_code = op.order_code\n" +
                " LEFT JOIN gc_wms_warehouse_physical wp ON wp.wp_code = 
op.wp_code\n" +
                " LEFT JOIN gc_oms_orders oo ON oo.refrence_no_platform = 
gwo.order_code\n" +
                " WHERE 1 = 1\n" +
                " AND gwo.order_status <> 0\n" +
                " AND gwo.order_platform_type = 'sale'\n" +
                " AND gwo.sm_code <> 'ZITI'\n" +
                " AND gwo.warehouse_id not in (5,6)\n" +
                " AND gwo.customer_code not in 
('G403','G440','G441','G448','G452','G1330','215','000010','000016','G1234','192','193','245','G249','000028','000012','G387')\n"
 +
                " AND TIMESTAMPDIFF(DAY, TO_DATE(gwo.add_time), CURRENT_DATE) 
<= 10 \n";


        tEnv.executeSql(create_table_gc_wms_qudaorealfenxi_view);
        tEnv.executeSql(create_table_gc_wms_qudaorealfenxi_view_print); // 
调试

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(sql);
        stmtSet.addInsertSql(print_sql); // 调试
        // 执行刚刚添加的所有 INSERT 语句
        stmtSet.execute();
    }

    /**
     * 创建纬度表
     */
    private static void createDimTable(StreamTableEnvironment tEnv) {

        // gc_wms_warehouse
        String create_table_gc_wms_warehouse = confJdbc("CREATE TABLE 
gc_wms_warehouse (\n" +
                "  warehouse_id INT,\n" +
                "  warehouse_code STRING,\n" +
                "  warehouse_type INT,\n" +
                "  warehouse_status INT,\n" +
                "  warehouse_virtual INT,\n" +
                "  warehouse_transfer INT,\n" +
                "  country_id INT,\n" +
                "  country_code STRING,\n" +
                "  state STRING,\n" +
                "  city STRING,\n" +
                "  contacter STRING,\n" +
                "  company STRING,\n" +
                "  phone_no STRING,\n" +
                "  street_address1 STRING,\n" +
                "  street_address2 STRING,\n" +
                "  postcode STRING,\n" +
                "  warehouse_desc STRING,\n" +
                "  warehouse_add_time TIMESTAMP,\n" +
                "  warehouse_update_time TIMESTAMP,\n" +
                "  warehouse_proxy_code STRING,\n" +
                "  street_number STRING,\n" +
                "  timezone INT,\n" +
                "  value_added_tax INT,\n" +
                "  warehouse_en STRING,\n" +
                "  saving_time INT,\n" +
                "  arrival_days INT,\n" +
                "  currency_code STRING,\n" +
                "  `day` STRING,\n" +
                "  PRIMARY KEY (warehouse_id) NOT ENFORCED\n" +
                ")  WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = 'gc_wms_warehouse',\n" +
                " 'driver' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'sink.buffer-flush.interval' = '1s',\n" +
                " 'sink.buffer-flush.max-rows' = '5'\n" +
                ")");

        String create_table_dp_timezone = confJdbc("CREATE TABLE dp_timezone 
(\n" +
                "  warehouse_id INT,\n" +
                "  warehouse_code STRING,\n" +
                "  warehouse_name STRING,\n" +
                "  timezone_year INT,\n" +
                "  timezone_season_type STRING,\n" +
                "  timezone_season_start TIMESTAMP,\n" +
                "  timezone_season_end TIMESTAMP,\n" +
                "  timezone_summer_number INT,\n" +
                "  timezone_winner_number INT,\n" +
                "  timezone_summer_time_dif_val INT,\n" +
                "  timezone_winner_time_dif_val INT\n" +
                ") WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = 'dp_timezone',\n" +
                " 'driver' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'sink.buffer-flush.interval' = '1s',\n" +
                " 'sink.buffer-flush.max-rows' = '5'\n" +
                ")");

        String create_table_dim_shipping_method = confJdbc("CREATE TABLE 
dim_shipping_method (\n" +
                "  row_wid STRING,\n" +
                "  warehouse_key STRING,\n" +
                "  sc_key STRING,\n" +
                "  st_key STRING,\n" +
                "  sct_key STRING,\n" +
                "  sm_transfer_warehouse_key STRING,\n" +
                "  sm_id INT,\n" +
                "  sm_code STRING,\n" +
                "  sm_short_name STRING,\n" +
                "  sm_name_cn STRING,\n" +
                "  sm_name STRING,\n" +
                "  sm_pg_code STRING,\n" +
                "  sm_mp_fee DECIMAL(10,2),\n" +
                "  sm_reg_fee DECIMAL(10,2),\n" +
                "  sm_addons DECIMAL(10,2),\n" +
                "  sm_type INT,\n" +
                "  sm_type_val STRING,\n" +
                "  sm_baf DECIMAL(10,2),\n" +
                "  sm_discount DECIMAL(10,2),\n" +
                "  sm_delivery_time_min STRING,\n" +
                "  sm_delivery_time_max STRING,\n" +
                "  sm_delivery_time_avg STRING,\n" +
                "  sm_is_volume INT,\n" +
                "  sm_is_volume_val STRING,\n" +
                "  sm_is_charge INT,\n" +
                "  sm_is_charge_val STRING,\n" +
                "  sm_vol_rate DECIMAL(15,4),\n" +
                "  sm_status INT,\n" +
                "  sm_status_val STRING,\n" +
                "  sm_class_code STRING,\n" +
                "  sm_class_code_val STRING,\n" +
                "  sm_logo STRING,\n" +
                "  sm_return_recipient STRING,\n" +
                "  sm_return_address STRING,\n" +
                "  sm_discount_min DECIMAL(10,2),\n" +
                "  sm_mp_fee_min DECIMAL(10,2),\n" +
                "  sm_reg_fee_min DECIMAL(10,2),\n" +
                "  sm_limit_volume INT,\n" +
                "  sm_limit_weight DECIMAL(10,3),\n" +
                "  sm_sort INT,\n" +
                "  sm_is_tracking INT,\n" +
                "  sm_is_tracking_val STRING,\n" +
                "  sm_is_signature INT,\n" +
                "  sm_is_signature_val STRING,\n" +
                "  sm_is_insurance INT,\n" +
                "  sm_is_insurance_val STRING,\n" +
                "  sm_is_validate_remote INT,\n" +
                "  sm_is_validate_remote_val STRING,\n" +
                "  sm_warehouse_id INT,\n" +
                "  sm_calc_type INT,\n" +
                "  sm_fee_type INT,\n" +
                "  sm_sc_id INT,\n" +
                "  sm_st_id INT,\n" +
                "  sm_carrier_number STRING,\n" +
                "  sm_allow_letter INT,\n" +
                "  sm_allow_letter_val STRING,\n" +
                "  sm_sct_id INT,\n" +
                "  sm_update_time TIMESTAMP,\n" +
                "  sm_pre_generate_label INT,\n" +
                "  sm_pre_generate_label_val STRING,\n" +
                "  sm_code_type INT,\n" +
                "  sm_code_type_val STRING,\n" +
                "  sm_is_more_box INT,\n" +
                "  sm_is_more_box_val STRING,\n" +
                "  sm_return_type INT,\n" +
                "  sm_attribute STRING,\n" +
                "  sm_attribute_val STRING,\n" +
                "  sm_is_claim INT,\n" +
                "  sm_is_claim_val STRING,\n" +
                "  sm_transfer_warehouse_id INT,\n" +
                "  sm_age_detection INT,\n" +
                "  sm_age_detection_val STRING,\n" +
                "  sm_recommend_platform STRING,\n" +
                "  sm_volume_unit STRING,\n" +
                "  sm_volume_type INT,\n" +
                "  sm_volume_type_val STRING,\n" +
                "  sm_volume_value DECIMAL(10,2),\n" +
                "  sm_weight_unit STRING,\n" +
                "  sm_weight_type INT,\n" +
                "  sm_weight_type_val STRING,\n" +
                "  sm_weight_value DECIMAL(10,2),\n" +
                "  sm_claim_desc STRING,\n" +
                "  sm_sfp_label_type INT,\n" +
                "  sm_sfp_label_type_val STRING,\n" +
                "  sm_life_gate INT,\n" +
                "  sm_life_gate_val STRING,\n" +
                "  sm_truck INT,\n" +
                "  sm_truck_val STRING,\n" +
                "  datasource_num_id STRING,\n" +
                "  data_flag STRING,\n" +
                "  w_insert_dt TIMESTAMP,\n" +
                "  PRIMARY KEY (row_wid) NOT ENFORCED\n" +
                ")  WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = 'dim_shipping_method',\n" +
                " 'driver' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'sink.buffer-flush.interval' = '1s',\n" +
                " 'sink.buffer-flush.max-rows' = '5'\n" +
                ")");

        // dim_server_channel
        String create_table_dim_server_channel = confJdbc("CREATE TABLE 
dim_server_channel (\n" +
                "  row_wid STRING,\n" +
                "  server_key STRING,\n" +
                "  server_channel_id INT,\n" +
                "  server_channel_server_id INT,\n" +
                "  server_channel_code STRING,\n" +
                "  server_channel_short_name STRING,\n" +
                "  server_channel_name STRING,\n" +
                "  server_channel_name_en STRING,\n" +
                "  server_channel_note STRING,\n" +
                "  server_channel_status INT,\n" +
                "  server_channel_status_val STRING,\n" +
                "  server_channel_is_weighing INT,\n" +
                "  server_channel_is_weighing_val STRING,\n" +
                "  server_channel_is_tracking INT,\n" +
                "  server_channel_is_tracking_val STRING,\n" +
                "  server_channel_is_volume INT,\n" +
                "  server_channel_is_volume_val STRING,\n" +
                "  server_channel_st_id INT,\n" +
                "  server_channel_st_auto_delivery INT,\n" +
                "  server_channel_st_split_start INT,\n" +
                "  server_channel_st_split_value INT,\n" +
                "  server_channel_st_split_type STRING,\n" +
                "  server_channel_add_time TIMESTAMP,\n" +
                "  server_channel_update_time TIMESTAMP,\n" +
                "  server_channel_asp_id INT,\n" +
                "  server_channel_asp_code STRING,\n" +
                "  server_channel_apm_id INT,\n" +
                "  server_channel_api_pk_type STRING,\n" +
                "  server_channel_wut_code STRING,\n" +
                "  server_channel_min_weight DECIMAL(11,3),\n" +
                "  server_channel_max_weight DECIMAL(11,3),\n" +
                "  server_channel_is_validate_address INT,\n" +
                "  server_channel_is_validate_address_val STRING,\n" +
                "  server_channel_is_pack_weighing INT,\n" +
                "  server_channel_is_pack_weighing_val STRING,\n" +
                "  server_channel_sign_type INT,\n" +
                "  server_channel_right_trim_length INT,\n" +
                "  server_channel_left_trim_length INT,\n" +
                "  server_channel_is_mark_weight INT,\n" +
                "  server_channel_is_mark_weight_val STRING,\n" +
                "  server_channel_push_tms_status INT,\n" +
                "  datasource_num_id STRING,\n" +
                "  data_flag STRING,\n" +
                "  w_insert_dt TIMESTAMP,\n" +
                "  PRIMARY KEY (row_wid) NOT ENFORCED\n" +
                ")  WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = 'dim_server_channel',\n" +
                " 'driver' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'sink.buffer-flush.interval' = '1s',\n" +
                " 'sink.buffer-flush.max-rows' = '5'\n" +
                ")");

        // dim_server
        String create_table_dim_server = confJdbc("CREATE TABLE dim_server (\n" 
+
                "  row_wid STRING,\n" +
                "  server_id INT,\n" +
                "  server_code STRING,\n" +
                "  server_name STRING,\n" +
                "  server_contact_name STRING,\n" +
                "  server_contact_phone STRING,\n" +
                "  server_address STRING,\n" +
                "  server_settlement_type INT,\n" +
                "  server_settlement_type_val STRING,\n" +
                "  server_balance DECIMAL(10,3),\n" +
                "  server_currency_code STRING,\n" +
                "  server_tax DECIMAL(10,3),\n" +
                "  server_person_user INT,\n" +
                "  server_update_date TIMESTAMP,\n" +
                "  server_type INT,\n" +
                "  server_is_ignore_express INT,\n" +
                "  server_as_id INT,\n" +
                "  server_as_name STRING,\n" +
                "  server_add_date TIMESTAMP,\n" +
                "  server_status INT,\n" +
                "  server_status_val STRING,\n" +
                "  server_ap_id INT,\n" +
                "  server_carrier_name STRING,\n" +
                "  server_cancel_label_flag INT,\n" +
                "  server_is_cancel_label_flag_val STRING,\n" +
                "  datasource_num_id STRING,\n" +
                "  data_flag STRING,\n" +
                "  w_insert_dt TIMESTAMP,\n" +
                "  PRIMARY KEY (row_wid) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = 'dim_server',\n" +
                " 'driver' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'sink.buffer-flush.interval' = '1s',\n" +
                " 'sink.buffer-flush.max-rows' = '5'\n" +
                ")");

        // gc_wms_warehouse_physical
        String create_table_gc_wms_warehouse_physical = confJdbc("CREATE TABLE 
gc_wms_warehouse_physical (\n" +
                "  wp_id BIGINT,\n" +
                "  warehouse_id INT,\n" +
                "  wp_name STRING,\n" +
                "  wp_code STRING,\n" +
                "  wp_status INT,\n" +
                "  wp_priority INT,\n" +
                "  wp_state STRING,\n" +
                "  wp_city STRING,\n" +
                "  wp_postcode STRING,\n" +
                "  wp_company STRING,\n" +
                "  wp_contacter STRING,\n" +
                "  wp_phone STRING,\n" +
                "  wp_street_address1 STRING,\n" +
                "  wp_street_address2 STRING,\n" +
                "  wp_street_number STRING,\n" +
                "  wp_add_time TIMESTAMP,\n" +
                "  wp_update_time TIMESTAMP,\n" +
                "  `day` STRING,\n" +
                "  PRIMARY KEY (wp_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = '%s',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = 'gc_wms_warehouse_physical',\n" +
                " 'driver' = '%s',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s',\n" +
                " 'sink.buffer-flush.interval' = '1s',\n" +
                " 'sink.buffer-flush.max-rows' = '5'\n" +
                ")");

        tEnv.executeSql(create_table_gc_wms_warehouse);
        tEnv.executeSql(create_table_dp_timezone);
        tEnv.executeSql(create_table_dim_shipping_method);
        tEnv.executeSql(create_table_dim_server_channel);
        tEnv.executeSql(create_table_dim_server);
        tEnv.executeSql(create_table_gc_wms_warehouse_physical);
    }

    /**
     * 配置jdbc属性
     * @param tableSql
     */
    private static String confJdbc(String tableSql) {

        Properties properties = 
PropertyUtil.loadProperties("druid_jdbc/zongteng_streaming.properties");
        String connector = "jdbc";
        String url = properties.getProperty("url");
        String driver = properties.getProperty("driverClassName");
        String username = properties.getProperty("username");
        String password = properties.getProperty("password");

        return  String.format(tableSql, connector, url, driver, username, 
password);
    }

}

回复