Also, one part of your SQL does produce retractions:
it uses a regular LEFT JOIN, and this join type emits retract results, so the query output is not insert-only.

                           | FROM (
                           |    SELECT `database`, `table`, `transaction_type`, `transaction_id`,
                           |    `merchant_id`, `event_time`, `status`, `reference_id`
                           |    FROM main_table
                           |    LEFT JOIN merchant_table
                           |    ON main_table.reference_id = merchant_table.transaction_sn
                           | )
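
To make the retract behaviour concrete, here is a minimal toy model in plain Scala. It is not Flink code and does not use any Flink API; the names `LeftJoinRetractSketch`, `MainRow`, `MerchantRow`, `Insert` and `Retract` are made up for illustration. It only mimics what a regular streaming LEFT JOIN has to do: when a left row arrives before any matching right row, it is first emitted padded with null, and that result is retracted once a match shows up.

```scala
// Toy model of a regular streaming LEFT JOIN (illustrative only, not Flink code).
object LeftJoinRetractSketch {
  // An output row: (reference_id, matched transaction_sn or None for NULL).
  type Row = (String, Option[String])

  sealed trait Change
  final case class Insert(row: Row) extends Change
  final case class Retract(row: Row) extends Change

  sealed trait Event
  final case class MainRow(referenceId: String) extends Event       // row from the left input
  final case class MerchantRow(transactionSn: String) extends Event // row from the right input

  // Join state: how many rows were seen per key on each side, plus the emitted changes.
  private final case class State(
      left: Map[String, Int],
      right: Map[String, Int],
      out: Vector[Change])

  def leftJoin(events: Seq[Event]): Vector[Change] = {
    val finalState = events.foldLeft(State(Map.empty, Map.empty, Vector.empty)) {
      case (s, MainRow(k)) =>
        val matches = s.right.getOrElse(k, 0)
        val emitted: Vector[Change] =
          if (matches == 0) Vector(Insert((k, None)))       // no match yet: pad with NULL
          else Vector.fill(matches)(Insert((k, Some(k))))   // one result per matching right row
        s.copy(left = s.left.updated(k, s.left.getOrElse(k, 0) + 1), out = s.out ++ emitted)

      case (s, MerchantRow(k)) =>
        val waiting = s.left.getOrElse(k, 0)
        val firstMatch = s.right.getOrElse(k, 0) == 0
        val perLeftRow: Vector[Change] =
          if (firstMatch) Vector(Retract((k, None)), Insert((k, Some(k)))) // undo the NULL-padded row
          else Vector(Insert((k, Some(k))))
        val emitted = Vector.fill(waiting)(perLeftRow).flatten
        s.copy(right = s.right.updated(k, s.right.getOrElse(k, 0) + 1), out = s.out ++ emitted)
    }
    finalState.out
  }

  def main(args: Array[String]): Unit =
    // The main_table row arrives before its merchant_table match.
    leftJoin(Seq(MainRow("r1"), MerchantRow("r1"))).foreach(println)
}
```

In this sketch the sequence `MainRow("r1"), MerchantRow("r1")` yields an insert of `(r1, None)`, then a retract of `(r1, None)`, then an insert of `(r1, Some(r1))`. The middle message is the kind of change an append-only sink cannot accept.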


macia kk <pre...@gmail.com> wrote on Wed, May 27, 2020 at 1:20 AM:

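> Hi all, could someone who has time take a look at this problem for me?
>
> Source: Kafka
> Sink: Kafka
>
> The main logic: after *main_table* LEFT JOIN *merchant_table*, I use the FIRST_VALUE function to take the first
> transaction_id. I believe this should be append mode, but the result does not seem to be.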
>
> Error
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: AppendStreamTableSink requires that Table has only insert changes.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> Code
>
>     val main_column = "`database`, `table`, `transaction_type`, `transaction_id`, `reference_id`, `merchant_id`, `event_time`, `status`"
>     val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
>     bsTableEnv.createTemporaryView("main_table", main_table)
>
>     val merchant_column = "transaction_sn, user_id"
>     val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column FROM Keystats_airpay_consumer WHERE `table` LIKE 'wallet_id_merchant_db%' ")
>     bsTableEnv.createTemporaryView("merchant_table", merchant_table)
>
>     bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
>                            | SELECT `database`, `table`, `transaction_type`,
>                            |   `merchant_id`, `event_time`, `status`,
>                            |    FIRST_VALUE(`transaction_id`) OVER (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED PRECEDING)
>                            | FROM (
>                            |    SELECT `database`, `table`, `transaction_type`, `transaction_id`,
>                            |    `merchant_id`, `event_time`, `status`, `reference_id`
>                            |    FROM main_table
>                            |    LEFT JOIN merchant_table
>                            |    ON main_table.reference_id = merchant_table.transaction_sn
>                            | )
>                            |""".stripMargin)
>


-- 

Best,
Benchao Li
