这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No
 Data,sink显示的是No Watermark
我的SQL语句如下:
CREATE TABLE t_stock_match_p_1(
  id VARCHAR, 
  stkcode INT,
  volume INT,
  matchtime BIGINT,
  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
 ) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'stock_match_p_zyh',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'stock_match_p_zyh',
  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
  'properties.security.protocol' = 'SASL_SDP',
  'properties.sasl.mechanism' = 'SDP',
  'properties.key.deserializer' = 
'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 
'org.apache.kafka.common.serialization.StringDeserializer',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);

CREATE TABLE t_stock_match_1 (
  stkcode int,
  pd TIMESTAMP,
  volume  INT 
) WITH (
 'connector' = 'print'
);

INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) 
as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' 
MINUTE),stkcode; 

然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方?
1503,600001,15811,1614405166858
1504,600001,15813,1614405333871
1505,600001,15814,1614405544862
1506,600001,15814,1614405673863





 
> 在 2021年2月26日,15:02,Smile <letters_sm...@163.com> 写道:
> 
> 你好,
> 
> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
> numRecordsOut,看是哪个算子开始有输入没输出的。
> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
> 
> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
> fields=[stkcode, pd, volume])
> 
> 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records
> Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect()
> 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL
> 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL
> 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是
> 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。
> 
> 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview
> 页面指标一直是 0 的。
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png> 
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png> 
> 
> 祝好~
> Smile
> 
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

回复