这两个 print 的实现是不一样的。
dataStream().print 是增加的 PrintSinkFunction, 该算子接受到数据会立刻打印出来, 且结果是在 TM 上打印出来。
而 table.execute().print() 是会把最终的结果通过 collect_sink 收集之后,回传到 client, 结果是在
client 的 stdout 打印出来, 且只有在做 checkpoint 时才会回传至 client,
它的可见周期会受限于 checkpoint 的间隔。
Best,
Feng Jin
On Fri, Mar 1, 2024 at 4:45 PM ha.fen...@aisino.com
wrote:
> sink中只是打印
>
> streamapi,checkpoint设置的精准一次
> env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
> Source").print();
> 数据库改变数据后,立即就可以在控制台打印出来。
>
> sqlapi,checkpoint设置的精准一次
> Table custab = tEnv.sqlQuery("select * from orders ");
> custab.execute().print();
> 数据库改变不会立即打印,等到checkpoint打印时才会把改变的数据打印出来。并且刚启动程序的时候,打印历史数据也是在checkpoint后才打印。
> 16:39:17,109 INFO
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] -
> Binlog offset on checkpoint 1: {ts_sec=0, file=mysql-bin.46, pos=11653,
> kind=SPECIFIC, gtids=, row=0, event=0}
> 16:39:17,231 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 1 for job 5bf08275f1992d1f7997fc8f7c32b6b1 (4268 bytes,
> checkpointDuration=218 ms, finalizationTime=6 ms).
> 16:39:17,241 INFO
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
> checkpoint 1 as completed for source Source: orders[1].
>
> ++-+-++---+-+
> | op | id | addtime |cusname
> | price | status |
>
> ++-+-++---+-+
> | +I | 616 | 2024-02-22 16:23:11 | name
> | 3.23 | 7 |
> | +I | 617 | 2024-03-01 11:42:03 | name
> | 1.11 | 9 |
> | +I | 612 | 2024-01-31 13:53:49 | name
> | 1.29 | 1 |
>
> 这是什么原因?
>