各位好, 最近在研究Flink Hbase连接器,测试实验是将聚合的数据写入到hbase报错。希望能得到各位的帮助。代码 如下:

/**
 * HBase connector test class.
 *
 * <p>Registers an HBase-backed table named {@code resume01} through SQL DDL and
 * exercises reading from it and writing an aggregated stream into it.
 *
 * <p>Author: ellis.guan<br>
 * Date: 2020/3/6 15:41<br>
 * Version: 1.0
 */
public class HbaseTest {

    private StreamExecutionEnvironment env;
    private StreamTableEnvironment tableEnv;

    /**
     * Creates the streaming environment (Blink planner, streaming mode) and
     * registers the {@code resume01} table with the HBase connector properties.
     */
    @Before
    public void init() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        tableEnv = StreamTableEnvironment.create(env, settings);
        // The commented-out fragments below are additional column families that
        // are currently excluded from the DDL.
        tableEnv.sqlUpdate("create table resume01(\n" +
                " `rowkey` string,sdp_columns_family ROW<age string,mobile BIGINT> \n" +
                // " `binfo` ROW<age string,mobile string,site string>,\n" +
                // " edu ROW<university string>, \n" +
                // " work ROW<company1 string> \n" +
                ") with (" +
                " 'connector.type' = 'hbase', " +
                " 'connector.version' = '1.4.3', " +
                " 'connector.table-name' = 'resume01'," +
                " 'connector.zookeeper.quorum' = 'localhost:2181'," +
                " 'connector.zookeeper.znode.parent' = '/hbase'" +
                ")");
    }

    /**
     * Selects every row of the registered HBase table, converts the result to a
     * retract stream and prints it.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testReadFromHBase() throws Exception {
        // HBaseTableSource resume = new HBaseTableSource();
        // The table registered in init() is "resume01"; querying "resume" (as the
        // original code did) references a table that is never created.
        Table table = tableEnv.sqlQuery("select * from resume01");
        DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
        out.print();
        env.execute();
    }

    /**
     * Registers three in-memory rows as the view {@code source_resume}, aggregates
     * them by {@code name, age} and inserts the result into {@code resume01}.
     *
     * @throws Exception if planning or job execution fails
     */
    @Test
    public void testWriterToHBase() throws Exception {
        DataStream<Row> source = env.fromElements(
                Row.of("ellis", "2015-03-27", "17352837822", "changsha", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-28", "17352837825", "changsha1", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-279", "17352837826", "changsha2", "hun nan", "shiji"));
        tableEnv.createTemporaryView("source_resume", source, "name,age,mobile,site,university,company1");
        // NOTE(review): this is the statement that raises the reported
        // "UpsertStreamTableSink requires that Table has a full primary keys if it
        // is updated" TableException. Presumably the planner cannot derive a unique
        // key for the sink because the grouping keys (name, age) reach the output
        // only wrapped in CONCAT_WS(...) / ROW(...). A likely remedy is to compute
        // the rowkey inside the sub-query, GROUP BY that expression alone, and
        // project the resulting column unchanged — TODO confirm against the Flink
        // documentation for the version in use.
        tableEnv.sqlUpdate("insert into resume01 select CONCAT_WS('_',age,name),ROW(age,mobile) from " +
                " (select name,age,sum(cast(mobile as bigint)) as mobile from source_resume group by name,age ) as tt");
        env.execute();
    }
}

运行报错如下: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. 
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:59) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.lang.reflect.Method.invoke(Method.java:498) at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59) at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98) at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79) at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87) at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77) at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42) at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88) at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51) at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44) at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27) at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37) at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42) at org.junit.runner.JUnitCore.run(JUnitCore.java:130) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) psyche19830...@163.com