The two print calls are implemented differently. dataStream().print() adds a PrintSinkFunction; that operator prints each record as soon as it receives it, and the output appears on the TaskManager (TM).

table.execute().print(), by contrast, gathers the final result through collect_sink and sends it back to the client, so the output appears on the client's stdout. Results are only sent back to the client when a checkpoint is taken, so how soon they become visible is bounded by the checkpoint interval.

Best,
Feng Jin

On Fri, Mar 1, 2024 at 4:45 PM ha.fen...@aisino.com <ha.fen...@aisino.com> wrote:

> The sink just prints.
>
> Stream API, checkpointing set to exactly-once:
> env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").print();
> After data changes in the database, it is printed to the console immediately.
>
> SQL API, checkpointing set to exactly-once:
> Table custab = tEnv.sqlQuery("select * from orders ");
> custab.execute().print();
> Database changes are not printed immediately; the changed rows are only printed when a checkpoint happens. Also, right after the program starts, the historical data is only printed after a checkpoint.
>
> 16:39:17,109 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset on checkpoint 1: {ts_sec=0, file=mysql-bin.000046, pos=11653, kind=SPECIFIC, gtids=, row=0, event=0}
> 16:39:17,231 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 5bf08275f1992d1f7997fc8f7c32b6b1 (4268 bytes, checkpointDuration=218 ms, finalizationTime=6 ms).
> 16:39:17,241 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 1 as completed for source Source: orders[1].
>
> +----+-------------+---------------------+--------------------------------+-------------------+-------------+
> | op |          id |             addtime |                        cusname |             price |      status |
> +----+-------------+---------------------+--------------------------------+-------------------+-------------+
> | +I |         616 | 2024-02-22 16:23:11 |                           name |              3.23 |           7 |
> | +I |         617 | 2024-03-01 11:42:03 |                           name |              1.11 |           9 |
> | +I |         612 | 2024-01-31 13:53:49 |                           name |              1.29 |           1 |
>
> What is the reason for this?
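
As a rough illustration of the explanation in the reply, here is a toy model in plain Java. It is not Flink code: the class names `ImmediateSink` and `CheckpointGatedSink` and the method `onCheckpointComplete` are made up for this sketch and do not correspond to Flink APIs. It only tries to show why one path makes rows visible on arrival while the other holds them back until a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: every class and method here is hypothetical, not a Flink API.
class PrintVisibilityDemo {

    /** Mimics a print-style sink: each record becomes visible on arrival. */
    static class ImmediateSink {
        final List<String> visible = new ArrayList<>();

        void accept(String record) {
            visible.add(record);
        }
    }

    /** Mimics a collect-style sink: records stay buffered until a checkpoint completes. */
    static class CheckpointGatedSink {
        private final List<String> pending = new ArrayList<>();
        final List<String> visible = new ArrayList<>();

        void accept(String record) {
            pending.add(record);
        }

        // Releases everything buffered since the previous checkpoint.
        void onCheckpointComplete() {
            visible.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        ImmediateSink immediate = new ImmediateSink();
        CheckpointGatedSink gated = new CheckpointGatedSink();

        // Feed the same two rows into both sinks.
        for (String row : new String[] {"+I 616", "+I 617"}) {
            immediate.accept(row);
            gated.accept(row);
        }
        System.out.println("before checkpoint: immediate=" + immediate.visible
                + " gated=" + gated.visible);

        gated.onCheckpointComplete();
        System.out.println("after checkpoint:  immediate=" + immediate.visible
                + " gated=" + gated.visible);
    }
}
```

In this model the gated sink shows nothing until `onCheckpointComplete()` is called, which mirrors the behaviour reported in the question: rows from the SQL API job only show up once a checkpoint has completed.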