Hi
这里存在一个问题是,使用了last_value或者first_value这样的udaf,但是如果多条数据来到经过udaf处理后结果还是和之前一样的情况下,是不会产出回撤流数据的,可以观察下你是否需要考虑这种情况。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "x2009438"<x2009...@126.com>; 发送时间: 2020年6月5日(星期五) 上午9:06 收件人: "user-zh"<user-zh@flink.apache.org>; 主题: Re: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗 感谢各位, 我先试试用Last_value这样的aggregate function绕过去。 @kcz 可能我表达不很清楚,具体描述一下遇到的具体场景就是:收到原始的数据,去mysql或者es里做维表关联,然后再以upsert的模式将结果写回mysql或es。 举个例子来说,我想按id为key更新整行数据(比如还有个字段amount是个随机的double类型值) select id, amount, …… groupby id; 这样子不行,必须 select id, amount, … groupby id,amount,……; 这样子,假设id是固定有限个,那么state个数应该也是有限的,但是加上amount,我理解state的状态就会+♾了。 也就是说我实际并不需要分组聚合,保存状态,用groupby只是为了让connector识别到keyfields。 发自我的iPhone > 在 2020年6月5日,08:46,kcz <573693...@qq.com> 写道: > > 我大概get到你要说的需求,select那些其实是明细数据?但是没有跟聚合的数据拆开,所以才出现这种情况吧? > > > > > > ------------------ 原始邮件 ------------------ > 发件人: Leonard Xu <xbjt...@gmail.com&gt; > 发送时间: 2020年6月4日 21:01 > 收件人: user-zh <user-zh@flink.apache.org&gt; > 主题: 回复:flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗 > > > > Hi, > > &gt; 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项, > > select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条, > > &gt; 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。 > > State 可以配置ttl的,过期清理参考[1] > > 另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2] > > Best, > Leonard Xu > > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time&gt; > [2] https://issues.apache.org/jira/browse/FLINK-17829 <https://issues.apache.org/jira/browse/FLINK-17829&gt;