After adding a primary key, it throws an error:
Exception in thread "main" 
org.apache.flink.table.planner.operations.SqlConversionException: Primary key 
and unique key are not supported yet.
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)


Query:


streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_uv(
    |    `time` VARCHAR,
    |    cnt bigint,
    |    PRIMARY KEY (`time`)
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)
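
The `Primary key and unique key are not supported yet` exception comes from the Flink 1.10 DDL parser, which rejects PRIMARY KEY and UNIQUE KEY declarations in CREATE TABLE. DDL primary keys were added in Flink 1.11, where they must be declared with the `NOT ENFORCED` constraint. A sketch of the 1.11-style DDL, assuming an upgrade; the `url` and `table-name` values are placeholders, not taken from the original post:

```scala
// Sketch, assuming Flink 1.11+: DDL primary keys are accepted when
// declared NOT ENFORCED, and the JDBC connector uses the new
// 'connector' = 'jdbc' option style. The url/table-name values below
// are illustrative placeholders.
streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_uv(
    |    `time` VARCHAR,
    |    cnt BIGINT,
    |    PRIMARY KEY (`time`) NOT ENFORCED
    |) WITH (
    |    'connector' = 'jdbc',
    |    'url' = 'jdbc:mysql://localhost:3306/test',
    |    'table-name' = 'user_uv'
    |)
    |""".stripMargin)
```

On Flink 1.10 itself, the DDL simply cannot carry a primary key; the upsert key has to be derived from the query instead, as discussed below the quoted error.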


At 2020-06-17 20:59:35, "Zhou Zach" <is...@foxmail.com> wrote:
>Exception in thread "main" org.apache.flink.table.api.TableException: 
>UpsertStreamTableSink requires that Table has a full primary keys if it is 
>updated.
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>       at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>       at 
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
>       at 
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
>
>
>
>
>Query:
>Flink: 1.10.0
>
>CREATE TABLE user_uv(
>    `time` VARCHAR,
>    cnt BIGINT
>) WITH (
>    'connector.type' = 'jdbc'
>)
>
>INSERT INTO user_uv
>SELECT MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) AS `time`,
>       COUNT(DISTINCT uid) AS cnt
>FROM `user`
>GROUP BY DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
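
The quoted `UpsertStreamTableSink requires that Table has a full primary keys` error arises because in Flink 1.10 the planner derives the upsert key from the query's GROUP BY fields, and those fields must appear un-aggregated in the SELECT list. Here the grouping expression is wrapped in `MAX(...)`, so no key field reaches the sink. A sketch of the rewritten insert, assuming the same table and column names as the original query:

```scala
// Sketch: select the GROUP BY expression itself rather than MAX of it,
// so the planner can derive `time` as the upsert key for the JDBC sink.
streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_uv
    |SELECT DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00') AS `time`,
    |       COUNT(DISTINCT uid) AS cnt
    |FROM `user`
    |GROUP BY DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
    |""".stripMargin)
```

Since `DATE_FORMAT(created_time, ...)` is the grouping key, selecting it directly is equivalent to the `MAX(...)` form per group, and it additionally lets the planner infer the key the upsert sink requires.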
