可以使用类似的方式
//   val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2)
//   val result5 = tEnv.fromDataStream(sstream)
//   result5.toAppendStream[Row].print()


原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月8日(周日) 11:53
主题:Re: Flink RetractStream如何转成AppendStream?


Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 
RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 
2019 at 10:08, 陈帅 casel.c...@gmail.com wrote:  在用Flink做实时数仓时遇到group 
by统计后需要将结果发到kafka,但是现在的kafka   
sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?

回复