Re: 如何监控kafka延迟
sorry, metrics 项没复制全,应该是taskmanager_job_task_operator_KafkaConsumer_records-lag-max。 我们主要是通过 grafana 的图标来展现来监控延迟等信息,简单的报警页可以通过grafana来配置。细粒度到任务级别的报警,grafana配置起来有点繁琐,不过可能可以通过grafana 的 rest api 自动生成。 jie mei 于2021年7月28日周三 下午5:58写道: > hi,all > > 我们是通过 grafana 对采集到的 flink kafka 的 > metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。 > > xuhaiLong 于2021年7月28日周三 下午5:46写道: > >> 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。 >> >> >> 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道: >> Hi comsir >> >> kafka的控制台能力比较弱,想知道延迟只能自己维护。 >> >> 维护方式: >> >> 1. 每个服务的topic的offset 减去 groupid的offset >> >> 2. 尽量可以计算出各种消费速度 >> >> 3. rocketmq控制台,可看到消费进度,可以参照下。 >> >> >> 在 2021/7/28 上午11:02, 龙逸尘 写道: >> Hi comsir, >> 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。 >> group id 需要自己维护。 >> >> comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道: >> >> hi all >> 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况?? >> 监控这个延迟的目的:1.大盘展示,2.延迟后报警 >> 小问题: >> 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标? >> 2.怎么获取groupId呢,多个group消费的话,如何区分呀? >> 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗? >> 4.有比较优雅的实现方式吗? >> 非常感谢 期待解答 感谢感谢 >> > > > -- > > *Best Regards* > *Jeremy Mei* > -- *Best Regards* *Jeremy Mei*
Re: 如何监控kafka延迟
hi,all 我们是通过 grafana 对采集到的 flink kafka 的 metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。 xuhaiLong 于2021年7月28日周三 下午5:46写道: > 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。 > > > 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道: > Hi comsir > > kafka的控制台能力比较弱,想知道延迟只能自己维护。 > > 维护方式: > > 1. 每个服务的topic的offset 减去 groupid的offset > > 2. 尽量可以计算出各种消费速度 > > 3. rocketmq控制台,可看到消费进度,可以参照下。 > > > 在 2021/7/28 上午11:02, 龙逸尘 写道: > Hi comsir, > 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。 > group id 需要自己维护。 > > comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道: > > hi all > 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况?? > 监控这个延迟的目的:1.大盘展示,2.延迟后报警 > 小问题: > 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标? > 2.怎么获取groupId呢,多个group消费的话,如何区分呀? > 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗? > 4.有比较优雅的实现方式吗? > 非常感谢 期待解答 感谢感谢 > -- *Best Regards* *Jeremy Mei*
Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
此外,在事件时间,场景下,如果一个 Stream A 有消息, 另一个 Stream B 没有消息进行 UNION ALL。那么 Stream B 的消息永远是一个 Long.MIN_VALUE, 进行水印对其的时候,UNION ALL 后的水印取所有 CHANNEL 的最小水印,也就是 Long.MIN_VALUE, 这就导致分组滚动窗口一致得不到计算。 jie mei 于2021年4月12日周一 上午11:24写道: > 问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。 > > jie mei 于2021年4月12日周一 上午1:49写道: > >> 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, >> 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput >> 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。 >> >> 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL >> >> create table input_table ( >> `dim` varchar, >> `server_time` bigint, >> `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, >> '-MM-dd HH:mm:ss')), >> WATERMARK FOR `event_time` AS `event_time` >> ) >> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, >> `dim`, >> count(1), >> FROM input_table >> GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` >> >> >> >> *Best Regards* >> *Jeremy Mei* >> > > > -- > > *Best Regards* > *Jeremy Mei* > -- *Best Regards* *Jeremy Mei*
Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。 jie mei 于2021年4月12日周一 上午1:49写道: > 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, > 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput 中的 > processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。 > > 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL > > create table input_table ( > `dim` varchar, > `server_time` bigint, > `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, > '-MM-dd HH:mm:ss')), > WATERMARK FOR `event_time` AS `event_time` > ) > select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, > `dim`, > count(1), > FROM input_table > GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` > > > > *Best Regards* > *Jeremy Mei* > -- *Best Regards* *Jeremy Mei*
分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL create table input_table ( `dim` varchar, `server_time` bigint, `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, '-MM-dd HH:mm:ss')), WATERMARK FOR `event_time` AS `event_time` ) select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, `dim`, count(1), FROM input_table GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` *Best Regards* *Jeremy Mei*
[讨论] Flink Connector 并行写入数据方案
Hi, Community 我想发起一个初步的讨论,关于如何扩大 Flink 写入数据的并行度,并且能够分表,可选事务支持的方案。 该方案应该支持三种场景: 1) 至少一次: 不支持或者有限支持 update 的存储,通常通过查询去重。 例如 ClickHouse 2) 相同主键或分区内有序: 支持 Upsert,但不支持事务或者跨行事务的存储,例如 ElasticSearch, MongoDB 3) 事务:支持跨行事务的存储,例如 MySQL。 另外说一下,第二种情况和第三种情况的一个重要区别是,当 CheckPoint 失败,第二种情况会从上一个快照重新执行, 那么会存在旧的数据可能覆盖新的数据的情况。举个例子: 假设正常情况下记录A在某个快照区间取值为 A1, A2, A3。假如在写入 A2 后快照失败,当重新执行的时候,会短暂的存在这种情况,A1 覆盖了 A2 的值。 下面是不同场景扩大并行度的方案 1) 至少一次: 在这种场景下,数据乱顺是可容忍的,只要保证最少一次,就能达到最终一致性。可以考虑多线程异步写入数据, 当异步任务过多,则等待有异步任务完成,再执行新的异步写入任务。CheckPoint需要保证所有异步任务完成 2) 相同主键或分区内有序,最少一次: 在这种场景下,如果指定了分区字段,可以将相同分区的数据放到一个 Buffer 里,相同 Buffer 的数据有序, 不同 Buffer的数据并行写入,CheckPoint的时候需要保证所有数据写入;如果没有分区,单指定了主键,可以 根据主键的 Hash Code 对 Sink 并行读取模,得到的值用于决定数据缓存到哪一个 Buffer,同样相同的 Buffer 内有序,不同的 Buffer 并行。 3) 事务: 由于已经有了通用的 Sink API,可以考虑把数据缓存到 Buffer, 在 CheckPoint 的时候,开启事务,完成写入数据,并提交。 [FLIP-143] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API 分表: 对于 MySQL, MongoDB 这类存储,可以通过分区键来定义分表规则,假如表 A 定义了分区键 B,B 有 B1, B2 两个取值, 那么得到两个分表 A_B1, A_B2. -- *Best Regards* *Jeremy Mei*
Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库
Hi, Leonard 好的,我将会提一个PR来修复这个issue Leonard Xu 于2020年12月10日周四 下午12:10写道: > 你们分析是对的,这是个bug,这里应该用SinkFunctionProvider, > 用GenericJdbcSinkFunction再wrap一层,不用OutputFormatProvider,因为 > OutputFormatSinkFunction没有继承CheckpointedFunction, 没法保证在cp时将buffer数据刷到数据库, > 也可以说是OutputFormat不会参与cp, 所以at-least-once都不一定能保证。 > > 修复应该很简单的,@jie mei 你有兴趣帮忙修复吗? > > 祝好, > Leonard > > > > 在 2020年12月10日,11:22,jie mei 写道: > > Hi,Jark > > 好的,我会就此创建一个issue > > Jark Wu 于2020年12月10日周四 上午11:17写道: > >> Hi Jie, >> >> 看起来确实是个问题。 >> sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。 >> 可以帮忙创建个 issue 么? >> >> Best, >> Jark >> >> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: >> >> > Hi, >> >是的,感觉你是对的。 >> > `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而 >> > `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 >> snapshotState >> > 时候调用format.flush。 >> >WDYT @Jark @ Leonard >> > >> > Best, >> > Hailong >> > >> > >> > 在 2020-12-09 17:13:14,"jie mei" 写道: >> > >Hi, Community >> > > >> > >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 >> > >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 >> > >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 >> > > >> > >我的问题是:是否有办法强制刷新buffer中的数据入库? >> > > >> > > >> > >@Public >> > >public interface OutputFormat extends Serializable { >> > > >> > > /** >> > >* Configures this output format. Since output formats are >> > >instantiated generically and hence parameterless, >> > >* this method is the place where the output formats set their >> > >basic fields based on configuration values. >> > >* >> > >* This method is always called first on a newly instantiated >> output format. >> > >* >> > >* @param parameters The configuration with all parameters. >> > >*/ >> > > void configure(Configuration parameters); >> > > >> > > /** >> > >* Opens a parallel instance of the output format to store the >> > >result of its parallel instance. >> > >* >> > >* When this method is called, the output format it guaranteed to >> > >be configured. >> > >* >> > >* @param taskNumber The number of the parallel instance. >> > >* @param numTasks The number of parallel tasks. >> > >* @throws IOException Thrown, if the output could not be opened >> > >due to an I/O problem. >> > >*/ >> > > void open(int taskNumber, int numTasks) throws IOException; >> > > >> > > >> > > /** >> > >* Adds a record to the output. >> > >* >> > >* When this method is called, the output format it guaranteed to >> be opened. >> > >* >> > >* @param record The records to add to the output. >> > >* @throws IOException Thrown, if the records could not be added to >> > >to an I/O problem. >> > >*/ >> > > void writeRecord(IT record) throws IOException; >> > > >> > > /** >> > >* Method that marks the end of the life-cycle of parallel output >> > >instance. Should be used to close >> > >* channels and streams and release resources. >> > >* After this method returns without an error, the output is >> > >assumed to be correct. >> > >* >> > >* When this method is called, the output format it guaranteed to >> be opened. >> > >* >> > >* @throws IOException Thrown, if the input could not be closed >> properly. >> > >*/ >> > > void close() throws IOException; >> > >} >> > > >> > > >> > >-- >> > > >> > >*Best Regards* >> > >*Jeremy Mei* >> > >> > >> > >> > >> > >> > >> > > > -- > > *Best Regards* > *Jeremy Mei* > > > -- *Best Regards* *Jeremy Mei*
Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库
Hi,Jark 好的,我会就此创建一个issue Jark Wu 于2020年12月10日周四 上午11:17写道: > Hi Jie, > > 看起来确实是个问题。 > sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。 > 可以帮忙创建个 issue 么? > > Best, > Jark > > On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: > > > Hi, > >是的,感觉你是对的。 > > `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而 > > `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 > snapshotState > > 时候调用format.flush。 > >WDYT @Jark @ Leonard > > > > Best, > > Hailong > > > > > > 在 2020-12-09 17:13:14,"jie mei" 写道: > > >Hi, Community > > > > > >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 > > >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 > > >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 > > > > > >我的问题是:是否有办法强制刷新buffer中的数据入库? > > > > > > > > >@Public > > >public interface OutputFormat extends Serializable { > > > > > > /** > > >* Configures this output format. Since output formats are > > >instantiated generically and hence parameterless, > > >* this method is the place where the output formats set their > > >basic fields based on configuration values. > > >* > > >* This method is always called first on a newly instantiated output > format. > > >* > > >* @param parameters The configuration with all parameters. > > >*/ > > > void configure(Configuration parameters); > > > > > > /** > > >* Opens a parallel instance of the output format to store the > > >result of its parallel instance. > > >* > > >* When this method is called, the output format it guaranteed to > > >be configured. > > >* > > >* @param taskNumber The number of the parallel instance. > > >* @param numTasks The number of parallel tasks. > > >* @throws IOException Thrown, if the output could not be opened > > >due to an I/O problem. > > >*/ > > > void open(int taskNumber, int numTasks) throws IOException; > > > > > > > > > /** > > >* Adds a record to the output. > > >* > > >* When this method is called, the output format it guaranteed to be > opened. > > >* > > >* @param record The records to add to the output. > > >* @throws IOException Thrown, if the records could not be added to > > >to an I/O problem. > > >*/ > > > void writeRecord(IT record) throws IOException; > > > > > > /** > > >* Method that marks the end of the life-cycle of parallel output > > >instance. Should be used to close > > >* channels and streams and release resources. > > >* After this method returns without an error, the output is > > >assumed to be correct. > > >* > > >* When this method is called, the output format it guaranteed to be > opened. > > >* > > >* @throws IOException Thrown, if the input could not be closed > properly. > > >*/ > > > void close() throws IOException; > > >} > > > > > > > > >-- > > > > > >*Best Regards* > > >*Jeremy Mei* > > > > > > > > > > > > > -- *Best Regards* *Jeremy Mei*
flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库
Hi, Community JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 我的问题是:是否有办法强制刷新buffer中的数据入库? @Public public interface OutputFormat extends Serializable { /** * Configures this output format. Since output formats are instantiated generically and hence parameterless, * this method is the place where the output formats set their basic fields based on configuration values. * * This method is always called first on a newly instantiated output format. * * @param parameters The configuration with all parameters. */ void configure(Configuration parameters); /** * Opens a parallel instance of the output format to store the result of its parallel instance. * * When this method is called, the output format it guaranteed to be configured. * * @param taskNumber The number of the parallel instance. * @param numTasks The number of parallel tasks. * @throws IOException Thrown, if the output could not be opened due to an I/O problem. */ void open(int taskNumber, int numTasks) throws IOException; /** * Adds a record to the output. * * When this method is called, the output format it guaranteed to be opened. * * @param record The records to add to the output. * @throws IOException Thrown, if the records could not be added to to an I/O problem. */ void writeRecord(IT record) throws IOException; /** * Method that marks the end of the life-cycle of parallel output instance. Should be used to close * channels and streams and release resources. * After this method returns without an error, the output is assumed to be correct. * * When this method is called, the output format it guaranteed to be opened. * * @throws IOException Thrown, if the input could not be closed properly. */ void close() throws IOException; } -- *Best Regards* *Jeremy Mei*
动态表 Change Log 格式
Hi, Community Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). 其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗? 我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。 此外,Delete语句对应的数据是完整记录还是操作日志呢? 这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。 并通过额外的逻辑判断来获得最新的数据是可行的。 -- *Best Regards* *Jeremy Mei*