而且你的SQL里面有一部分是会产生retract的: 这里用的是regular left join,这种join类型是会产生retract结果的。
| FROM ( | SELECT `database`, `table`, `transaction_type`, `transaction_id`, | `merchant_id`, `event_time`, `status`, `reference_id` | FROM main_table | LEFT JOIN merchant_table | ON main_table.reference_id = merchant_table.transaction_sn | ) macia kk <pre...@gmail.com> 于2020年5月27日周三 上午1:20写道: > Hi,各位大佬,谁有空帮我看下这个问题 > > Source: Kafka > SinkL Kafka > > 主要逻辑是 *main_table* left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条 > transaction_id,我这个模式应该是 append 模式,但是结果好像不是 > > Error > > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: AppendStreamTableSink requires that Table has > only insert changes. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > > > > > > > > > Code > > val main_column = "`database`, `table`, `transaction_type`, > `transaction_id`, `reference_id`, `merchant_id`, `event_time`, > `status`" > val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM > Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ") > bsTableEnv.createTemporaryView("main_table", main_table) > > val merchant_column = "transaction_sn, user_id" > val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column > FROM Keystats_airpay_consumer WHERE `table` LIKE > 'wallet_id_merchant_db%' ") > bsTableEnv.createTemporaryView("merchant_table", merchant_table) > > bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer > | SELECT `database`, `table`, > `transaction_type`, > | `merchant_id`, `event_time`, `status`, > | FIRST_VALUE(`transaction_id`) OVER > (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED > PRECEDING) > | FROM ( > | SELECT `database`, `table`, > `transaction_type`, `transaction_id`, > | `merchant_id`, `event_time`, `status`, > `reference_id` > | FROM main_table > | LEFT JOIN merchant_table > | ON main_table.reference_id = > merchant_table.transaction_sn > | ) > |""".stripMargin) > -- Best, Benchao Li