Re:Re:Re: yarn api submission error

2022-04-08 Thread Mang Zhang



There is not much information here; my first impression is that the YARN job submission settings are not configured correctly. If you are interested, take a look at https://github.com/hortonworks/simple-yarn-app
to get a clear picture of how a YARN application works.


Coming back to your underlying goal: you want to build a job-hosting platform. A simple, conventional approach is to assemble the parameters in a wrapper and then submit the job by invoking the $FLINK_HOME/bin/flink run family of commands, as in the sketch below.
Your current approach is drifting a bit off course, though that may be because your scenario has requirements I am not aware of.
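As a rough illustration only — the wrapper class below and its hard-coded flags are invented for this sketch, not something Flink ships ("run-application" with "-t yarn-application" is the CLI entry point for application mode on YARN):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class FlinkCliSubmitter {

    // Submit a job in application mode by shelling out to the Flink CLI,
    // instead of re-implementing the YARN submission protocol by hand.
    public static int submit(String flinkHome, String mainClass, String userJar)
            throws IOException, InterruptedException {
        List<String> cmd = Arrays.asList(
                flinkHome + "/bin/flink",
                "run-application",         // application mode
                "-t", "yarn-application",  // deployment target: YARN
                "-c", mainClass,           // entry class inside the user jar
                userJar);
        Process process = new ProcessBuilder(cmd)
                .inheritIO()               // stream the CLI output to our own stdout/stderr
                .start();
        return process.waitFor();          // non-zero exit code means the submission failed
    }
}

The CLI takes care of shipping flink-dist, the lib/ jars and the log configuration, which are exactly the steps the engine has already wrapped up for you.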


As for scheduling platforms, Apache DolphinScheduler is also a good choice: an excellent open source project from China with comprehensive features. You can also refer to
https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/flink-call.html




Hope this helps.




--

Best regards,
Mang Zhang





On 2022-04-08 20:56:33, "周涛" <06160...@163.com> wrote:
>Many thanks for the reply, Mang Zhang.
>The full content of the AM log is below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:
>
>Log Type: jobmanager.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 107
>
>Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>
>
>Log Type: jobmanager.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 70
>
>Setting up env variables
>Setting up job resources
>Launching container
>
>
>In my code I have already registered all the lib jars Flink depends on with YARN; the jars were uploaded to HDFS in advance, and while debugging I can see the jar information being picked up, including flink-dist.jar.
>The error is thrown when the JM starts after the job is submitted to YARN. I still haven't found the cause.
>
>
>
>
>Also, what do you mean by the tools Flink provides? The point of this work is to build my own management platform for submitting and managing jobs.
>
>
On 2022-04-08 17:57:48, "Mang Zhang" wrote:
>
>
>
>
>This exception looks like an error while starting the AM after the job was submitted to the YARN cluster; you can check the AM log on the cluster,
>for example at the URL the log points to: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
>Also, judging from your code, you are submitting the job through the YARN API, which means you have to register every jar the job depends on with YARN yourself,
>for example the jars under flink lib, registering the dependencies with YarnLocalResourceDescriptor and so on. The process can be fairly convoluted, and you first need to understand how YARN deploys applications.
>
>
>I recommend submitting jobs with the commands and tools Flink already provides; the engine has all of these steps wrapped up for you.
>
>
>
>
>
>
>
>Best wishes
>
>
>
>
>
>
>
>--
>
>Best regards,
>Mang Zhang
>
>
>
>
>
On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>>
>>
>>
>>
>>
>>
>>hi,
>>While testing Flink job submission via the Java API I ran into some problems and would like to ask for advice:
>>Flink version: 1.14.4
>>Hadoop version: 3.0.0-cdh6.2.1
>>Application mode; submitting with the CLI runs fine, but submitting via the API fails.
>>YARN log for the failed submission:
>>   LogType:jobmanager.err
>>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>>   LogLength:107
>>   LogContents:
>>   Error: Could not find or load main class 
>> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>>   End of LogType:jobmanager.err
>>The code follows:
>>
>>
>>System.setProperty("HADOOP_USER_NAME", "hdfs");
>>// local Flink configuration directory, used to load the Flink configuration
>>String configurationDirectory = "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>>
>>// directory holding the Flink cluster jars
>>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>>// user jar
>>String userJarPath = "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>>String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>>
>>YarnClientService yarnClientService = new YarnClientService();
>>// create the YarnClient
>>YarnClient yarnClient = yarnClientService.getYarnClient();
>>yarnClient.start();
>>
>>// for the log setup; without this the logs are not visible
>>YarnClusterInformationRetriever clusterInformationRetriever =
>>        YarnClientYarnClusterInformationRetriever.create(yarnClient);
>>
>>// load the Flink configuration
>>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
>>
>>flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>>
>>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
>>
>>flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
>>
>>Path remoteLib = new Path(flinkLibs);
>>flinkConfiguration.set(
>>        YarnConfigOptions.PROVIDED_LIB_DIRS,
>>        Collections.singletonList(remoteLib.toString()));
>>
>>flinkConfiguration.set(
>>        YarnConfigOptions.FLINK_DIST_JAR,
>>        flinkDistJar);
>>
>>// set application mode

Re: yarn api submission error

2022-04-08 Thread Mang Zhang



This exception looks like an error while starting the AM after the job was submitted to the YARN cluster; you can check the AM log on the cluster,
for example at the URL the log points to: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
Also, judging from your code, you are submitting the job through the YARN API, which means you have to register every jar the job depends on with YARN yourself,
for example the jars under flink lib, registering the dependencies with YarnLocalResourceDescriptor and so on (see the sketch below). The process can be fairly convoluted, and you first need to understand how YARN deploys applications.
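For illustration of the underlying mechanism only, here is a minimal sketch of registering one jar as a YARN LocalResource with the plain Hadoop/YARN API (YarnLocalResourceDescriptor itself is internal to Flink; the class and method names below are invented for this example):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.Records;

public class YarnJarRegistration {

    // Build the LocalResource entry for one jar that already sits on HDFS.
    // Every jar the AM needs (flink-dist.jar, everything under lib/, the user
    // jar) must get an entry like this; the map is later attached to the AM's
    // ContainerLaunchContext via setLocalResources(...).
    public static Map<String, LocalResource> registerJar(Configuration conf, String hdfsJar)
            throws IOException {
        Path jarPath = new Path(hdfsJar);
        FileSystem fs = jarPath.getFileSystem(conf);
        FileStatus status = fs.getFileStatus(jarPath);

        LocalResource jar = Records.newRecord(LocalResource.class);
        jar.setResource(URL.fromPath(jarPath));          // where the NodeManager fetches it from
        jar.setSize(status.getLen());                    // size and timestamp must match the
        jar.setTimestamp(status.getModificationTime());  // HDFS file exactly, or localization fails
        jar.setType(LocalResourceType.FILE);
        jar.setVisibility(LocalResourceVisibility.APPLICATION);

        Map<String, LocalResource> localResources = new HashMap<>();
        localResources.put(jarPath.getName(), jar);      // the key becomes the file name in the container
        return localResources;
    }
}

If any of these entries is missing or wrong, the AM container starts without that jar on its classpath, which is consistent with a "Could not find or load main class" error.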


I still recommend submitting jobs with the commands and tools Flink already provides; the engine has all of these steps wrapped up for you.










--

Best regards,
Mang Zhang





On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>
>
>
>
>
>
>hi,
>While testing Flink job submission via the Java API I ran into some problems and would like to ask for advice:
>Flink version: 1.14.4
>Hadoop version: 3.0.0-cdh6.2.1
>Application mode; submitting with the CLI runs fine, but submitting via the API fails.
>YARN log for the failed submission:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>The code follows:
>
>
>System.setProperty("HADOOP_USER_NAME", "hdfs");
>// local Flink configuration directory, used to load the Flink configuration
>String configurationDirectory = "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>// directory holding the Flink cluster jars
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>// user jar
>String userJarPath = "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>// create the YarnClient
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// for the log setup; without this the logs are not visible
>YarnClusterInformationRetriever clusterInformationRetriever =
>        YarnClientYarnClusterInformationRetriever.create(yarnClient);
>
>// load the Flink configuration
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
>
>flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
>
>flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>        YarnConfigOptions.PROVIDED_LIB_DIRS,
>        Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>        YarnConfigOptions.FLINK_DIST_JAR,
>        flinkDistJar);
>
>// set application mode
>flinkConfiguration.set(
>        DeploymentOptions.TARGET,
>        YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
>
>// set the user jar's arguments and main class
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");
>
>final int jobManagerMemoryMB =
>        JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
>                .getTotalProcessMemorySize()
>                .getMebiBytes();
>final int taskManagerMemoryMB =
>        TaskExecutorProcessUtils.processSpecFromConfig(
>                        TaskExecutorProcessUtils
>                                .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>                                        flinkConfiguration,
>                                        TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>                .getTotalProcessMemorySize()
>                .getMebiBytes();
>ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
>        .setMasterMemoryMB(jobManagerMemoryMB)
>        .setTaskManagerMemoryMB(taskManagerMemoryMB)
>        .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
>        .createClusterSpecification();
>YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
>        flinkConfiguration,
>        (YarnConfiguration) yarnClient.getConfig(),
>        yar

Re: flink sql job's JM Blob server keeps reporting java.io.IOException: unknown operation 71 in the early morning

2022-04-08 Thread Mang Zhang



The image did not come through. If you want to share an image, upload it to a shared image host and paste the link in the email,
or paste the exception text directly into the email body.










--

Best regards,
Mang Zhang




On 2022-04-07 16:25:12, "su wenwen" wrote:

hi all, I'd like to ask whether anyone has run into this problem, on Flink 1.12:
a Flink SQL job running in production keeps reporting an error like the following in the early hours of the morning:

[screenshot of the exception: the image was not delivered with the message]

As I understand it, the blobserver transfers binary jar files from HDFS to the local working directory. I haven't found problems in any other part of the job, and the job's data has not been affected.

Re: Error when writing to Hive from Flink

2022-03-21 Thread Mang Zhang
In Flink, once you "use" the HiveCatalog as the current catalog, you cannot, for now, work well with tables other than those of the hive connector;
my understanding is that what you want to do is write a Flink table's data into a Hive table.


HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");

tableEnv.registerCatalog("devHive", hiveCatalog);
// drop this part, and keep using Flink's default catalog
//tableEnv.useCatalog("devHive");
//tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//tableEnv.useDatabase("default");
tableEnv.createTemporaryView("sourceTable", stepFr);
// rewrite the sql a little: ${catalog name}.${db name}.${table name}
String sql = "insert into devHive.default.zyz select * from sourceTable";
tableEnv.executeSql(sql);




You can try writing it like this.
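A note on why this should help, as I read your log: with the Hive dialect active and devHive as the current catalog, the statement goes through the Hive parser, which resolves tables against the Hive metastore and does not see the Flink temporary view, hence the "Table not found 'sourceTable'" error below. Keeping the default catalog and dialect, and fully qualifying only the Hive table, lets one INSERT statement reference both.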




--

Best regards,
Mang Zhang





At 2022-03-15 14:13:56, "顾斌杰"  wrote:
>
>Flink version: 1.13.3
>Hive version: 2.1.1
>
>
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>DataStream<UserBehavior> userBehaviorDataStream = source
>        .map(new UserBehaviorAddEventTimeTransform()).filter(Objects::nonNull);
>userBehaviorDataStream.print();
>SingleOutputStreamOperator<Row> stepFr = userBehaviorDataStream.process(new ProcessFunction<UserBehavior, Row>() {
>    private static final long serialVersionUID = 6365847542902145255L;
>
>    @Override
>    public void processElement(UserBehavior value, Context ctx, Collector<Row> out) throws Exception {
>        Row row = new Row(2);
>        row.setField(0, value.getAppName());
>        row.setField(1, value.getAppName());
>        out.collect(row);
>    }
>});
>
>HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
>tableEnv.registerCatalog("devHive", hiveCatalog);
>tableEnv.useCatalog("devHive");
>tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>tableEnv.useDatabase("default");
>tableEnv.createTemporaryView("sourceTable", stepFr);
>
>String sql = "insert into zyz select * from sourceTable";
>tableEnv.executeSql(sql);
>
>
>But it keeps throwing errors; may I ask whether I've written something wrong?
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][isValidPojoField][1991] - 
>class java.util.LinkedHashMap does not contain a getter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][isValidPojoField][1994] - 
>class java.util.LinkedHashMap does not contain a setter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][analyzePojo][2037] - 
>Class class java.util.LinkedHashMap cannot be used as a POJO type because not 
>all fields are valid POJO fields, and must be processed as GenericType. Please 
>read the Flink documentation on "Data Types & Serialization" for details of 
>the effect on performance.
>2022-03-15 14:11:39.242 [main] INFO  [TypeExtractor][analyzePojo][2093] - 
>class org.apache.flink.types.Row is missing a default constructor so it cannot 
>be used as a POJO type and must be processed as GenericType. Please read the 
>Flink documentation on "Data Types & Serialization" for details of the effect 
>on performance.
>2022-03-15 14:11:39.448 [main] INFO  [HiveCatalog][createHiveConf][257] - 
>Setting hive conf dir as null
>2022-03-15 14:11:39.449 [main] INFO  [HiveCatalog][createHiveConf][278] - 
>Found hive-site.xml in classpath: 
>file:/D:/JetBrains/IdeaProject/paat_realtime_deal/target/classes/hive-site.xml
>2022-03-15 14:11:39.491 [main] INFO  [HiveCatalog][][219] - Created 
>HiveCatalog 'devHive'
>2022-03-15 14:11:40.063 [main] INFO  [HiveCatalog][open][299] - Connected to 
>Hive metastore
>2022-03-15 14:11:40.161 [main] INFO  [CatalogManager][setCurrentCatalog][262] 
>- Set the current default catalog as [devHive] and the current default 
>database as [default].
>2022-03-15 14:11:41.158 [main] INFO  
>[HiveParserCalcitePlanner][genLogicalPlan][251] - Starting generating logical 
>plan
>2022-03-15 14:11:41.164 [main] INFO  
>[HiveParserSemanticAnalyzer][genResolvedParseTree][2279] - Completed phase 1 
>of Semantic Analysis
>2022-03-15 14:11:41.164 [main] INFO  
>[HiveParserSemanticAnalyzer][getMetaData][1508] - Get metadata for source 
>tables
>2022-03-15 14:11:41.178 [main] ERROR 
>[HiveParserSemanticAnalyzer][getMetaData][1489] - 
>org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 
>'sourceTable'
>at 
> org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547)
>at 
> org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaDat