Hi Henry,

The problem is that the table that results from the query does not have a
unique key.
You can only use an upsert sink if the table has a (composite) unique key.
Since this is not the case, you cannot use upsert sink.
However, you can implement a StreamRetractionTableSink which allows to
write any kind of Table (append-only/update, keyed/non-keyed) to an
external system.

Best, Fabian

2018-08-10 17:06 GMT+02:00 徐涛 <happydexu...@gmail.com>:

> Hi All,
> I am using flink 1.6 to generate some realtime programs. I want to write
> the output to table sink, the code is as below. At first I use append table
> sink, which error message tells me that I should use upsert table sink, so
> I write one. But still another error “Caused by: 
> org.apache.flink.table.api.TableException:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.” comes out,which blocks me. My questions is how to modify a table
> keys in this scenario? I also check the exception stack, and found that the
> system infer the keys field by
> val tableKeys: Option[Array[String]] = UpdatingPlanChecker.
> getUniqueKeyFields(optimizedPlan), I wonder how to make the function
> return value ?
> Thanks a lot !!!
>
>     var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM 
> praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
>     tableEnv.registerTable("praiseAggr", praise)
>
>     var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU 
> FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
>     tableEnv.registerTable("commentAggr", comment)
>
>     var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM 
> reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
>     tableEnv.registerTable("readerAggr", reader)
>
>     var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " 
> +  " SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN 
> commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on 
> c.article_id=r.article_id")
>
>
>
>
> Thank,
> Henry Xu
>

Reply via email to