??????
    
flink1.10??????????????????????????????hbase???????????? http://apache-flink.147419.n8.nabble.com/ ????????????????????????????????????
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
        at 
com.autoai.cns.core.CNSDashboardDML$.responseTime(CNSDashboardDML.scala:178)
        at com.autoai.cns.core.CNSDashboardDML$.main(CNSDashboardDML.scala:60)
        at com.autoai.cns.core.CNSDashboardDML.main(CNSDashboardDML.scala)

    s"""
   |INSERT INTO ${databaseName}.response_time_sink
   |SELECT
   |  rowkey,
   |  ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time, 
replace_avg_time, deviate_avg_time) AS cf
   |FROM
   |(
   |    select CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range) 
rowkey,`day`,`time`,
   |      MAX(CASE req_type WHEN '0' THEN num else 0 END) 
initialize_route_avg_time,
   |      MAX(CASE req_type WHEN '1' THEN num else 0 END) 
update_detour_avg_time,
   |      MAX(CASE req_type WHEN '2' THEN num else 0 END) replace_avg_time,
   |      MAX(CASE req_type WHEN '3' THEN num else 0 END) deviate_avg_time
   |    from
   |      (SELECT DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, 
INTERVAL '10' SECOND)), 'yyyy-MM-dd') `day`,
   |      UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, 
TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd HH:mm:ss')) * 1000 
AS `time`,
   |      req_type,
   |      (CASE WHEN ResponseRemainingMile<=50 THEN '1'
   |             WHEN ResponseRemainingMile&gt; 50 AND ResponseRemainingMile<= 
250 THEN '2'
   |             WHEN ResponseRemainingMile&gt; 250 AND ResponseRemainingMile<= 
500 THEN '3'
   |             WHEN ResponseRemainingMile&gt; 500 THEN '4' END) as 
distance_range,
   |      CAST(AVG(`value`) AS INT) num
   |    FROM
   |      ${databaseName}.metric_stream
   |    WHERE
   |      metric = 'http_common_request'
   |    GROUP BY
   |      TUMBLE(proctime, INTERVAL '10' SECOND),req_type,(CASE WHEN 
ResponseRemainingMile<=50 THEN '1'
   |        WHEN ResponseRemainingMile&gt; 50 AND ResponseRemainingMile<= 250 
THEN '2'
   |        WHEN ResponseRemainingMile&gt; 250 AND ResponseRemainingMile<= 500 
THEN '3'
   |        WHEN ResponseRemainingMile&gt; 500 THEN '4' END)) 
   |  group by CONCAT_WS('_',CAST(`time` AS 
VARCHAR),distance_range),`day`,`time`
   |) a
   |""".stripMargin&nbsp; &nbsp; ????????????????????

回复