这两个 print 的实现是不一样的。

 dataStream().print 是增加的 PrintSinkFunction, 该算子接受到数据会立刻打印出来, 且结果是在 TM 上打印出来。

 而 table.execute().print() 是会把最终的结果通过 collect_sink 收集之后,回传到 client, 结果是在
client 的 stdout 打印出来, 且只有在做 checkpoint 时才会回传至 client,
它的可见周期会受限于 checkpoint 的间隔。


Best,
Feng Jin

On Fri, Mar 1, 2024 at 4:45 PM ha.fen...@aisino.com <ha.fen...@aisino.com>
wrote:

> sink中只是打印
>
> streamapi,checkpoint设置的精准一次
> env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
> Source").print();
> 数据库改变数据后,立即就可以在控制台打印出来。
>
> sqlapi,checkpoint设置的精准一次
> Table custab = tEnv.sqlQuery("select * from orders ");
> custab.execute().print();
> 数据库改变不会立即打印,等到checkpoint打印时才会把改变的数据打印出来。并且刚启动程序的时候,打印历史数据也是在checkpoint后才打印。
> 16:39:17,109 INFO
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] -
> Binlog offset on checkpoint 1: {ts_sec=0, file=mysql-bin.000046, pos=11653,
> kind=SPECIFIC, gtids=, row=0, event=0}
> 16:39:17,231 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed
> checkpoint 1 for job 5bf08275f1992d1f7997fc8f7c32b6b1 (4268 bytes,
> checkpointDuration=218 ms, finalizationTime=6 ms).
> 16:39:17,241 INFO
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
> checkpoint 1 as completed for source Source: orders[1].
>
> +----+-------------+---------------------+--------------------------------+-------------------+-------------+
> | op |          id |             addtime |                        cusname
> |             price |      status |
>
> +----+-------------+---------------------+--------------------------------+-------------------+-------------+
> | +I |         616 | 2024-02-22 16:23:11 |                           name
> |              3.23 |           7 |
> | +I |         617 | 2024-03-01 11:42:03 |                           name
> |              1.11 |           9 |
> | +I |         612 | 2024-01-31 13:53:49 |                           name
> |              1.29 |           1 |
>
> 这是什么原因?
>

回复