在Flink里面,你如果 use 了 HiveCatalog,那么暂时不能很好的使用非hive connector以外的表; 我理解你现在想要做的是,将flink 表的数据写入到一个hive table里
HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1"); tableEnv.registerCatalog("devHive", hiveCatalog); // 去掉这部分,还使用flink默认的catalog //tableEnv.useCatalog("devHive"); //tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); //tableEnv.useDatabase("default"); tableEnv.createTemporaryView("sourceTable", stepFr); // sql 改写一下, ${catalog name}.${db name}.${table name} String sql = "insert into devHive.default.zyz select * from sourceTable"; tableEnv.executeSql(sql); 可以试试这样写 -- Best regards, Mang Zhang At 2022-03-15 14:13:56, "顾斌杰" <binjie...@paat.com> wrote: > >Flink版本:1.13.3 >Hive版本:2.1.1 > > >StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > > DataStream userBehaviorDataStream = source > .map(new > UserBehaviorAddEventTimeTransform()).filter(Objects::nonNull); > userBehaviorDataStream.print(); > SingleOutputStreamOperator stepFr = userBehaviorDataStream.process(new > ProcessFunction() { > private static final long serialVersionUID = 6365847542902145255L; > > @Override > public void processElement(UserBehavior value, Context ctx, > Collector out) throws Exception { > Row row = new Row(2); > row.setField(0, value.getAppName()); > row.setField(1, value.getAppName()); > out.collect(row); > } > }); > > HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) > null, "2.1.1"); > tableEnv.registerCatalog("devHive", hiveCatalog); > tableEnv.useCatalog("devHive"); > tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > tableEnv.useDatabase("default"); > tableEnv.createTemporaryView("sourceTable", stepFr); > > String sql = "insert into zyz select * from sourceTable"; > tableEnv.executeSql(sql); > > >但是他老是报错,我想请问是否我写错了什么? >2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1991] - >class java.util.LinkedHashMap does not contain a getter for field accessOrder >2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1994] - >class java.util.LinkedHashMap does not contain a setter for field accessOrder >2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2037] - >Class class java.util.LinkedHashMap cannot be used as a POJO type because not >all fields are valid POJO fields, and must be processed as GenericType. Please >read the Flink documentation on "Data Types & Serialization" for details of >the effect on performance. >2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2093] - >class org.apache.flink.types.Row is missing a default constructor so it cannot >be used as a POJO type and must be processed as GenericType. Please read the >Flink documentation on "Data Types & Serialization" for details of the effect >on performance. >2022-03-15 14:11:39.448 [main] INFO [HiveCatalog][createHiveConf][257] - >Setting hive conf dir as null >2022-03-15 14:11:39.449 [main] INFO [HiveCatalog][createHiveConf][278] - >Found hive-site.xml in classpath: >file:/D:/JetBrains/IdeaProject/paat_realtime_deal/target/classes/hive-site.xml >2022-03-15 14:11:39.491 [main] INFO [HiveCatalog][][219] - Created >HiveCatalog 'devHive' >2022-03-15 14:11:40.063 [main] INFO [HiveCatalog][open][299] - Connected to >Hive metastore >2022-03-15 14:11:40.161 [main] INFO [CatalogManager][setCurrentCatalog][262] >- Set the current default catalog as [devHive] and the current default >database as [default]. >2022-03-15 14:11:41.158 [main] INFO >[HiveParserCalcitePlanner][genLogicalPlan][251] - Starting generating logical >plan >2022-03-15 14:11:41.164 [main] INFO >[HiveParserSemanticAnalyzer][genResolvedParseTree][2279] - Completed phase 1 >of Semantic Analysis >2022-03-15 14:11:41.164 [main] INFO >[HiveParserSemanticAnalyzer][getMetaData][1508] - Get metadata for source >tables >2022-03-15 14:11:41.178 [main] ERROR >[HiveParserSemanticAnalyzer][getMetaData][1489] - >org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found >'sourceTable' > at > org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547) > at > org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1487) > at > org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genResolvedParseTree(HiveParserSemanticAnalyzer.java:2283) > at > org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:255) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:290) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:238) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:208) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) > at > com.paat.realtime.task.core.SyncUserBehaviorToMysqlTask.transform(SyncUserBehaviorToMysqlTask.java:58) > at > com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:53) > at > com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:65) > at > com.paat.realtime.application.SyncUserBehaviorToMysqlApplication.main(SyncUserBehaviorToMysqlApplication.java:16) > >Exception in thread "main" org.apache.flink.table.api.ValidationException: >HiveParser failed to parse insert into zyz select * from sourceTable > at > org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:253) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:208) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) > at > com.paat.realtime.task.core.SyncUserBehaviorToMysqlTask.transform(SyncUserBehaviorToMysqlTask.java:58) > at > com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:53) > at > com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:65) > at > com.paat.realtime.application.SyncUserBehaviorToMysqlApplication.main(SyncUserBehaviorToMysqlApplication.java:16) >Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table >not found 'sourceTable' > at > org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547) > at > org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1487) > at > org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genResolvedParseTree(HiveParserSemanticAnalyzer.java:2283) > at > org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:255) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:290) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:238) > ... 6 more > > > > > > > > > > > > > > > > > > > > > > > >