你好,我这边使用Flink SQL实现四条流的关联,后续实现CASE WHEN的逻辑,并且将数据插入到MySQL,但是从结果数据来看,数据存在部分丢失,代码我粘贴在后面,麻烦各位老师指导,下面是SQL【
-- Builds the per-user account-opening progress table from four Kafka streams and
-- upserts it into MySQL (plus a print sink for debugging).

CREATE FUNCTION get_json_value AS 'com.nesc.flink.udf.GetJsonValue';

-- Rows that violate a sink NOT NULL constraint are dropped instead of failing the job.
-- Flink treats PRIMARY KEY columns (user_id, cust_curr_step) as NOT NULL, so any row with a
-- NULL user_id silently never reaches MySQL; the view filters those rows explicitly.
SET 'table.exec.sink.not-null-enforcer' = 'drop';

-- ---- Test environment ----

-- MySQL sink, upsert mode keyed by (user_id, cust_curr_step).
-- The view feeding it is an update/retract stream (outer joins + ROW_NUMBER), so rows written
-- earlier can later be UPDATED or DELETED here, not only inserted.
CREATE TABLE dm_cust_oact_prog_ri (
    cust_id                STRING COMMENT '客户id',
    cust_nme               STRING COMMENT '客户姓名',
    cust_mob_tel           STRING COMMENT '客户手机号',
    cust_curr_step         STRING COMMENT '客户当前步骤',
    cust_curr_step_num     INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间',
    user_id                STRING COMMENT '开户时使用的user_id',
    tech_sys_time          STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://111/iap',
    'username'   = 'db_iap',
    'password'   = '加密内容2',
    'table-name' = 'dm_cust_oact_prog_ri'
);

-- Debug sink with the same schema. The print connector prefixes every row with its kind
-- (+I / -U / +U / -D); the -D rows are the ones that get deleted from MySQL.
CREATE TABLE dm_cust_oact_prog_ri_print (
    cust_id                STRING COMMENT '客户id',
    cust_nme               STRING COMMENT '客户姓名',
    cust_mob_tel           STRING COMMENT '客户手机号',
    cust_curr_step         STRING COMMENT '客户当前步骤',
    cust_curr_step_num     INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间',
    user_id                STRING COMMENT '开户时使用的user_id',
    tech_sys_time          STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector' = 'print'
);

-- ---- Kafka sources ----
-- Each record is an envelope of op_type / op_ts / after / current_ts; business fields are
-- extracted from the `after` JSON string with get_json_value.
-- NOTE(review): every source starts at 'latest-offset'. Records written before the job started
-- (or while it was down, if it is restarted without a savepoint) are never read, so a user whose
-- earlier records predate the job has no join partner -> missing/incomplete rows in MySQL.
-- NOTE(review): the envelope is read as plain append-only JSON and op_type is ignored, so an
-- upstream update arrives as an extra row and a delete is not applied as a delete. The field names
-- look like an Oracle GoldenGate feed; if so, the 'ogg-json' format (recent Flink versions) would
-- interpret it as a changelog — confirm.
-- NOTE(review): 'json.ignore-parse-errors' = 'true' drops malformed records without any trace.

-- Account-opening business-flow records.
CREATE TABLE dm_crh_cust_oact_rec_ri (
    op_type         STRING,
    op_ts           STRING,
    `after`         STRING,
    current_ts      STRING,
    curr_datetime   AS get_json_value(`after`, 'CURR_DATETIME'),
    user_id         AS get_json_value(`after`, 'USER_ID'),
    request_no      AS get_json_value(`after`, 'REQUEST_NO'),
    business_flag   AS get_json_value(`after`, 'BUSINESS_FLAG'),
    proc_time       AS PROCTIME()
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'BUSINFLOWRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000',  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field does not fail the job
    'json.ignore-parse-errors'     = 'true'    -- unparsable records are skipped
);

-- Customer info (name, phone, activation time, client id).
CREATE TABLE dm_crh_cust_info_ri (
    op_type         STRING,
    op_ts           STRING,
    `after`         STRING,
    current_ts      STRING,
    client_name     AS get_json_value(`after`, 'CLIENT_NAME'),
    request_status  AS get_json_value(`after`, 'REQUEST_STATUS'),
    mobile_tel      AS get_json_value(`after`, 'MOBILE_TEL'),
    user_id         AS get_json_value(`after`, 'USER_ID'),
    active_datetime AS get_json_value(`after`, 'ACTIVE_DATETIME'),
    channel_code    AS get_json_value(`after`, 'CHANNEL_CODE'),
    broker_code     AS get_json_value(`after`, 'BROKER_CODE'),
    user_gender     AS get_json_value(`after`, 'USER_GENDER'),
    birthday        AS get_json_value(`after`, 'BIRTHDAY'),
    client_id       AS get_json_value(`after`, 'CLIENT_ID')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'USERQUERYEXTINFO',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000',  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field does not fail the job
    'json.ignore-parse-errors'     = 'true'    -- unparsable records are skipped
);

-- Audit records, linked to video records via request_no = join_position_str.
CREATE TABLE dm_crh_audit_rec_ri (
    op_type         STRING,
    op_ts           STRING,
    `after`         STRING,
    current_ts      STRING,
    business_flag   AS get_json_value(`after`, 'BUSINESS_FLAG'),
    curr_datetime   AS get_json_value(`after`, 'CURR_DATETIME'),
    request_no      AS get_json_value(`after`, 'REQUEST_NO')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.BUSINFLOWAUDITRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000',  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field does not fail the job
    'json.ignore-parse-errors'     = 'true'    -- unparsable records are skipped
);

-- Video-verification records.
CREATE TABLE dm_crh_user_vidro_ri (
    op_type           STRING,
    op_ts             STRING,
    `after`           STRING,
    current_ts        STRING,
    business_flag     AS get_json_value(`after`, 'BUSINESS_FLAG'),
    create_datetime   AS get_json_value(`after`, 'CREATE_DATETIME'),
    user_id           AS get_json_value(`after`, 'USER_ID'),
    join_position_str AS get_json_value(`after`, 'JOIN_POSITION_STR')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.USERVIDEOFLOW',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000',  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field does not fail the job
    'json.ignore-parse-errors'     = 'true'    -- unparsable records are skipped
);

-- Account-opening progress: one row per (user_id, cust_curr_step), keeping the latest occurrence.
--
--   rec  : latest BUSINFLOWRECORD record per (user_id, business_flag) for the tracked flags
--   cust : customer info, left-joined on user_id
--   va   : video records (optionally matched to their audit record), left-joined on user_id
-- A step number is chosen per joined row (CASE order = priority, first match wins); the step
-- name and its occurrence time are derived from that number.
--
-- NOTE(review) — probable cause of the "missing" MySQL rows, confirm against the print sink:
--   The step is chosen per JOINED row and user-level facts outrank the record's own flag.
--   Once cust.active_datetime is set, every rec row of that user becomes '开户成功'; once a
--   matched audit 1003/1011 exists they all become '人工审核'; once a matched video
--   1200/1202/1203 exists every rec row except 22114 becomes '视屏验证'. Rows emitted earlier
--   with NULL join sides are retracted when the right side arrives, and the JDBC upsert sink
--   executes those retractions as DELETEs of the earlier (user_id, step) rows. If every step
--   should stay as its own row, derive the step per source record (e.g. UNION ALL of per-stream
--   step rows) instead of after the join.
--   Also, cust and va are not deduplicated per user_id, so each appended upstream update
--   multiplies the joined rows.
CREATE VIEW v_dm_cust_oact_prog_ri AS
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time  -- time the row is produced by the job
FROM (
    -- keep only the latest occurrence of each step per user
    SELECT
        cust_id,
        cust_nme,
        cust_mob_tel,
        cust_curr_step,
        cust_curr_step_num,
        cust_curr_step_occu_tm,
        user_id,
        ROW_NUMBER() OVER (
            PARTITION BY user_id, cust_curr_step
            ORDER BY cust_curr_step_occu_tm DESC
        ) AS rn
    FROM (
        -- map the step number to its name and to the time of the record that decided it
        SELECT
            cust_id,
            cust_nme,
            cust_mob_tel,
            CASE cust_curr_step_num
                WHEN 13 THEN '开户成功'
                WHEN 12 THEN '人工审核'
                WHEN 11 THEN '提交申请'
                WHEN 10 THEN '视屏验证'  -- NOTE(review): probably meant 视频验证; kept as-is because it is the stored value
                WHEN 9  THEN '绑定三方存管'
                WHEN 8  THEN '设置密码'
                WHEN 7  THEN '协议签署'
                WHEN 6  THEN '开通账户选择'
                WHEN 5  THEN '风险评测'
                WHEN 4  THEN '填写基本资料'
                WHEN 3  THEN '上传身份证'
                WHEN 2  THEN '选择营业部'
                WHEN 1  THEN '新开户:注册申请开户'
            END AS cust_curr_step,
            cust_curr_step_num,
            -- The time must come from the record that decided the step. Previously it was chosen by
            -- "audit present / video present" independently of the step, so e.g. '提交申请'
            -- (rec flag 22114) could be stamped with a video or audit record's time.
            CASE
                WHEN cust_curr_step_num = 13 THEN active_datetime
                WHEN cust_curr_step_num = 12 THEN audit_datetime
                WHEN cust_curr_step_num = 10 AND business_flag_video IS NOT NULL THEN video_datetime
                ELSE rec_datetime
            END AS cust_curr_step_occu_tm,
            user_id
        FROM (
            -- choose the step number for every joined row; branch order is the priority order
            SELECT
                cust.client_id   AS cust_id,
                cust.client_name AS cust_nme,
                cust.mobile_tel  AS cust_mob_tel,
                CASE
                    WHEN cust.active_datetime IS NOT NULL                      THEN 13
                    WHEN va.business_flag_audit IN ('1003', '1011')            THEN 12
                    WHEN rec.business_flag = '22114'                           THEN 11
                    WHEN rec.business_flag IN ('22115', '33500')
                      OR va.business_flag_video IN ('1200', '1202', '1203')    THEN 10
                    WHEN rec.business_flag = '22113'                           THEN 9
                    WHEN rec.business_flag = '22112'                           THEN 8
                    WHEN rec.business_flag = '22109'                           THEN 7
                    WHEN rec.business_flag = '22108'                           THEN 6
                    WHEN rec.business_flag = '22110'                           THEN 5
                    WHEN rec.business_flag = '22106'                           THEN 4
                    WHEN rec.business_flag = '22107'                           THEN 3
                    WHEN rec.business_flag = '22111'                           THEN 2
                    WHEN rec.business_flag = '12100'                           THEN 1
                END AS cust_curr_step_num,
                cust.active_datetime,
                va.audit_datetime,
                va.video_datetime,
                va.business_flag_video,
                rec.curr_datetime AS rec_datetime,
                rec.user_id
            FROM (
                -- latest record per (user_id, business_flag) for the tracked flow flags
                SELECT
                    curr_datetime,
                    user_id,
                    business_flag
                FROM (
                    SELECT
                        REPLACE(curr_datetime, '-', '') AS curr_datetime,
                        user_id,
                        business_flag,
                        ROW_NUMBER() OVER (
                            PARTITION BY user_id, business_flag
                            ORDER BY curr_datetime DESC
                        ) AS rn
                    FROM dm_crh_cust_oact_rec_ri
                    WHERE business_flag IN ('22114', '22113', '22112', '22109', '22108', '22110',
                                            '22106', '22107', '22111', '12100', '22115', '33500')
                      -- user_id is part of the sink key (NOT NULL); such rows were dropped by the
                      -- not-null enforcer anyway — filter them here so the drop is explicit
                      AND user_id IS NOT NULL
                ) AS dedup
                WHERE rn = 1
            ) AS rec
            LEFT JOIN (
                -- customer info; active_datetime truncated to 19 chars with '-' removed
                SELECT
                    user_id,
                    client_name,
                    mobile_tel,
                    REPLACE(SUBSTR(active_datetime, 1, 19), '-', '') AS active_datetime,
                    client_id
                FROM dm_crh_cust_info_ri
            ) AS cust
                ON rec.user_id = cust.user_id
            LEFT JOIN (
                -- video records with flag 1200/1202/1203, or whose matched audit record is 1003/1011.
                -- NOTE(review): audit records only get here through a matching video record.
                SELECT
                    vid.user_id,
                    REPLACE(vid.create_datetime, '-', '') AS video_datetime,
                    vid.business_flag                     AS business_flag_video,
                    aud.business_flag                     AS business_flag_audit,
                    REPLACE(aud.curr_datetime, '-', '')   AS audit_datetime
                FROM dm_crh_user_vidro_ri AS vid
                LEFT JOIN dm_crh_audit_rec_ri AS aud
                    ON vid.join_position_str = aud.request_no
                WHERE vid.business_flag IN ('1200', '1202', '1203')
                   OR aud.business_flag IN ('1003', '1011')
            ) AS va
                ON rec.user_id = va.user_id
        ) AS joined
        WHERE cust_curr_step_num IS NOT NULL
    ) AS stepped
) AS ranked
WHERE rn = 1;

-- NOTE(review): when submitted as two separate INSERT statements (e.g. in the SQL client) these
-- run as two independent jobs, each reading the topics from 'latest-offset' at its own start time,
-- so the printed rows and the MySQL rows can differ. A statement set runs both in one job.
INSERT INTO dm_cust_oact_prog_ri_print
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    tech_sys_time
FROM v_dm_cust_oact_prog_ri;

INSERT INTO dm_cust_oact_prog_ri
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    tech_sys_time
FROM v_dm_cust_oact_prog_ri;
】
| | 小昌同学 | | ccc0606fight...@163.com |