Re: mysql cdc streamapi与sqlapi 输出表现不相同

2024-03-01 文章 Feng Jin
这两个 print 的实现是不一样的。

 dataStream().print 是增加的 PrintSinkFunction, 该算子接受到数据会立刻打印出来, 且结果是在 TM 上打印出来。

 而 table.execute().print() 是会把最终的结果通过 collect_sink 收集之后,回传到 client, 结果是在
client 的 stdout 打印出来, 且只有在做 checkpoint 时才会回传至 client,
它的可见周期会受限于 checkpoint 的间隔。


Best,
Feng Jin

On Fri, Mar 1, 2024 at 4:45 PM ha.fen...@aisino.com 
wrote:

> sink中只是打印
>
> streamapi,checkpoint设置的精准一次
> env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
> Source").print();
> 数据库改变数据后,立即就可以在控制台打印出来。
>
> sqlapi,checkpoint设置的精准一次
> Table custab = tEnv.sqlQuery("select * from orders ");
> custab.execute().print();
> 数据库改变不会立即打印,等到checkpoint打印时才会把改变的数据打印出来。并且刚启动程序的时候,打印历史数据也是在checkpoint后才打印。
> 16:39:17,109 INFO
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] -
> Binlog offset on checkpoint 1: {ts_sec=0, file=mysql-bin.46, pos=11653,
> kind=SPECIFIC, gtids=, row=0, event=0}
> 16:39:17,231 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 1 for job 5bf08275f1992d1f7997fc8f7c32b6b1 (4268 bytes,
> checkpointDuration=218 ms, finalizationTime=6 ms).
> 16:39:17,241 INFO
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
> checkpoint 1 as completed for source Source: orders[1].
>
> ++-+-++---+-+
> | op |  id | addtime |cusname
> | price |  status |
>
> ++-+-++---+-+
> | +I | 616 | 2024-02-22 16:23:11 |   name
> |  3.23 |   7 |
> | +I | 617 | 2024-03-01 11:42:03 |   name
> |  1.11 |   9 |
> | +I | 612 | 2024-01-31 13:53:49 |   name
> |  1.29 |   1 |
>
> 这是什么原因?
>


Re: mysql cdc streamapi与sqlapi 输出表现不相同

2024-02-29 文章 Hang Ruan
你好,ha.fengqi。

MySQL CDC
连接器只有在多并发时,会依赖checkpoint的完成来切换到增量阶段。从你提供的代码上来看,是单并发的运行作业,所以应该Source
在这两者之间的行为不会有区别。
这个不同是不是有可能是下游在两种使用方式上,有什么区别?
可以通过观察具体的IO指标看到Source是否真的及时发出消息,如果比较熟悉代码,也可以自己添加一下打印日志来验证。

祝好,
Hang