加了 primary key 后报错:

Exception in thread "main" org.apache.flink.table.planner.operations.SqlConversionException: Primary key and unique key are not supported yet.
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
    at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
    at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
Query:

streamTableEnv.sqlUpdate(
  """
    |
    |CREATE TABLE user_uv(
    |    `time` VARCHAR,
    |    cnt bigint,
    |    PRIMARY KEY (`time`)
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)

At 2020-06-17 20:59:35, "Zhou Zach" <is...@foxmail.com> wrote:
>Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
>    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
>    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>    at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
>    at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
>
>
>
>
>Query:
>Flink :1.10.0
>CREATE TABLE user_uv(
>| `time` VARCHAR,
>| cnt bigint
>|) WITH (
>| 'connector.type' = 'jdbc')
>|insert into user_uv
>|select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, COUNT(DISTINCT uid) as cnt
>|from `user`
>|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')