xueyongyang created FLINK-32011:
-----------------------------------

             Summary: flink1.15.2 loaded all the data in the table in mysql5.7
                 Key: FLINK-32011
                 URL: https://issues.apache.org/jira/browse/FLINK-32011
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / JDBC
    Affects Versions: 1.15.2
         Environment: flink1.15.2

mysql5.7
            Reporter: xueyongyang
             Fix For: 1.15.2


-- Source table read by the Flink job.
-- ID is the auto-increment surrogate primary key; the unique key additionally
-- allows at most one row per (MERCHANT_NO, ACT_ID, RULE_ID, DATE_DT).
-- NOTE(review): utf8mb4_0900_ai_ci is a MySQL 8.0 collation, while the report
-- lists mysql5.7 as the environment -- confirm the actual server version.
CREATE TABLE `T_P_FILTER_MERCHANT_DAY_RES2` (
    `MERCHANT_NO` VARCHAR(200)  NOT NULL,
    `ACT_ID`      VARCHAR(50)   NOT NULL,
    `RULE_ID`     VARCHAR(50)   NOT NULL,
    `SUM_MONEY`   DECIMAL(25,5) DEFAULT NULL,
    `SUM_NUM`     DECIMAL(25,5) DEFAULT NULL,
    `DATE_DT`     INT           NOT NULL,
    `DATE_TYPE`   VARCHAR(50)   DEFAULT NULL,
    `BEGIN_DATE`  INT           DEFAULT NULL,
    `END_DATE`    INT           DEFAULT NULL,
    `ID`          BIGINT        NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`ID`),
    UNIQUE KEY `T_P_FILTER_MERCHANT_DAY_RES2_UN` (`MERCHANT_NO`, `ACT_ID`, `RULE_ID`, `DATE_DT`)
)
    ENGINE = InnoDB
    AUTO_INCREMENT = 88
    DEFAULT CHARSET = utf8mb4
    COLLATE = utf8mb4_0900_ai_ci;

 

-- Result table written by the Flink job (and read back by its subqueries).
-- ID is the auto-increment surrogate primary key; the unique key allows at
-- most one row per (ACTIVITY_NO, USER_NO, BEGIN_TIME, END_TIME).
-- ACTIVITY_NO and USER_NO are nullable even though they are part of the
-- unique key, so queries comparing against them must allow for NULLs.
-- NOTE(review): utf8mb4_0900_ai_ci is a MySQL 8.0 collation, while the report
-- lists mysql5.7 as the environment -- confirm the actual server version.
CREATE TABLE `T_P_RED_STAND_RESULT` (
    `ACTIVITY_NO`         VARCHAR(50)   DEFAULT NULL,
    `RULE_ID`             VARCHAR(50)   DEFAULT NULL,
    `WEIGHT`              INT           NOT NULL,
    `DEAL_TYPE`           VARCHAR(50)   DEFAULT NULL,
    `DATA_TYPE`           VARCHAR(50)   DEFAULT NULL,
    `DATA_NO`             VARCHAR(50)   DEFAULT NULL,
    `USER_TYPE`           VARCHAR(50)   DEFAULT NULL,
    `USER_NO`             VARCHAR(50)   DEFAULT NULL,
    `SUM_MONEY`           DECIMAL(25,5) DEFAULT NULL,
    `SUM_NUM`             DECIMAL(25,5) DEFAULT NULL,
    `DATE_TYPE`           VARCHAR(50)   DEFAULT NULL,
    `BEGIN_TIME`          INT           NOT NULL,
    `END_TIME`            INT           NOT NULL,
    `TRADE_MONEY`         DECIMAL(25,5) DEFAULT NULL,
    `EXTEND_DATA_MEAN`    VARCHAR(50)   DEFAULT NULL,
    `DATA1`               VARCHAR(50)   DEFAULT NULL,
    `DATA2`               VARCHAR(50)   DEFAULT NULL,
    `DATA3`               VARCHAR(50)   DEFAULT NULL,
    `DATA4`               VARCHAR(50)   DEFAULT NULL,
    `DATA5`               VARCHAR(50)   DEFAULT NULL,
    `CK_REACH`            VARCHAR(10)   DEFAULT NULL,
    `READ_STATUS`         VARCHAR(10)   DEFAULT NULL,
    `PROCESS_STATUS`      VARCHAR(50)   DEFAULT NULL,
    `PROCESS_STATUS_DESC` VARCHAR(200)  DEFAULT NULL,
    `RESULT_TYPE`         VARCHAR(50)   DEFAULT NULL,
    `MANAGER_NO`          VARCHAR(50)   DEFAULT NULL,
    `MANAGER_ORG_NO`      VARCHAR(50)   DEFAULT NULL,
    `CALCULATION_DATE`    INT           NOT NULL,
    `CALCULATION_TIME`    DATETIME      DEFAULT NULL,
    `CREATE_TIME`         DATETIME      DEFAULT NULL,
    `CREATE_USER_NO`      VARCHAR(50)   DEFAULT NULL,
    `MAINTENANCE_TIME`    VARCHAR(50)   DEFAULT NULL,
    `MAINTENANCE_USER_NO` VARCHAR(50)   DEFAULT NULL,
    `ID`                  BIGINT        NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`ID`),
    UNIQUE KEY `T_P_RED_STAND_RESULT_UN` (`ACTIVITY_NO`, `USER_NO`, `BEGIN_TIME`, `END_TIME`)
)
    ENGINE = InnoDB
    AUTO_INCREMENT = 13
    DEFAULT CHARSET = utf8mb4
    COLLATE = utf8mb4_0900_ai_ci;

 

-- Flink SQL job: for activity 'abc' / rule 'def' on 20221028, write one
-- T_P_RED_STAND_RESULT row per source merchant that does not already have
--   (a) a result for this activity and calculation date with WEIGHT >= 1, or
--   (b) any result for this activity with READ_STATUS = '1'.
--
-- Select-list values are matched to the target columns by position; the
-- aliases only mirror the target column names for readability.
--
-- NOTE(review): the source table here is T_P_FILTER_MERCHANT_OTHER_DATE_RES2,
-- but the DDL and the description in this report refer to
-- T_P_FILTER_MERCHANT_DAY_RES2 -- confirm which table the job really reads.
-- NOTE(review): BEGIN_TIME / END_TIME are NOT NULL in the target, while
-- BEGIN_DATE / END_DATE are nullable in the DAY_RES2 DDL -- confirm the
-- source never holds NULLs there.
-- NOTE(review): whether the WHERE predicates are evaluated by MySQL or by
-- Flink after a full scan depends on the JDBC connector version's filter
-- push-down support -- confirm for the version in use.
INSERT INTO T_P_RED_STAND_RESULT (
    ACTIVITY_NO,
    RULE_ID,
    WEIGHT,
    DEAL_TYPE,
    DATA_TYPE,
    DATA_NO,
    USER_TYPE,
    USER_NO,
    SUM_MONEY,
    SUM_NUM,
    DATE_TYPE,
    BEGIN_TIME,
    END_TIME,
    TRADE_MONEY,
    EXTEND_DATA_MEAN,
    DATA1,
    DATA2,
    DATA3,
    DATA4,
    DATA5,
    CK_REACH,
    READ_STATUS,
    PROCESS_STATUS,
    PROCESS_STATUS_DESC,
    RESULT_TYPE,
    MANAGER_NO,
    MANAGER_ORG_NO,
    CALCULATION_DATE,
    CALCULATION_TIME,
    CREATE_TIME,
    CREATE_USER_NO,
    MAINTENANCE_TIME,
    MAINTENANCE_USER_NO
)
SELECT
    'abc'          AS ACTIVITY_NO,
    'def'          AS RULE_ID,
    1              AS WEIGHT,
    'red'          AS DEAL_TYPE,
    'gear'         AS DATA_TYPE,
    '001010102'    AS DATA_NO,
    'merchantRed'  AS USER_TYPE,
    r.MERCHANT_NO  AS USER_NO,
    r.SUM_MONEY    AS SUM_MONEY,
    r.SUM_NUM      AS SUM_NUM,
    r.DATE_TYPE    AS DATE_TYPE,
    r.BEGIN_DATE   AS BEGIN_TIME,
    r.END_DATE     AS END_TIME,
    0              AS TRADE_MONEY,
    'other'        AS EXTEND_DATA_MEAN,
    -- 10% of SUM_MONEY clamped to the range [10, 100]; a NULL SUM_MONEY
    -- falls through to the ELSE branch and stays NULL.
    CASE
        WHEN 0.1 * r.SUM_MONEY < 10  THEN 10
        WHEN 0.1 * r.SUM_MONEY > 100 THEN 100
        ELSE 0.1 * r.SUM_MONEY
    END            AS DATA1,
    ''             AS DATA2,
    ''             AS DATA3,
    ''             AS DATA4,
    ''             AS DATA5,
    '1'            AS CK_REACH,
    '0'            AS READ_STATUS,
    ''             AS PROCESS_STATUS,
    ''             AS PROCESS_STATUS_DESC,
    'flink-batch'  AS RESULT_TYPE,
    ''             AS MANAGER_NO,
    ''             AS MANAGER_ORG_NO,
    r.DATE_DT      AS CALCULATION_DATE,
    LOCALTIMESTAMP AS CALCULATION_TIME,
    LOCALTIMESTAMP AS CREATE_TIME,
    'system'       AS CREATE_USER_NO,
    ''             AS MAINTENANCE_TIME,
    ''             AS MAINTENANCE_USER_NO
FROM T_P_FILTER_MERCHANT_OTHER_DATE_RES2 AS r
WHERE r.ACT_ID = 'abc'
    AND r.RULE_ID = 'def'
    AND r.DATE_DT = 20221028
    -- NOT EXISTS instead of NOT IN: T_P_RED_STAND_RESULT.USER_NO is nullable,
    -- and a single NULL returned by a NOT IN subquery makes the predicate
    -- never true, so no rows at all would be inserted.
    AND NOT EXISTS (
        SELECT 1
        FROM T_P_RED_STAND_RESULT AS u
        WHERE u.USER_NO = r.MERCHANT_NO
            AND u.ACTIVITY_NO = 'abc'
            AND u.CALCULATION_DATE = 20221028
            AND u.WEIGHT >= 1
    )
    AND NOT EXISTS (
        SELECT 1
        FROM T_P_RED_STAND_RESULT AS u
        WHERE u.USER_NO = r.MERCHANT_NO
            AND u.ACTIVITY_NO = 'abc'
            AND u.READ_STATUS = '1'
    );

 

When we execute the Flink SQL above, Flink appears to load all of the data in 
T_P_FILTER_MERCHANT_DAY_RES2 into memory, convert the WHERE clause into a Java 
filter condition, and then filter the rows in memory rather than in MySQL. 
We believe this for two reasons:
1. The Flink job fails with an OOM (out-of-memory) error.
2. Our DBA reported that a full-table query was run against the 
T_P_FILTER_MERCHANT_DAY_RES2 table.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to