Re: 如何监控kafka延迟

2021-07-28 Thread jie mei
sorry, metrics
项没复制全,应该是taskmanager_job_task_operator_KafkaConsumer_records-lag-max。

我们主要是通过 grafana
的图标来展现来监控延迟等信息,简单的报警页可以通过grafana来配置。细粒度到任务级别的报警,grafana配置起来有点繁琐,不过可能可以通过grafana
的 rest api 自动生成。

jie mei  于2021年7月28日周三 下午5:58写道:

> hi,all
>
> 我们是通过 grafana 对采集到的 flink kafka 的
> metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。
>
> xuhaiLong  于2021年7月28日周三 下午5:46写道:
>
>> 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。
>>
>>
>> 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道:
>> Hi comsir
>>
>> kafka的控制台能力比较弱,想知道延迟只能自己维护。
>>
>> 维护方式:
>>
>> 1. 每个服务的topic的offset 减去 groupid的offset
>>
>> 2. 尽量可以计算出各种消费速度
>>
>> 3. rocketmq控制台,可看到消费进度,可以参照下。
>>
>>
>> 在 2021/7/28 上午11:02, 龙逸尘 写道:
>> Hi comsir,
>> 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。
>> group  id 需要自己维护。
>>
>> comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道:
>>
>> hi all
>> 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况??
>> 监控这个延迟的目的:1.大盘展示,2.延迟后报警
>> 小问题:
>> 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标?
>> 2.怎么获取groupId呢,多个group消费的话,如何区分呀?
>> 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗?
>> 4.有比较优雅的实现方式吗?
>> 非常感谢 期待解答 感谢感谢
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


-- 

*Best Regards*
*Jeremy Mei*


Re: 如何监控kafka延迟

2021-07-28 Thread jie mei
hi,all

我们是通过 grafana 对采集到的 flink kafka 的
metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。

xuhaiLong  于2021年7月28日周三 下午5:46写道:

> 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。
>
>
> 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道:
> Hi comsir
>
> kafka的控制台能力比较弱,想知道延迟只能自己维护。
>
> 维护方式:
>
> 1. 每个服务的topic的offset 减去 groupid的offset
>
> 2. 尽量可以计算出各种消费速度
>
> 3. rocketmq控制台,可看到消费进度,可以参照下。
>
>
> 在 2021/7/28 上午11:02, 龙逸尘 写道:
> Hi comsir,
> 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。
> group  id 需要自己维护。
>
> comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道:
>
> hi all
> 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况??
> 监控这个延迟的目的:1.大盘展示,2.延迟后报警
> 小问题:
> 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标?
> 2.怎么获取groupId呢,多个group消费的话,如何区分呀?
> 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗?
> 4.有比较优雅的实现方式吗?
> 非常感谢 期待解答 感谢感谢
>


-- 

*Best Regards*
*Jeremy Mei*


Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。

2021-04-12 Thread jie mei
此外,在事件时间,场景下,如果一个 Stream A 有消息, 另一个 Stream B 没有消息进行 UNION ALL。那么 Stream B
的消息永远是一个 Long.MIN_VALUE, 进行水印对其的时候,UNION ALL 后的水印取所有 CHANNEL 的最小水印,也就是
Long.MIN_VALUE, 这就导致分组滚动窗口一致得不到计算。

jie mei  于2021年4月12日周一 上午11:24写道:

> 问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。
>
> jie mei  于2021年4月12日周一 上午1:49写道:
>
>> 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator,  下,
>> 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput
>> 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。
>>
>> 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL
>>
>> create table input_table (
>> `dim` varchar,
>>  `server_time` bigint,
>>  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000,
>> '-MM-dd HH:mm:ss')),
>>  WATERMARK FOR `event_time` AS `event_time`
>> )
>> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`,
>> `dim`,
>> count(1),
>> FROM input_table
>>  GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim`
>>
>>
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


-- 

*Best Regards*
*Jeremy Mei*


Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。

2021-04-11 Thread jie mei
问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。

jie mei  于2021年4月12日周一 上午1:49写道:

> 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator,  下,
> 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput 中的
> processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。
>
> 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL
>
> create table input_table (
> `dim` varchar,
>  `server_time` bigint,
>  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000,
> '-MM-dd HH:mm:ss')),
>  WATERMARK FOR `event_time` AS `event_time`
> )
> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`,
> `dim`,
> count(1),
> FROM input_table
>  GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim`
>
>
>
> *Best Regards*
> *Jeremy Mei*
>


-- 

*Best Regards*
*Jeremy Mei*


分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。

2021-04-11 Thread jie mei
大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator,  下,
发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput 中的
processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == false。

我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL

create table input_table (
`dim` varchar,
 `server_time` bigint,
 `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, '-MM-dd
HH:mm:ss')),
 WATERMARK FOR `event_time` AS `event_time`
)
select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`,
`dim`,
count(1),
FROM input_table
 GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim`



*Best Regards*
*Jeremy Mei*


[讨论] Flink Connector 并行写入数据方案

2021-03-31 Thread jie mei
Hi, Community

我想发起一个初步的讨论,关于如何扩大 Flink 写入数据的并行度,并且能够分表,可选事务支持的方案。

该方案应该支持三种场景:

1) 至少一次: 不支持或者有限支持 update 的存储,通常通过查询去重。 例如 ClickHouse
2) 相同主键或分区内有序: 支持 Upsert,但不支持事务或者跨行事务的存储,例如 ElasticSearch, MongoDB
3) 事务:支持跨行事务的存储,例如 MySQL。

另外说一下,第二种情况和第三种情况的一个重要区别是,当 CheckPoint 失败,第二种情况会从上一个快照重新执行,
那么会存在旧的数据可能覆盖新的数据的情况。举个例子: 假设正常情况下记录A在某个快照区间取值为
A1, A2, A3。假如在写入 A2 后快照失败,当重新执行的时候,会短暂的存在这种情况,A1 覆盖了 A2 的值。

下面是不同场景扩大并行度的方案
1) 至少一次:
在这种场景下,数据乱顺是可容忍的,只要保证最少一次,就能达到最终一致性。可以考虑多线程异步写入数据,
当异步任务过多,则等待有异步任务完成,再执行新的异步写入任务。CheckPoint需要保证所有异步任务完成

2) 相同主键或分区内有序,最少一次:
在这种场景下,如果指定了分区字段,可以将相同分区的数据放到一个 Buffer 里,相同 Buffer 的数据有序,
不同 Buffer的数据并行写入,CheckPoint的时候需要保证所有数据写入;如果没有分区,单指定了主键,可以
根据主键的 Hash Code 对 Sink 并行读取模,得到的值用于决定数据缓存到哪一个 Buffer,同样相同的 Buffer
内有序,不同的 Buffer 并行。

3) 事务:
由于已经有了通用的 Sink API,可以考虑把数据缓存到 Buffer, 在 CheckPoint 的时候,开启事务,完成写入数据,并提交。
[FLIP-143]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

分表:
对于 MySQL, MongoDB 这类存储,可以通过分区键来定义分表规则,假如表 A 定义了分区键 B,B 有 B1, B2 两个取值,
那么得到两个分表 A_B1, A_B2.


-- 

*Best Regards*
*Jeremy Mei*


Re: flink sql jmh failure

2021-03-25 Thread jie mei
HI, Guowei

yeah, I think so too. There is no way trigger a checkpoint and wath the
checkpoint finished now, so I will do the benchmark with lower level api.


Guowei Ma  于2021年3月25日周四 下午4:59写道:

> Hi,
> I am not an expert of JMH but it seems that it is not an error. From the
> log it looks like that the job is not finished.
> The data source continues to read data when JMH finishes.
>
> Thread[Legacy Source Thread - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) ->
> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) -> Sink:
> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
> fields=[dt, first_bigint, second_bigint, first_int, second_int,
> first_float, second_float, first_double, second_double, first_string,
> second_string]) (3/6),5,Flink Task Threads]
>   at
> org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
>   at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
>   at
> org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 9:56 PM jie mei  wrote:
>
>> Hi, Yik San
>>
>> I use a library wroten by myself and trying to verify the performance.
>>
>>
>> Yik San Chan  于2021年3月24日周三 下午9:07写道:
>>
>>> Hi Jie,
>>>
>>> I am curious what library do you use to get the ClickHouseTableBuilder
>>>
>>> On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:
>>>
>>>> Hi, Community
>>>>
>>>> I run a jmh benchmark task get blew error, which use flink sql
>>>> consuming data from data-gen connector(10_000_000) and write data to
>>>> clickhouse. blew is partly log and you can see completable log by attached
>>>> file
>>>>
>>>> *My jmh benchmark code as blew:*
>>>>
>>>> @Benchmark
>>>> @Threads(1)
>>>> @Fork(1)
>>>> public void sinkBenchmark() throws IOException {
>>>>
>>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>>   .getExecutionEnvironment();
>>>>   streamEnv.enableCheckpointing(6);
>>>>
>>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>   .useBlinkPlanner()
>>>>   .inStreamingMode().build();
>>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>>>> settings);
>>>>
>>>>   // create clickhouse table
>>>>   new ClickHouseTableBuilder(tableEnv,
>>>>   parseSchema("clickhouse_sink_table.sql"))
>>>>   .database("benchmark")
>>>>   .table("bilophus_sink_benchmark")
>>>>   .address("jdbc:clickhouse://localhost:8123")
>>>>   .build();
>>>>
>>>>   // create mock data table
>>>>   tableEnv.executeSql(
>>>>   parseSchema("clickhouse_source_table.sql") +
>>>>   "WITH (" +
>>>>   "'connector' = 'datagen'," +
>>>>   "'number-of-rows' = '1000')");
>>>>
>>>>   tableEnv.executeSql(
>>>>   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
>>>> CLICKHOUSE_SOURCE_BENCHMARK");
>>>>
>>>> }
>>>>
>>>> *running command:*
>>>>
>>>

Re: flink sql jmh failure

2021-03-24 Thread jie mei
Hi, Yik San

I use a library wroten by myself and trying to verify the performance.


Yik San Chan  于2021年3月24日周三 下午9:07写道:

> Hi Jie,
>
> I am curious what library do you use to get the ClickHouseTableBuilder
>
> On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:
>
>> Hi, Community
>>
>> I run a jmh benchmark task get blew error, which use flink sql consuming
>> data from data-gen connector(10_000_000) and write data to clickhouse. blew
>> is partly log and you can see completable log by attached file
>>
>> *My jmh benchmark code as blew:*
>>
>> @Benchmark
>> @Threads(1)
>> @Fork(1)
>> public void sinkBenchmark() throws IOException {
>>
>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>   .getExecutionEnvironment();
>>   streamEnv.enableCheckpointing(6);
>>
>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>   .useBlinkPlanner()
>>   .inStreamingMode().build();
>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>> settings);
>>
>>   // create clickhouse table
>>   new ClickHouseTableBuilder(tableEnv,
>>   parseSchema("clickhouse_sink_table.sql"))
>>   .database("benchmark")
>>   .table("bilophus_sink_benchmark")
>>   .address("jdbc:clickhouse://localhost:8123")
>>   .build();
>>
>>   // create mock data table
>>   tableEnv.executeSql(
>>   parseSchema("clickhouse_source_table.sql") +
>>   "WITH (" +
>>   "'connector' = 'datagen'," +
>>   "'number-of-rows' = '1000')");
>>
>>   tableEnv.executeSql(
>>   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
>> CLICKHOUSE_SOURCE_BENCHMARK");
>>
>> }
>>
>> *running command:*
>>
>> mvn clean package -DskipTests
>>
>> 
>>   org.codehaus.mojo
>>   exec-maven-plugin
>>   1.6.0
>>   
>> 
>>   test-benchmarks
>>   test
>>   
>> exec
>>   
>> 
>>   
>>   
>> false
>> test
>> java
>> 
>>   -Xmx6g
>>   -classpath
>>   
>>   org.openjdk.jmh.Main
>>   
>>   -foe
>>   true
>>   
>>   -f
>>   1
>>   -i
>>   1
>>   -wi
>>   0
>>   -rf
>>   csv
>>   .*
>> 
>>   
>> 
>>
>>
>> Non-finished threads:
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, s
>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, second_int, first_float,
>> second_float, first_double, second_double, first_string, second_string]) ->
>> Sink: Sink(table=[default_catal
>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string]) (1/6),5,Flink
>> Task Threads]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>  at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:20

Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 Thread jie mei
Hi, Leonard


好的,我将会提一个PR来修复这个issue


Leonard Xu  于2020年12月10日周四 下午12:10写道:

> 你们分析是对的,这是个bug,这里应该用SinkFunctionProvider,
> 用GenericJdbcSinkFunction再wrap一层,不用OutputFormatProvider,因为
> OutputFormatSinkFunction没有继承CheckpointedFunction, 没法保证在cp时将buffer数据刷到数据库,
> 也可以说是OutputFormat不会参与cp,  所以at-least-once都不一定能保证。
>
> 修复应该很简单的,@jie mei 你有兴趣帮忙修复吗?
>
> 祝好,
> Leonard
>
>
>
> 在 2020年12月10日,11:22,jie mei  写道:
>
> Hi,Jark
>
> 好的,我会就此创建一个issue
>
> Jark Wu  于2020年12月10日周四 上午11:17写道:
>
>> Hi Jie,
>>
>> 看起来确实是个问题。
>> sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。
>> 可以帮忙创建个 issue 么?
>>
>> Best,
>> Jark
>>
>> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:
>>
>> > Hi,
>> >是的,感觉你是对的。
>> >   `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而
>> > `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在
>> snapshotState
>> > 时候调用format.flush。
>> >WDYT @Jark @ Leonard
>> >
>> > Best,
>> > Hailong
>> >
>> >
>> > 在 2020-12-09 17:13:14,"jie mei"  写道:
>> > >Hi, Community
>> > >
>> > >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为
>> > >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的
>> > >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。
>> > >
>> > >我的问题是:是否有办法强制刷新buffer中的数据入库?
>> > >
>> > >
>> > >@Public
>> > >public interface OutputFormat extends Serializable {
>> > >
>> > >   /**
>> > >* Configures this output format. Since output formats are
>> > >instantiated generically and hence parameterless,
>> > >* this method is the place where the output formats set their
>> > >basic fields based on configuration values.
>> > >* 
>> > >* This method is always called first on a newly instantiated
>> output format.
>> > >*
>> > >* @param parameters The configuration with all parameters.
>> > >*/
>> > >   void configure(Configuration parameters);
>> > >
>> > >   /**
>> > >* Opens a parallel instance of the output format to store the
>> > >result of its parallel instance.
>> > >* 
>> > >* When this method is called, the output format it guaranteed to
>> > >be configured.
>> > >*
>> > >* @param taskNumber The number of the parallel instance.
>> > >* @param numTasks The number of parallel tasks.
>> > >* @throws IOException Thrown, if the output could not be opened
>> > >due to an I/O problem.
>> > >*/
>> > >   void open(int taskNumber, int numTasks) throws IOException;
>> > >
>> > >
>> > >   /**
>> > >* Adds a record to the output.
>> > >* 
>> > >* When this method is called, the output format it guaranteed to
>> be opened.
>> > >*
>> > >* @param record The records to add to the output.
>> > >* @throws IOException Thrown, if the records could not be added to
>> > >to an I/O problem.
>> > >*/
>> > >   void writeRecord(IT record) throws IOException;
>> > >
>> > >   /**
>> > >* Method that marks the end of the life-cycle of parallel output
>> > >instance. Should be used to close
>> > >* channels and streams and release resources.
>> > >* After this method returns without an error, the output is
>> > >assumed to be correct.
>> > >* 
>> > >* When this method is called, the output format it guaranteed to
>> be opened.
>> > >*
>> > >* @throws IOException Thrown, if the input could not be closed
>> properly.
>> > >*/
>> > >   void close() throws IOException;
>> > >}
>> > >
>> > >
>> > >--
>> > >
>> > >*Best Regards*
>> > >*Jeremy Mei*
>> >
>> >
>> >
>> >
>> >
>> >
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>
>
>

-- 

*Best Regards*
*Jeremy Mei*


Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 Thread jie mei
Hi,Jark

好的,我会就此创建一个issue

Jark Wu  于2020年12月10日周四 上午11:17写道:

> Hi Jie,
>
> 看起来确实是个问题。
> sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。
> 可以帮忙创建个 issue 么?
>
> Best,
> Jark
>
> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:
>
> > Hi,
> >是的,感觉你是对的。
> >   `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而
> > `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在
> snapshotState
> > 时候调用format.flush。
> >WDYT @Jark @ Leonard
> >
> > Best,
> > Hailong
> >
> >
> > 在 2020-12-09 17:13:14,"jie mei"  写道:
> > >Hi, Community
> > >
> > >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为
> > >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的
> > >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。
> > >
> > >我的问题是:是否有办法强制刷新buffer中的数据入库?
> > >
> > >
> > >@Public
> > >public interface OutputFormat extends Serializable {
> > >
> > >   /**
> > >* Configures this output format. Since output formats are
> > >instantiated generically and hence parameterless,
> > >* this method is the place where the output formats set their
> > >basic fields based on configuration values.
> > >* 
> > >* This method is always called first on a newly instantiated output
> format.
> > >*
> > >* @param parameters The configuration with all parameters.
> > >*/
> > >   void configure(Configuration parameters);
> > >
> > >   /**
> > >* Opens a parallel instance of the output format to store the
> > >result of its parallel instance.
> > >* 
> > >* When this method is called, the output format it guaranteed to
> > >be configured.
> > >*
> > >* @param taskNumber The number of the parallel instance.
> > >* @param numTasks The number of parallel tasks.
> > >* @throws IOException Thrown, if the output could not be opened
> > >due to an I/O problem.
> > >*/
> > >   void open(int taskNumber, int numTasks) throws IOException;
> > >
> > >
> > >   /**
> > >* Adds a record to the output.
> > >* 
> > >* When this method is called, the output format it guaranteed to be
> opened.
> > >*
> > >* @param record The records to add to the output.
> > >* @throws IOException Thrown, if the records could not be added to
> > >to an I/O problem.
> > >*/
> > >   void writeRecord(IT record) throws IOException;
> > >
> > >   /**
> > >* Method that marks the end of the life-cycle of parallel output
> > >instance. Should be used to close
> > >* channels and streams and release resources.
> > >* After this method returns without an error, the output is
> > >assumed to be correct.
> > >* 
> > >* When this method is called, the output format it guaranteed to be
> opened.
> > >*
> > >* @throws IOException Thrown, if the input could not be closed
> properly.
> > >*/
> > >   void close() throws IOException;
> > >}
> > >
> > >
> > >--
> > >
> > >*Best Regards*
> > >*Jeremy Mei*
> >
> >
> >
> >
> >
> >
>


-- 

*Best Regards*
*Jeremy Mei*


flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 Thread jie mei
Hi, Community

JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为
OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的
代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。

我的问题是:是否有办法强制刷新buffer中的数据入库?


@Public
public interface OutputFormat extends Serializable {

   /**
* Configures this output format. Since output formats are
instantiated generically and hence parameterless,
* this method is the place where the output formats set their
basic fields based on configuration values.
* 
* This method is always called first on a newly instantiated output format.
*
* @param parameters The configuration with all parameters.
*/
   void configure(Configuration parameters);

   /**
* Opens a parallel instance of the output format to store the
result of its parallel instance.
* 
* When this method is called, the output format it guaranteed to
be configured.
*
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws IOException Thrown, if the output could not be opened
due to an I/O problem.
*/
   void open(int taskNumber, int numTasks) throws IOException;


   /**
* Adds a record to the output.
* 
* When this method is called, the output format it guaranteed to be opened.
*
* @param record The records to add to the output.
* @throws IOException Thrown, if the records could not be added to
to an I/O problem.
*/
   void writeRecord(IT record) throws IOException;

   /**
* Method that marks the end of the life-cycle of parallel output
instance. Should be used to close
* channels and streams and release resources.
* After this method returns without an error, the output is
assumed to be correct.
* 
* When this method is called, the output format it guaranteed to be opened.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
   void close() throws IOException;
}


-- 

*Best Regards*
*Jeremy Mei*


动态表 Change Log 格式

2020-12-04 Thread jie mei
Hi, Community

Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE).
其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗?
我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。
此外,Delete语句对应的数据是完整记录还是操作日志呢?

这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。
并通过额外的逻辑判断来获得最新的数据是可行的。

-- 

*Best Regards*
*Jeremy Mei*