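The statement looks fine. Please check whether the data can actually be joined, that is, whether the tenantcode values in the stream match any Name values in the dimension table.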
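
For example, a rough check you could run, assuming you can query MySQL directly. Note that 'some-tenant-code' is only a placeholder for a tenantcode value you actually see in the Kafka topic, not a value taken from your data:

```sql
-- Run in the MySQL client, not in Flink.
-- 'some-tenant-code' is a hypothetical placeholder: substitute a tenantcode
-- value copied from a real message in the 'demo' topic.
SELECT `Id`, `Name`, `Etltype`
FROM `people`
WHERE `Name` = 'some-tenant-code';

-- List a few Name values with their lengths to spot stray whitespace.
SELECT `Name`, CHAR_LENGTH(`Name`) AS name_len
FROM `people`
LIMIT 20;
```

If the first query returns no rows, the left join will emit NULL for b.Etltype for that record.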

Best,
Terry Wang



> On July 6, 2021, at 3:24 PM, 赵旭晨 <jjrr...@163.com> wrote:
> 
> Stream table:
> CREATE TABLE flink_doris_source (
>     reporttime STRING,
>     tenantcode STRING,
>     visitid STRING,
>     orderid STRING,
>     orderdetailid STRING,
>     paymentdetailid STRING,
>     etltype STRING,
>     ptime as proctime()
> ) WITH (
>  'connector' = 'kafka',  
>  'topic' = 'demo', 
>  'properties.bootstrap.servers' = '10.26.255.82:9092', 
>  'properties.group.id' = 'consumer-55',  
>  'format' = 'json',  
>  'scan.startup.mode' = 'latest-offset'  
> );
> 
> Dimension table:
> CREATE TABLE people (
>   `Id` int,
>   `Name` String,
>   `Sex` tinyint,
>   `Birth` timestamp,
>   `Etltype` String,
>   PRIMARY KEY (Id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
>    'table-name' = 'people',
>    'username'='root',
>    'password'='*******',
>    'lookup.cache.ttl'='10s',
>    'lookup.cache.max-rows'='20'
> );
> 
> MySQL DDL:
> CREATE TABLE `people` (
>   `Id` int(11) NOT NULL AUTO_INCREMENT,
>   `Name` varchar(40) NOT NULL,
>   `Sex` tinyint(3) unsigned NOT NULL,
>   `Birth` datetime DEFAULT NULL,
>   `Etltype` varchar(40) NOT NULL,
>   PRIMARY KEY (`Id`)
> ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4
> 
> 
> -- Stream-dimension join
> INSERT INTO flink_doris_sink select 
> a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype
>  from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b 
> on a.tenantcode = b.Name;
> 
> 
> 
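> The dimension-table data is not being brought out by the join, and the TTL configured on the dimension table has no effect either. Is there a problem with the statement?
> Flink version: flink-1.12.0-bin-scala_2.11.tgz   MySQL: 5.7.32   MySQL driver: 8.0.22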
> 
> 
> 
>  
