hi,可以取最新的一条数据:

select id, last_value(value) over (partition by id order by id range between 1 
prodding and current row ) as cur_value  from table_ddl 
通过分组分组获取最新的一条数据。
具体可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html>

> 在 2020年6月10日,下午3:18,方盛凯 <fskm...@gmail.com> 写道:
> 
> 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
> 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
> 我个人猜可能有两种方案:
> 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
> 2.定期向文件系统写入数据。
> 
> 
> 小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:
> 
>> 各位大佬好,请教一个问题:
>> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl&nbsp;,是否由于'update-mode' =
>> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>> 
>> 
>> table_ddl = """
>> CREATE TABLE table_ddl&nbsp;(
>> &nbsp;trck_id VARCHAR
>> ) WITH (
>> &nbsp;'connector.type' = 'kafka',
>> &nbsp;'connector.version' = 'universal',&nbsp; &nbsp;&nbsp;
>> &nbsp;'connector.topic' = 'w',&nbsp; &nbsp;&nbsp;
>> &nbsp;'connector.startup-mode' = 'group-offsets',
>> &nbsp;'connector.properties.group.id' = 'trck_w',&nbsp;
>> &nbsp;'update-mode' = 'append',
>> &nbsp;'connector.properties.zookeeper.connect' = '*',
>> &nbsp;'connector.properties.bootstrap.servers' = '%#',
>> &nbsp;'format.type' = 'json',&nbsp; &nbsp; &nbsp;
>> &nbsp;'format.derive-schema' = 'true'&nbsp;
>> )
>> """

回复