产生retract消息的场景有很多,暂时还没有一篇文档来介绍这个,我大概列举几个典型的场景吧:
1. regular group by,因为聚合的结果是实时下发的,所以更新了聚合结果就会retract老的聚合结果
2. 非inner/anti 的join(不包括time interval
join),这种原因是如果当前join不上,会发送null,但是后面可能对面可能又会有数据进来,导致下发的null需要被retract
3. 取latest的去重
4. topn,排名变化需要更新结果
5. window + emit,提前emit的结果需要retract来更新

macia kk <pre...@gmail.com> 于2020年5月27日周三 下午6:19写道:

> 感谢 Benchao 和  Leonard 的回复
>
> 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit
> 出去,但是什么情况下会产生 react 消息呢?
>
> Leonard Xu <xbjt...@gmail.com> 于2020年5月27日周三 下午3:50写道:
>
> > Hi
> > Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的
> > sink无法处理retract消息。
> > 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
> >
> > 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
> > 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。
> >
> >
> > 祝好,
> > Leonard Xu
> >
> >
> >
> > > 在 2020年5月27日,10:23,Benchao Li <libenc...@gmail.com> 写道:
> > >
> > > 而且你的SQL里面有一部分是会产生retract的:
> > > 这里用的是regular left join,这种join类型是会产生retract结果的。
> > >
> > >                           | FROM (
> > >                           |    SELECT `database`, `table`,
> > > `transaction_type`, `transaction_id`,
> > >                           |    `merchant_id`, `event_time`, `status`,
> > > `reference_id`
> > >                           |    FROM main_table
> > >                           |    LEFT JOIN merchant_table
> > >                           |    ON main_table.reference_id =
> > > merchant_table.transaction_sn
> > >                           | )
> > >
> > >
> > > macia kk <pre...@gmail.com> 于2020年5月27日周三 上午1:20写道:
> > >
> > >> Hi,各位大佬,谁有空帮我看下这个问题
> > >>
> > >> Source: Kafka
> > >> SinkL Kafka
> > >>
> > >> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE
> 函数取第一条
> > >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
> > >>
> > >> Error
> > >>
> > >> org.apache.flink.client.program.ProgramInvocationException: The main
> > >> method caused an error: AppendStreamTableSink requires that Table has
> > >> only insert changes.
> > >>    at
> > >>
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > >>    at
> > >>
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > >>    at
> > >>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> Code
> > >>
> > >>   val main_column = "`database`, `table`, `transaction_type`,
> > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > >> `status`"
> > >>    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > >>    bsTableEnv.createTemporaryView("main_table", main_table)
> > >>
> > >>    val merchant_column = "transaction_sn, user_id"
> > >>    val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
> > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > >> 'wallet_id_merchant_db%' ")
> > >>    bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> > >>
> > >>    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > >>                           | SELECT `database`, `table`,
> > >> `transaction_type`,
> > >>                           |   `merchant_id`, `event_time`, `status`,
> > >>                           |    FIRST_VALUE(`transaction_id`) OVER
> > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > >> PRECEDING)
> > >>                           | FROM (
> > >>                           |    SELECT `database`, `table`,
> > >> `transaction_type`, `transaction_id`,
> > >>                           |    `merchant_id`, `event_time`, `status`,
> > >> `reference_id`
> > >>                           |    FROM main_table
> > >>                           |    LEFT JOIN merchant_table
> > >>                           |    ON main_table.reference_id =
> > >> merchant_table.transaction_sn
> > >>                           | )
> > >>                           |""".stripMargin)
> > >>
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> >
> >
>


-- 

Best,
Benchao Li

回复