xueyongyang created FLINK-32011: ----------------------------------- Summary: flink1.15.2 loaded all the data in the table in mysql5.7 Key: FLINK-32011 URL: https://issues.apache.org/jira/browse/FLINK-32011 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.15.2 Environment: flink1.15.2
mysql5.7 Reporter: xueyongyang Fix For: 1.15.2 CREATE TABLE `T_P_FILTER_MERCHANT_DAY_RES2` ( `MERCHANT_NO` varchar(200) NOT NULL, `ACT_ID` varchar(50) NOT NULL, `RULE_ID` varchar(50) NOT NULL, `SUM_MONEY` decimal(25,5) DEFAULT NULL, `SUM_NUM` decimal(25,5) DEFAULT NULL, `DATE_DT` int NOT NULL, `DATE_TYPE` varchar(50) DEFAULT NULL, `BEGIN_DATE` int DEFAULT NULL, `END_DATE` int DEFAULT NULL, `ID` bigint NOT NULL AUTO_INCREMENT, PRIMARY KEY (`ID`), UNIQUE KEY `T_P_FILTER_MERCHANT_DAY_RES2_UN` (`MERCHANT_NO`,`ACT_ID`,`RULE_ID`,`DATE_DT`) ) ENGINE=InnoDB AUTO_INCREMENT=88 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `T_P_RED_STAND_RESULT` ( `ACTIVITY_NO` varchar(50) , `RULE_ID` varchar(50) , `WEIGHT` int NOT NULL , `DEAL_TYPE` varchar(50) , `DATA_TYPE` varchar(50) , `DATA_NO` varchar(50) , `USER_TYPE` varchar(50) , `USER_NO` varchar(50) , `SUM_MONEY` decimal(25,5) , `SUM_NUM` decimal(25,5) , `DATE_TYPE` varchar(50) , `BEGIN_TIME` int NOT NULL , `END_TIME` int NOT NULL , `TRADE_MONEY` decimal(25,5) DEFAULT NULL, `EXTEND_DATA_MEAN` varchar(50) , `DATA1` varchar(50) , `DATA2` varchar(50) , `DATA3` varchar(50) , `DATA4` varchar(50) , `DATA5` varchar(50) , `CK_REACH` varchar(10) , `READ_STATUS` varchar(10) , `PROCESS_STATUS` varchar(50) , `PROCESS_STATUS_DESC` varchar(200) , `RESULT_TYPE` varchar(50) , `MANAGER_NO` varchar(50) , `MANAGER_ORG_NO` varchar(50) , `CALCULATION_DATE` int NOT NULL , `CALCULATION_TIME` datetime DEFAULT NULL , `CREATE_TIME` datetime DEFAULT NULL , `CREATE_USER_NO` varchar(50) , `MAINTENANCE_TIME` varchar(50) , `MAINTENANCE_USER_NO` varchar(50) , `ID` bigint NOT NULL AUTO_INCREMENT, PRIMARY KEY (`ID`), UNIQUE KEY `T_P_RED_STAND_RESULT_UN` (`ACTIVITY_NO`,`USER_NO`,`BEGIN_TIME`,`END_TIME`) ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; INSERT INTO T_P_RED_STAND_RESULT ( ACTIVITY_NO, RULE_ID, WEIGHT, DEAL_TYPE, DATA_TYPE, DATA_NO, USER_TYPE, USER_NO, SUM_MONEY, SUM_NUM, DATE_TYPE, BEGIN_TIME, 
END_TIME, TRADE_MONEY, EXTEND_DATA_MEAN, DATA1, DATA2, DATA3, DATA4, DATA5, CK_REACH, READ_STATUS, PROCESS_STATUS, PROCESS_STATUS_DESC, RESULT_TYPE, MANAGER_NO, MANAGER_ORG_NO, CALCULATION_DATE, CALCULATION_TIME, CREATE_TIME, CREATE_USER_NO, MAINTENANCE_TIME, MAINTENANCE_USER_NO) select 'abc' as ACTIVITY_NO, 'def' as RULE_ID, 1 as WEIGHT, 'red' as DEAL_TYPE, 'gear' as DATA_TYPE, '001010102' as DATA_NO, 'merchantRed' as USER_TYPE, r.MERCHANT_NO as MERCHANT_NO, r.SUM_MONEY as SUM_MONEY, r.SUM_NUM as SUM_NUM , r.DATE_TYPE as DATE_TYPE , r.BEGIN_DATE as BEGIN_DATE, r.END_DATE as END_DATE, 0 as TRADE_MONEY, 'other' as EXTEND_DATA_MEAN, if(0.1 * r.SUM_MONEY >= 10 and 0.1 * r.SUM_MONEY <= 100, 0.1 * r.SUM_MONEY, if(0.1 * r.SUM_MONEY<10, 10, if(0.1 * r.SUM_MONEY>100, 100, 0.1 * r.SUM_MONEY))) as DATA1, '' as DATA2, '' as DATA3, '' as DATA4, '' as DATA5, '1' as CK_REACH, '0' as READ_STATUS, '' as PROCESS_STATUS, '' as PROCESS_STATUS_DESC, 'flink-batch' as RESULT_TYPE, '' as MANAGER_NO, '' as MANAGER_ORG_NO, r.DATE_DT as CALCULATION_DATE, LOCALTIMESTAMP as CALCULATION_TIME, LOCALTIMESTAMP as CREATE_TIME, 'system' as CREATE_USER_NO, '' as MAINTENANCE_TIME, '' as MAINTENANCE_USER_NO from T_P_FILTER_MERCHANT_OTHER_DATE_RES2 r where 1 = 1 and r.ACT_ID = 'abc' and r.RULE_ID = 'def' and r.DATE_DT = 20221028 and r.MERCHANT_NO not in ( select u.USER_NO from T_P_RED_STAND_RESULT u where u.ACTIVITY_NO = 'abc' and u.CALCULATION_DATE = 20221028 and u.WEIGHT>=1) and r.MERCHANT_NO not in ( select u.USER_NO from T_P_RED_STAND_RESULT u where u.ACTIVITY_NO = 'abc' and u.READ_STATUS = '1') When executing the above Flink SQL, it appears that Flink loaded all the data in T_P_FILTER_MERCHANT_DAY_RES2 into memory, converted the WHERE clause into a Java filter condition, and then filtered the data in memory. We believe this for two reasons: 1. The Flink job fails with an OOM (out-of-memory) error. 2. 
Our DBA reported that a full table scan was performed on the T_P_FILTER_MERCHANT_DAY_RES2 table. -- This message was sent by Atlassian Jira (v8.20.10#820010)