?????? flink1.10??????????????????????????????hbase???????????????????? Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
s""" |INSERT INTO ${databaseName}.response_time_sink |SELECT | rowkey, | ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time, replace_avg_time, deviate_avg_time) AS cf |FROM |( | select CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range) rowkey,`day`,`time`, | MAX(CASE req_type WHEN '0' THEN num else 0 END) initialize_route_avg_time, | MAX(CASE req_type WHEN '1' THEN num else 0 END) update_detour_avg_time, | MAX(CASE req_type WHEN '2' THEN num else 0 END) replace_avg_time, | MAX(CASE req_type WHEN '3' THEN num else 0 END) deviate_avg_time | from | (SELECT DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd') `day`, | UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd HH:mm:ss')) * 1000 AS `time`, | req_type, | (CASE WHEN ResponseRemainingMile<=50 THEN '1' | WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 THEN '2' | WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 THEN '3' | WHEN ResponseRemainingMile> 500 THEN '4' END) as distance_range, | CAST(AVG(`value`) AS INT) num | FROM | ${databaseName}.metric_stream | WHERE | metric = 'http_common_request' | GROUP BY | TUMBLE(proctime, INTERVAL '10' SECOND),req_type,(CASE WHEN ResponseRemainingMile<=50 THEN '1' | WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 THEN '2' | WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 THEN '3' | WHEN ResponseRemainingMile> 500 THEN '4' END)) | group by CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range),`day`,`time` |) a |""".stripMargin ????????????????????