产生retract消息的场景有很多,暂时还没有一篇文档来介绍这个,我大概列举几个典型的场景吧: 1. regular group by,因为聚合的结果是实时下发的,所以更新了聚合结果就会retract老的聚合结果 2. 非inner/anti 的join(不包括time interval join),这种原因是如果当前join不上,会发送null,但是后面可能对面可能又会有数据进来,导致下发的null需要被retract 3. 取latest的去重 4. topn,排名变化需要更新结果 5. window + emit,提前emit的结果需要retract来更新
macia kk <pre...@gmail.com> 于2020年5月27日周三 下午6:19写道: > 感谢 Benchao 和 Leonard 的回复 > > 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit > 出去,但是什么情况下会产生 react 消息呢? > > Leonard Xu <xbjt...@gmail.com> 于2020年5月27日周三 下午3:50写道: > > > Hi > > Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的 > > sink无法处理retract消息。 > > 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误, > > > > 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新 > > 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。 > > > > > > 祝好, > > Leonard Xu > > > > > > > > > 在 2020年5月27日,10:23,Benchao Li <libenc...@gmail.com> 写道: > > > > > > 而且你的SQL里面有一部分是会产生retract的: > > > 这里用的是regular left join,这种join类型是会产生retract结果的。 > > > > > > | FROM ( > > > | SELECT `database`, `table`, > > > `transaction_type`, `transaction_id`, > > > | `merchant_id`, `event_time`, `status`, > > > `reference_id` > > > | FROM main_table > > > | LEFT JOIN merchant_table > > > | ON main_table.reference_id = > > > merchant_table.transaction_sn > > > | ) > > > > > > > > > macia kk <pre...@gmail.com> 于2020年5月27日周三 上午1:20写道: > > > > > >> Hi,各位大佬,谁有空帮我看下这个问题 > > >> > > >> Source: Kafka > > >> SinkL Kafka > > >> > > >> 主要逻辑是 *main_table* left join *merchatn_table* 以后,使用 FIRST_VALUE > 函数取第一条 > > >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是 > > >> > > >> Error > > >> > > >> org.apache.flink.client.program.ProgramInvocationException: The main > > >> method caused an error: AppendStreamTableSink requires that Table has > > >> only insert changes. > > >> at > > >> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > > >> at > > >> > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > > >> at > > >> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> Code > > >> > > >> val main_column = "`database`, `table`, `transaction_type`, > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`, > > >> `status`" > > >> val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ") > > >> bsTableEnv.createTemporaryView("main_table", main_table) > > >> > > >> val merchant_column = "transaction_sn, user_id" > > >> val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE > > >> 'wallet_id_merchant_db%' ") > > >> bsTableEnv.createTemporaryView("merchant_table", merchant_table) > > >> > > >> bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer > > >> | SELECT `database`, `table`, > > >> `transaction_type`, > > >> | `merchant_id`, `event_time`, `status`, > > >> | FIRST_VALUE(`transaction_id`) OVER > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED > > >> PRECEDING) > > >> | FROM ( > > >> | SELECT `database`, `table`, > > >> `transaction_type`, `transaction_id`, > > >> | `merchant_id`, `event_time`, `status`, > > >> `reference_id` > > >> | FROM main_table > > >> | LEFT JOIN merchant_table > > >> | ON main_table.reference_id = > > >> merchant_table.transaction_sn > > >> | ) > > >> |""".stripMargin) > > >> > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > -- Best, Benchao Li