Re:Re:Re: Error when submitting via the YARN API

2022-04-08 Thread Mang Zhang

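A project like this requires a clear understanding of how YARN applications work, both the mechanism and the underlying principles.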

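Back to what you actually need: you want to build a job-hosting platform. A simple, conventional approach is to assemble the arguments yourself and then invoke $FLINK_HOME/bin/flink run.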
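
The "assemble the arguments, then call flink run" approach could look roughly like the Java sketch below. It is only an illustration under assumptions: the class name FlinkRunCommand, the paths, the main class and the program arguments are all hypothetical. -t, -d, -p and -c are standard flink run options (as far as I know, -t with targets such as yarn-per-job requires Flink 1.11 or later). The actual launch via ProcessBuilder is left commented out because it needs a working Flink client and Hadoop environment on the host.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper for a job-hosting platform: instead of calling Flink's
 * Java deployment API, it builds the argument list for "$FLINK_HOME/bin/flink run".
 */
public class FlinkRunCommand {

    public static List<String> build(String flinkHome, String target, String mainClass,
                                     String userJar, int parallelism, List<String> programArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkHome + "/bin/flink");
        cmd.add("run");
        if (target != null) {
            cmd.add("-t");
            cmd.add(target);              // e.g. "yarn-per-job"; equivalent to execution.target
        }
        cmd.add("-d");                    // detached: the client returns once the job is submitted
        cmd.add("-p");
        cmd.add(Integer.toString(parallelism));
        cmd.add("-c");
        cmd.add(mainClass);               // entry class inside the user jar
        cmd.add(userJar);                 // CLI options must come before the jar path
        cmd.addAll(programArgs);          // everything after the jar is passed to the user's main()
        return cmd;
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = build(System.getenv().getOrDefault("FLINK_HOME", "/opt/flink"),
                "yarn-per-job", "com.example.MyJob", "/jobs/my-job.jar", 2,
                List.of("--input", "hdfs:///tmp/in"));
        System.out.println(String.join(" ", cmd));

        // On a host with a Flink client installed and HADOOP_CLASSPATH exported,
        // the command could be launched like this:
        // Process process = new ProcessBuilder(cmd).inheritIO().start();
        // System.out.println("flink run exited with " + process.waitFor());
    }
}
```

Going through the CLI keeps the platform decoupled from Flink's internal deployment classes, which tend to change between versions; the trade-off is that submission results have to be read from the process exit code and output.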

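Also, for a scheduling platform, Apache DolphinScheduler is a good option: it is an excellent open-source project from China with a complete feature set, and worth using as a reference.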



Mang Zhang

>Many thanks for the reply, Mang Zhang.
>Log Type: jobmanager.err
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 107
>Error: Could not find or load main class 
>Log Type: jobmanager.out
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 0
>Log Type: prelaunch.err
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 0
>Log Type: prelaunch.out
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 70
>Setting up env variables
>Setting up job resources
>Launching container
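>For example, the URL given in the log: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
>For example, the jars under Flink's lib directory, using YarnLocalResourceDescriptor
>>While testing submission of a Flink job through the Java API, I ran into some problems and would like to ask for advice: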
>>   LogType:jobmanager.err
>>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>>   LogLength:107
>>   LogContents:
>>   Error: Could not find or load main class 
>> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>>   End of LogType:jobmanager.err
>> System.setProperty("HADOOP_USER_NAME", "hdfs");
>>String configurationDirectory = 
>>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>>String userJarPath = 
>>String flinkDistJar = 
>> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>>YarnClientService yarnClientService = new YarnClientService();
>>YarnClient yarnClient = yarnClientService.getYarnClient();
>>// Sets up logging; without it, no logs are visible
>>YarnClusterInformationRetriever clusterInformationRetriever = 
>>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>> true);
>> Collections.singletonList(userJarPath));
>>Path remoteLib = new Path(flinkLibs);
>>// Set to application mode
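
One quick check when jobmanager.err reports "Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint" is whether the flink-dist jar actually shipped to the container contains that class. The sketch below assumes you first copy the jar referenced by flinkDistJar to local disk (for example with hdfs dfs -get); the class name JarClassCheck is hypothetical, and it only inspects the jar, it does not reproduce YARN's classpath setup.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/**
 * Diagnostic sketch: checks whether a local jar (e.g. a downloaded copy of the
 * flink-dist jar) contains the YARN application-mode entry point class.
 */
public class JarClassCheck {

    public static final String ENTRY_POINT =
            "org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint";

    // Converts a binary class name to the entry name used inside a jar: a.b.C -> a/b/C.class
    public static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar at jarPath has an entry for className.
    public static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: JarClassCheck <local-path-to-flink-dist.jar>");
            return;
        }
        boolean found = containsClass(args[0], ENTRY_POINT);
        System.out.println(ENTRY_POINT + (found ? " found in " : " NOT found in ") + args[0]);
    }
}
```

If the class is present, the problem is more likely that the jar never made it onto the container's classpath (for example a wrong remote path), which the container's launch script in the YARN logs can help confirm.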

Re: JM Blob server in a Flink SQL job always reports "unknown operation 71" in the early morning

2022-04-08 Thread Mang Zhang



Hi all. Has anyone run into this problem? We are on Flink 1.12.
A Flink SQL job running in production always reports the following error in the early morning:

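My understanding is that the blob server transfers binary jar packages from HDFS to the local working directory. However, I haven't found problems anywhere else, and the job's data has not been affected.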
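
One hypothesis worth checking (it is not confirmed anywhere in this thread): the blob server decides what a connection wants from its first byte, and 71 is the ASCII code of 'G'. A plain HTTP request such as a nightly port scan or health check that starts with "GET" and hits the blob server port (blob.server.port) would therefore look like operation 71. The sketch below only mimics that dispatch; the opcode values PUT = 0 and GET = 1 are assumptions modeled on Flink's blob protocol, and BlobOpcodeDemo is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
 * Sketch of a server that dispatches on the first byte of a connection and
 * reports anything else as an "unknown operation".
 */
public class BlobOpcodeDemo {

    static final int PUT_OPERATION = 0; // assumed value
    static final int GET_OPERATION = 1; // assumed value

    // Reads the first byte of a connection and names the operation it selects.
    public static String describeFirstByte(InputStream in) throws IOException {
        int operation = in.read();
        switch (operation) {
            case PUT_OPERATION: return "PUT";
            case GET_OPERATION: return "GET";
            case -1: return "EOF";
            default: return "Unknown operation " + operation + " ('" + (char) operation + "')";
        }
    }

    public static void main(String[] args) throws IOException {
        // The first bytes a plain HTTP client would send to any open port.
        byte[] http = "GET / HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(describeFirstByte(new ByteArrayInputStream(http)));
        // prints: Unknown operation 71 ('G')
    }
}
```

If this matches, the fix is on the network side (restricting who can reach that port, or pinning blob.server.port to a port the scanner skips) rather than in the job itself.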


2022-03-21 Thread Mang Zhang
In Flink, if you `use` the HiveCatalog, tables that do not use the Hive connector cannot yet be used well.
As I understand it, what you want to do is write the data of a Flink table into a Hive table.

HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");

tableEnv.registerCatalog("devHive", hiveCatalog);
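// The part that switched the current catalog to devHive is removed; keep using Flink's default catalog
tableEnv.createTemporaryView("sourceTable", stepFr);
// Rewrite the SQL with the fully qualified name ${catalog name}.${db name}.${table name}; `default` is a reserved word in Flink SQL, so quote it with backticks
String sql = "insert into devHive.`default`.zyz select * from 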
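
Building the fully qualified target name by hand is easy to get wrong, so a small helper can quote each part. This is a sketch under assumptions: QualifiedName is a hypothetical class, and it relies on Flink SQL's backtick quoting, where a backtick inside an identifier is escaped by doubling it. The resulting string would then go to tableEnv.executeSql(sql) while the default catalog stays current, so that sourceTable still resolves.

```java
/**
 * Hypothetical helper that builds a backtick-quoted Flink SQL identifier of the
 * form `catalog`.`database`.`table`, which also covers reserved words like "default".
 */
public class QualifiedName {

    // Wraps one identifier part in backticks, doubling any embedded backtick.
    public static String quote(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    // Joins catalog, database and table into one fully qualified identifier.
    public static String of(String catalog, String database, String table) {
        return quote(catalog) + "." + quote(database) + "." + quote(table);
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO " + of("devHive", "default", "zyz") + " SELECT * FROM sourceTable";
        System.out.println(sql);
        // prints: INSERT INTO `devHive`.`default`.`zyz` SELECT * FROM sourceTable
    }
}
```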



>StreamTableEnvironment  tableEnv = StreamTableEnvironment.create(env);
>DataStream userBehaviorDataStream = source
> UserBehaviorAddEventTimeTransform()).filter(Objects::nonNull);
>SingleOutputStreamOperator stepFr = userBehaviorDataStream.process(new 
> ProcessFunction() {
>private static final long serialVersionUID = 6365847542902145255L;
>public void processElement(UserBehavior value, Context ctx, 
> Collector out) throws Exception {
>Row row = new Row(2);
>row.setField(0, value.getAppName());
>row.setField(1, value.getAppName());
>HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) 
> null, "2.1.1");
>tableEnv.registerCatalog("devHive", hiveCatalog);
>tableEnv.createTemporaryView("sourceTable", stepFr);
>String sql = "insert into zyz select * from sourceTable";
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][isValidPojoField][1991] - 
>class java.util.LinkedHashMap does not contain a getter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][isValidPojoField][1994] - 
>class java.util.LinkedHashMap does not contain a setter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][analyzePojo][2037] - 
>Class class java.util.LinkedHashMap cannot be used as a POJO type because not 
>all fields are valid POJO fields, and must be processed as GenericType. Please 
>read the Flink documentation on "Data Types & Serialization" for details of 
>the effect on performance.
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][analyzePojo][2093] - 
>class org.apache.flink.types.Row is missing a default constructor so it cannot 
>be used as a POJO type and must be processed as GenericType. Please read the 
>Flink documentation on "Data Types & Serialization" for details of the effect 
>on performance.
>2022-03-15 14:11:39.448 [main] INFO  [HiveCatalog][createHiveConf][257] - 
>Setting hive conf dir as null
>2022-03-15 14:11:39.449 [main] INFO  [HiveCatalog][createHiveConf][278] - 
>Found hive-site.xml in classpath: 
>2022-03-15 14:11:39.491 [main] INFO  [HiveCatalog][][219] - Created 
>HiveCatalog 'devHive'
>2022-03-15 14:11:40.063 [main] INFO  [HiveCatalog][open][299] - Connected to 
>Hive metastore
>2022-03-15 14:11:40.161 [main] INFO  [CatalogManager][setCurrentCatalog][262] 
>- Set the current default catalog as [devHive] and the current default 
>database as [default].
>2022-03-15 14:11:41.158 [main] INFO  
>[HiveParserCalcitePlanner][genLogicalPlan][251] - Starting generating logical 
>2022-03-15 14:11:41.164 [main] INFO  
>[HiveParserSemanticAnalyzer][genResolvedParseTree][2279] - Completed phase 1 
>of Semantic Analysis
>2022-03-15 14:11:41.164 [main] INFO  
>[HiveParserSemanticAnalyzer][getMetaData][1508] - Get metadata for source 
>2022-03-15 14:11:41.178 [main] ERROR 
>[HiveParserSemanticAnalyzer][getMetaData][1489] - 
>org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 
> org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(
> org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaDat