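Hi! In general, when the results of a windowed streaming computation don't match the batch results, late data is a very likely cause. Please check whether any of your data arrives late; if it does, consider making the watermark delay longer (your DDL currently uses `event_time - INTERVAL '2' MINUTE`).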
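To see how late data can make the streaming total smaller than the batch total, here is a minimal, simplified Java model. It is not Flink code: events are processed in arrival order, the watermark is `max event time seen - delay` and is updated after every record (real Flink emits watermarks periodically), and a record is dropped if the 10-minute tumbling window it belongs to has already fired. The class, method names and sample timestamps are hypothetical, chosen only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified single-threaded model of an event-time tumbling window with a
 * bounded-out-of-orderness watermark (hypothetical illustration, not Flink code).
 */
class LateDataDemo {

    /** Per-window counts from the "streaming" pass plus the number of dropped records. */
    static final class Result {
        final Map<Long, Long> countsPerWindow = new TreeMap<>();
        long dropped = 0;
    }

    /** Start of the tumbling window containing ts (assumes ts >= 0). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /**
     * Streaming pass over events in arrival order. The watermark is
     * max(event time seen) - delay; a record whose window end - 1 is already
     * <= the watermark belongs to a window that has fired, so it is dropped.
     */
    static Result stream(long[] eventTimesInArrivalOrder, long windowSize, long delay) {
        Result r = new Result();
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimesInArrivalOrder) {
            long start = windowStart(ts, windowSize);
            long end = start + windowSize;
            if (end - 1 <= watermark) {
                r.dropped++; // late: its window result was already emitted
            } else {
                r.countsPerWindow.merge(start, 1L, Long::sum);
            }
            // The watermark never moves backwards.
            watermark = Math.max(watermark, ts - delay);
        }
        return r;
    }

    /** Batch pass: every record is counted regardless of arrival order. */
    static long batchTotal(long[] eventTimes) {
        return eventTimes.length;
    }

    /** Sum of all per-window counts, i.e. the "daily total" of the streaming pass. */
    static long sum(Result r) {
        long s = 0;
        for (long c : r.countsPerWindow.values()) {
            s += c;
        }
        return s;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long windowSize = 10 * minute;
        // Hypothetical arrival order (event times in ms): the 9-minute event
        // arrives after the 12-minute one, i.e. 3 minutes out of order.
        long[] arrivals = {1 * minute, 5 * minute, 12 * minute, 9 * minute, 15 * minute};

        Result twoMinutes = stream(arrivals, windowSize, 2 * minute);
        Result fiveMinutes = stream(arrivals, windowSize, 5 * minute);

        System.out.println("batch total        = " + batchTotal(arrivals));
        System.out.println("2 min delay: kept  = " + sum(twoMinutes) + ", dropped = " + twoMinutes.dropped);
        System.out.println("5 min delay: kept  = " + sum(fiveMinutes) + ", dropped = " + fiveMinutes.dropped);
    }
}
```

In this model the batch total is 5; with a 2-minute delay the streaming windows keep 4 records and drop 1, while with a 5-minute delay all 5 are kept. The trade-off of a longer delay is that each window's result is emitted later and its state is held longer.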
李占阳 <15333936...@126.com> wrote on Fri, Aug 13, 2021 at 5:21 PM:
> Hi all:
> I'm using Flink 1.13.2 and computing statistics with TVF (window table-valued functions). I found that the daily totals from the tumbling-window statistics don't match the daily totals computed offline from the detail records. Here is my SQL:
>
> String message = " CREATE TABLE test(\n" +
>     " gid VARCHAR COMMENT 'uuid unique identifier',\n" +
>     " ip VARCHAR COMMENT 'ip address',\n" +
>     " business_no VARCHAR COMMENT 'merchant number',\n" +
>     " rtime BIGINT ,\n" +
>     " event_time as TO_TIMESTAMP_LTZ(rtime,3) ,\n" +
>     " WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE , \n" +
>     " ts AS PROCTIME () , \n" +
>     " `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' \n" +
>     " ) \n" +
>     " WITH (\n" +
>     " 'connector' = 'kafka',\n" +
>     " 'topic' = 'test',\n" +
>     " 'properties.group.id' = 'consumer-02',\n" +
>     " 'properties.bootstrap.servers' = 'XXX:9092',\n" +
>     " 'properties.security.protocol' = 'SASL_PLAINTEXT',\n" +
>     " 'properties.sasl.mechanism' = 'GSSAPI',\n" +
>     " 'properties.sasl.kerberos.service.name' = 'kafka',\n" +
>     " 'scan.startup.mode' = 'earliest-offset',\n" +
>     " 'format' = 'json'\n" +
>     " )";
>
> String message_cnts = "SELECT " +
>     "ip ," +
>     "business_no as business_no ," +
>     " min(record_time) as record_time," +
>     " count(distinct gid) as total_call_num \n" +
>     ",window_start, window_end" +
>     " FROM TABLE(\n" +
>     " TUMBLE(TABLE test, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))\n" +
>     " GROUP BY window_start, window_end, GROUPING SETS ((business_no ,ip)) ";