逻辑上批产生的结果是Table,流产生的结果是Changelog。
你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
最简单的方式可以将query改为带group by的,再看结果的差异。
更多关于Table和Changelog的概念可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html

chenxuying <cxydeve...@163.com> 于2020年8月4日周二 上午11:44写道:

> hi :
> flink table sql 1.11.0
> 在EnvironmentSettings中可以设置BatchMode或StreamingMode
>
>
> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
> //                .inStreamingMode()
>                 .inBatchMode()
>                 .build();
>
>
> 如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 , 感觉对批和流的理解还不够 , 不知道其中区别 ,
> 不知道大佬们有没有例子可以比较容易理解
> 我的代码
> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
> //                .inStreamingMode()
>                 .inBatchMode()
>                 .build();
> TableEnvironment tableEnvironment =
> TableEnvironment.create(environmentSettings);
> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
> "     id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED      " +
> " )  " +
> " with ( " +
> "'connector' = 'jdbc',  " +
> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> " 'username' = 'root' , " +
> " 'password' = 'root', " +
> " 'table-name' = 'mysqlsink' , " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
> " 'sink.buffer-flush.interval' = '2s', " +
> " 'sink.buffer-flush.max-rows' = '300' " +
> " )");
> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
> "     id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED      " +
> " )  " +
> " with ( " +
> "'connector' = 'print'  " +
> " )");
> tableEnvironment.executeSql("insert into print_sink select id,game_id from
> mysql_source");

回复