Re: 如何监控kafka延迟
sorry, metrics 项没复制全,应该是taskmanager_job_task_operator_KafkaConsumer_records-lag-max。 我们主要是通过 grafana 的图标来展现来监控延迟等信息,简单的报警页可以通过grafana来配置。细粒度到任务级别的报警,grafana配置起来有点繁琐,不过可能可以通过grafana 的 rest api 自动生成。 jie mei 于2021年7月28日周三 下午5:58写道: > hi,all > > 我们是通过 grafana 对采集到的 flink kafka 的 > metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。 > > xuhaiLong 于2021年7月28日周三 下午5:46写道: > >> 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。 >> >> >> 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道: >> Hi comsir >> >> kafka的控制台能力比较弱,想知道延迟只能自己维护。 >> >> 维护方式: >> >> 1. 每个服务的topic的offset 减去 groupid的offset >> >> 2. 尽量可以计算出各种消费速度 >> >> 3. rocketmq控制台,可看到消费进度,可以参照下。 >> >> >> 在 2021/7/28 上午11:02, 龙逸尘 写道: >> Hi comsir, >> 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。 >> group id 需要自己维护。 >> >> comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道: >> >> hi all >> 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况?? >> 监控这个延迟的目的:1.大盘展示,2.延迟后报警 >> 小问题: >> 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标? >> 2.怎么获取groupId呢,多个group消费的话,如何区分呀? >> 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗? >> 4.有比较优雅的实现方式吗? >> 非常感谢 期待解答 感谢感谢 >> > > > -- > > *Best Regards* > *Jeremy Mei* > -- *Best Regards* *Jeremy Mei*
Re: 如何监控kafka延迟
hi,all 我们是通过 grafana 对采集到的 flink kafka 的 metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。 xuhaiLong 于2021年7月28日周三 下午5:46写道: > 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。 > > > 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道: > Hi comsir > > kafka的控制台能力比较弱,想知道延迟只能自己维护。 > > 维护方式: > > 1. 每个服务的topic的offset 减去 groupid的offset > > 2. 尽量可以计算出各种消费速度 > > 3. rocketmq控制台,可看到消费进度,可以参照下。 > > > 在 2021/7/28 上午11:02, 龙逸尘 写道: > Hi comsir, > 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。 > group id 需要自己维护。 > > comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道: > > hi all > 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况?? > 监控这个延迟的目的:1.大盘展示,2.延迟后报警 > 小问题: > 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标? > 2.怎么获取groupId呢,多个group消费的话,如何区分呀? > 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗? > 4.有比较优雅的实现方式吗? > 非常感谢 期待解答 感谢感谢 > -- *Best Regards* *Jeremy Mei*
Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
此外,在事件时间,场景下,如果一个 Stream A 有消息, 另一个 Stream B 没有消息进行 UNION ALL。那么 Stream B 的消息永远是一个 Long.MIN_VALUE, 进行水印对其的时候,UNION ALL 后的水印取所有 CHANNEL 的最小水印,也就是 Long.MIN_VALUE, 这就导致分组滚动窗口一致得不到计算。 jie mei 于2021年4月12日周一 上午11:24写道: > 问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。 > > jie mei 于2021年4月12日周一 上午1:49写道: > >> 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, >> 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput >> 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。 >> >> 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL >> >> create table input_table ( >> `dim` varchar, >> `server_time` bigint, >> `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, >> '-MM-dd HH:mm:ss')), >> WATERMARK FOR `event_time` AS `event_time` >> ) >> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, >> `dim`, >> count(1), >> FROM input_table >> GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` >> >> >> >> *Best Regards* >> *Jeremy Mei* >> > > > -- > > *Best Regards* > *Jeremy Mei* > -- *Best Regards* *Jeremy Mei*
Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。 jie mei 于2021年4月12日周一 上午1:49写道: > 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, > 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput 中的 > processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。 > > 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL > > create table input_table ( > `dim` varchar, > `server_time` bigint, > `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, > '-MM-dd HH:mm:ss')), > WATERMARK FOR `event_time` AS `event_time` > ) > select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, > `dim`, > count(1), > FROM input_table > GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` > > > > *Best Regards* > *Jeremy Mei* > -- *Best Regards* *Jeremy Mei*
分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL create table input_table ( `dim` varchar, `server_time` bigint, `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, '-MM-dd HH:mm:ss')), WATERMARK FOR `event_time` AS `event_time` ) select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, `dim`, count(1), FROM input_table GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` *Best Regards* *Jeremy Mei*
[讨论] Flink Connector 并行写入数据方案
Hi, Community 我想发起一个初步的讨论,关于如何扩大 Flink 写入数据的并行度,并且能够分表,可选事务支持的方案。 该方案应该支持三种场景: 1) 至少一次: 不支持或者有限支持 update 的存储,通常通过查询去重。 例如 ClickHouse 2) 相同主键或分区内有序: 支持 Upsert,但不支持事务或者跨行事务的存储,例如 ElasticSearch, MongoDB 3) 事务:支持跨行事务的存储,例如 MySQL。 另外说一下,第二种情况和第三种情况的一个重要区别是,当 CheckPoint 失败,第二种情况会从上一个快照重新执行, 那么会存在旧的数据可能覆盖新的数据的情况。举个例子: 假设正常情况下记录A在某个快照区间取值为 A1, A2, A3。假如在写入 A2 后快照失败,当重新执行的时候,会短暂的存在这种情况,A1 覆盖了 A2 的值。 下面是不同场景扩大并行度的方案 1) 至少一次: 在这种场景下,数据乱顺是可容忍的,只要保证最少一次,就能达到最终一致性。可以考虑多线程异步写入数据, 当异步任务过多,则等待有异步任务完成,再执行新的异步写入任务。CheckPoint需要保证所有异步任务完成 2) 相同主键或分区内有序,最少一次: 在这种场景下,如果指定了分区字段,可以将相同分区的数据放到一个 Buffer 里,相同 Buffer 的数据有序, 不同 Buffer的数据并行写入,CheckPoint的时候需要保证所有数据写入;如果没有分区,单指定了主键,可以 根据主键的 Hash Code 对 Sink 并行读取模,得到的值用于决定数据缓存到哪一个 Buffer,同样相同的 Buffer 内有序,不同的 Buffer 并行。 3) 事务: 由于已经有了通用的 Sink API,可以考虑把数据缓存到 Buffer, 在 CheckPoint 的时候,开启事务,完成写入数据,并提交。 [FLIP-143] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API 分表: 对于 MySQL, MongoDB 这类存储,可以通过分区键来定义分表规则,假如表 A 定义了分区键 B,B 有 B1, B2 两个取值, 那么得到两个分表 A_B1, A_B2. -- *Best Regards* *Jeremy Mei*
Re: flink sql jmh failure
HI, Guowei yeah, I think so too. There is no way trigger a checkpoint and wath the checkpoint finished now, so I will do the benchmark with lower level api. Guowei Ma 于2021年3月25日周四 下午4:59写道: > Hi, > I am not an expert of JMH but it seems that it is not an error. From the > log it looks like that the job is not finished. > The data source continues to read data when JMH finishes. > > Thread[Legacy Source Thread - Source: > TableSourceScan(table=[[default_catalog, default_database, > CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, > first_int, second_int, first_float, second_float, first_double, > second_double, first_string, second_string]) -> > Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, > first_int, second_int, first_float, second_float, first_double, > second_double, first_string, second_string]) -> Sink: > Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], > fields=[dt, first_bigint, second_bigint, first_int, second_int, > first_float, second_float, first_double, second_double, first_string, > second_string]) (3/6),5,Flink Task Threads] > at > org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82) > at org.apache.flink.table.data.StringData.fromString(StringData.java:52) > at > org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171) > at > org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168) > at > org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320) > at > org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277) > at > org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) > > Best, > Guowei > > > On Wed, Mar 24, 2021 at 9:56 PM jie mei wrote: > >> Hi, Yik San >> >> I use a library wroten by myself and trying to verify the performance. >> >> >> Yik San Chan 于2021年3月24日周三 下午9:07写道: >> >>> Hi Jie, >>> >>> I am curious what library do you use to get the ClickHouseTableBuilder >>> >>> On Wed, Mar 24, 2021 at 8:41 PM jie mei wrote: >>> >>>> Hi, Community >>>> >>>> I run a jmh benchmark task get blew error, which use flink sql >>>> consuming data from data-gen connector(10_000_000) and write data to >>>> clickhouse. blew is partly log and you can see completable log by attached >>>> file >>>> >>>> *My jmh benchmark code as blew:* >>>> >>>> @Benchmark >>>> @Threads(1) >>>> @Fork(1) >>>> public void sinkBenchmark() throws IOException { >>>> >>>> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment >>>> .getExecutionEnvironment(); >>>> streamEnv.enableCheckpointing(6); >>>> >>>> EnvironmentSettings settings = EnvironmentSettings.newInstance() >>>> .useBlinkPlanner() >>>> .inStreamingMode().build(); >>>> TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, >>>> settings); >>>> >>>> // create clickhouse table >>>> new ClickHouseTableBuilder(tableEnv, >>>> parseSchema("clickhouse_sink_table.sql")) >>>> .database("benchmark") >>>> .table("bilophus_sink_benchmark") >>>> .address("jdbc:clickhouse://localhost:8123") >>>> .build(); >>>> >>>> // create mock data table >>>> tableEnv.executeSql( >>>> parseSchema("clickhouse_source_table.sql") + >>>> "WITH (" + >>>> "'connector' = 'datagen'," + >>>> "'number-of-rows' = '1000')"); >>>> >>>> tableEnv.executeSql( >>>> "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM >>>> CLICKHOUSE_SOURCE_BENCHMARK"); >>>> >>>> } >>>> >>>> *running command:* >>>> >>>
Re: flink sql jmh failure
Hi, Yik San I use a library wroten by myself and trying to verify the performance. Yik San Chan 于2021年3月24日周三 下午9:07写道: > Hi Jie, > > I am curious what library do you use to get the ClickHouseTableBuilder > > On Wed, Mar 24, 2021 at 8:41 PM jie mei wrote: > >> Hi, Community >> >> I run a jmh benchmark task get blew error, which use flink sql consuming >> data from data-gen connector(10_000_000) and write data to clickhouse. blew >> is partly log and you can see completable log by attached file >> >> *My jmh benchmark code as blew:* >> >> @Benchmark >> @Threads(1) >> @Fork(1) >> public void sinkBenchmark() throws IOException { >> >> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment >> .getExecutionEnvironment(); >> streamEnv.enableCheckpointing(6); >> >> EnvironmentSettings settings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode().build(); >> TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, >> settings); >> >> // create clickhouse table >> new ClickHouseTableBuilder(tableEnv, >> parseSchema("clickhouse_sink_table.sql")) >> .database("benchmark") >> .table("bilophus_sink_benchmark") >> .address("jdbc:clickhouse://localhost:8123") >> .build(); >> >> // create mock data table >> tableEnv.executeSql( >> parseSchema("clickhouse_source_table.sql") + >> "WITH (" + >> "'connector' = 'datagen'," + >> "'number-of-rows' = '1000')"); >> >> tableEnv.executeSql( >> "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM >> CLICKHOUSE_SOURCE_BENCHMARK"); >> >> } >> >> *running command:* >> >> mvn clean package -DskipTests >> >> >> org.codehaus.mojo >> exec-maven-plugin >> 1.6.0 >> >> >> test-benchmarks >> test >> >> exec >> >> >> >> >> false >> test >> java >> >> -Xmx6g >> -classpath >> >> org.openjdk.jmh.Main >> >> -foe >> true >> >> -f >> 1 >> -i >> 1 >> -wi >> 0 >> -rf >> csv >> .* >> >> >> >> >> >> Non-finished threads: >> >> Thread[Source: TableSourceScan(table=[[default_catalog, default_database, >> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, >> first_int, second_int, first_float, second_float, first_double, >> second_double, first_string, s >> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, >> first_bigint, second_bigint, first_int, second_int, first_float, >> second_float, first_double, second_double, first_string, second_string]) -> >> Sink: Sink(table=[default_catal >> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, >> second_bigint, first_int, second_int, first_float, second_float, >> first_double, second_double, first_string, second_string]) (1/6),5,Flink >> Task Threads] >> at sun.misc.Unsafe.park(Native Method) >> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) >> at >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) >> >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146) >> >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298) >> >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) >> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) >> at java.lang.Thread.run(Thread.java:748) >> >> Thread[flink-akka.actor.default-dispatcher-8,5,main] >> at sun.misc.Unsafe.park(Native Method) >> at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:20
Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库
Hi, Leonard 好的,我将会提一个PR来修复这个issue Leonard Xu 于2020年12月10日周四 下午12:10写道: > 你们分析是对的,这是个bug,这里应该用SinkFunctionProvider, > 用GenericJdbcSinkFunction再wrap一层,不用OutputFormatProvider,因为 > OutputFormatSinkFunction没有继承CheckpointedFunction, 没法保证在cp时将buffer数据刷到数据库, > 也可以说是OutputFormat不会参与cp, 所以at-least-once都不一定能保证。 > > 修复应该很简单的,@jie mei 你有兴趣帮忙修复吗? > > 祝好, > Leonard > > > > 在 2020年12月10日,11:22,jie mei 写道: > > Hi,Jark > > 好的,我会就此创建一个issue > > Jark Wu 于2020年12月10日周四 上午11:17写道: > >> Hi Jie, >> >> 看起来确实是个问题。 >> sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。 >> 可以帮忙创建个 issue 么? >> >> Best, >> Jark >> >> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: >> >> > Hi, >> >是的,感觉你是对的。 >> > `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而 >> > `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 >> snapshotState >> > 时候调用format.flush。 >> >WDYT @Jark @ Leonard >> > >> > Best, >> > Hailong >> > >> > >> > 在 2020-12-09 17:13:14,"jie mei" 写道: >> > >Hi, Community >> > > >> > >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 >> > >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 >> > >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 >> > > >> > >我的问题是:是否有办法强制刷新buffer中的数据入库? >> > > >> > > >> > >@Public >> > >public interface OutputFormat extends Serializable { >> > > >> > > /** >> > >* Configures this output format. Since output formats are >> > >instantiated generically and hence parameterless, >> > >* this method is the place where the output formats set their >> > >basic fields based on configuration values. >> > >* >> > >* This method is always called first on a newly instantiated >> output format. >> > >* >> > >* @param parameters The configuration with all parameters. >> > >*/ >> > > void configure(Configuration parameters); >> > > >> > > /** >> > >* Opens a parallel instance of the output format to store the >> > >result of its parallel instance. >> > >* >> > >* When this method is called, the output format it guaranteed to >> > >be configured. >> > >* >> > >* @param taskNumber The number of the parallel instance. >> > >* @param numTasks The number of parallel tasks. >> > >* @throws IOException Thrown, if the output could not be opened >> > >due to an I/O problem. >> > >*/ >> > > void open(int taskNumber, int numTasks) throws IOException; >> > > >> > > >> > > /** >> > >* Adds a record to the output. >> > >* >> > >* When this method is called, the output format it guaranteed to >> be opened. >> > >* >> > >* @param record The records to add to the output. >> > >* @throws IOException Thrown, if the records could not be added to >> > >to an I/O problem. >> > >*/ >> > > void writeRecord(IT record) throws IOException; >> > > >> > > /** >> > >* Method that marks the end of the life-cycle of parallel output >> > >instance. Should be used to close >> > >* channels and streams and release resources. >> > >* After this method returns without an error, the output is >> > >assumed to be correct. >> > >* >> > >* When this method is called, the output format it guaranteed to >> be opened. >> > >* >> > >* @throws IOException Thrown, if the input could not be closed >> properly. >> > >*/ >> > > void close() throws IOException; >> > >} >> > > >> > > >> > >-- >> > > >> > >*Best Regards* >> > >*Jeremy Mei* >> > >> > >> > >> > >> > >> > >> > > > -- > > *Best Regards* > *Jeremy Mei* > > > -- *Best Regards* *Jeremy Mei*
Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库
Hi,Jark 好的,我会就此创建一个issue Jark Wu 于2020年12月10日周四 上午11:17写道: > Hi Jie, > > 看起来确实是个问题。 > sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。 > 可以帮忙创建个 issue 么? > > Best, > Jark > > On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: > > > Hi, > >是的,感觉你是对的。 > > `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而 > > `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 > snapshotState > > 时候调用format.flush。 > >WDYT @Jark @ Leonard > > > > Best, > > Hailong > > > > > > 在 2020-12-09 17:13:14,"jie mei" 写道: > > >Hi, Community > > > > > >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 > > >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 > > >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 > > > > > >我的问题是:是否有办法强制刷新buffer中的数据入库? > > > > > > > > >@Public > > >public interface OutputFormat extends Serializable { > > > > > > /** > > >* Configures this output format. Since output formats are > > >instantiated generically and hence parameterless, > > >* this method is the place where the output formats set their > > >basic fields based on configuration values. > > >* > > >* This method is always called first on a newly instantiated output > format. > > >* > > >* @param parameters The configuration with all parameters. > > >*/ > > > void configure(Configuration parameters); > > > > > > /** > > >* Opens a parallel instance of the output format to store the > > >result of its parallel instance. > > >* > > >* When this method is called, the output format it guaranteed to > > >be configured. > > >* > > >* @param taskNumber The number of the parallel instance. > > >* @param numTasks The number of parallel tasks. > > >* @throws IOException Thrown, if the output could not be opened > > >due to an I/O problem. > > >*/ > > > void open(int taskNumber, int numTasks) throws IOException; > > > > > > > > > /** > > >* Adds a record to the output. > > >* > > >* When this method is called, the output format it guaranteed to be > opened. > > >* > > >* @param record The records to add to the output. > > >* @throws IOException Thrown, if the records could not be added to > > >to an I/O problem. > > >*/ > > > void writeRecord(IT record) throws IOException; > > > > > > /** > > >* Method that marks the end of the life-cycle of parallel output > > >instance. Should be used to close > > >* channels and streams and release resources. > > >* After this method returns without an error, the output is > > >assumed to be correct. > > >* > > >* When this method is called, the output format it guaranteed to be > opened. > > >* > > >* @throws IOException Thrown, if the input could not be closed > properly. > > >*/ > > > void close() throws IOException; > > >} > > > > > > > > >-- > > > > > >*Best Regards* > > >*Jeremy Mei* > > > > > > > > > > > > > -- *Best Regards* *Jeremy Mei*
flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库
Hi, Community JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 我的问题是:是否有办法强制刷新buffer中的数据入库? @Public public interface OutputFormat extends Serializable { /** * Configures this output format. Since output formats are instantiated generically and hence parameterless, * this method is the place where the output formats set their basic fields based on configuration values. * * This method is always called first on a newly instantiated output format. * * @param parameters The configuration with all parameters. */ void configure(Configuration parameters); /** * Opens a parallel instance of the output format to store the result of its parallel instance. * * When this method is called, the output format it guaranteed to be configured. * * @param taskNumber The number of the parallel instance. * @param numTasks The number of parallel tasks. * @throws IOException Thrown, if the output could not be opened due to an I/O problem. */ void open(int taskNumber, int numTasks) throws IOException; /** * Adds a record to the output. * * When this method is called, the output format it guaranteed to be opened. * * @param record The records to add to the output. * @throws IOException Thrown, if the records could not be added to to an I/O problem. */ void writeRecord(IT record) throws IOException; /** * Method that marks the end of the life-cycle of parallel output instance. Should be used to close * channels and streams and release resources. * After this method returns without an error, the output is assumed to be correct. * * When this method is called, the output format it guaranteed to be opened. * * @throws IOException Thrown, if the input could not be closed properly. */ void close() throws IOException; } -- *Best Regards* *Jeremy Mei*
动态表 Change Log 格式
Hi, Community Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). 其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗? 我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。 此外,Delete语句对应的数据是完整记录还是操作日志呢? 这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。 并通过额外的逻辑判断来获得最新的数据是可行的。 -- *Best Regards* *Jeremy Mei*