Re:Re:Re: yarn api submission error
There is not much information to go on here. My first impression is that the YARN submission context is not set up correctly. If you are interested, take a look at https://github.com/hortonworks/simple-yarn-app to get a clear understanding of how a YARN app works.

Coming back to your actual goal: you want to build a job-hosting platform. A simple, sound approach is to assemble the parameters yourself and submit jobs by invoking the $FLINK_HOME/bin/flink run command. Your current approach has drifted a bit off course, though your scenario may have requirements I am not aware of.

As for scheduling platforms, Apache DolphinScheduler is also a good choice -- an excellent open-source project originating in China with complete functionality. See https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/flink-call.html

Hope this helps.

--
Best regards,
Mang Zhang

On 2022-04-08 20:56:33, "周涛" <06160...@163.com> wrote:
>Many thanks for your reply, Mang Zhang.
>The full content of the AM log is below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:
>
>Log Type: jobmanager.err
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 107
>Error: Could not find or load main class
>org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>
>Log Type: jobmanager.out
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 0
>
>Log Type: prelaunch.err
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 0
>
>Log Type: prelaunch.out
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>Log Length: 70
>Setting up env variables
>Setting up job resources
>Launching container
>
>In my code I have already registered all the jars Flink depends on (the lib directory) with YARN; the jars were uploaded to HDFS in advance. While debugging I can see that the jar information is picked up, including flink-dist.jar. The error is thrown when the JobManager starts after the submission to YARN, and I have not been able to find the cause.
>
>Also, which tools provided by Flink do you mean? The point of this work is to build my own management platform for submitting and managing jobs.
>
>On 2022-04-08 17:57:48, "Mang Zhang" wrote:
>>This exception looks like the AM failed to start after the job was submitted to the YARN cluster; check the AM log on the cluster, e.g. the one referenced in the log: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
>>Also, judging from your code, you submit the job through the YARN API, so you have to register every jar the job depends on with YARN yourself -- for example the jars under the Flink lib directory, registered as dependencies via YarnLocalResourceDescriptor. The process can be quite tortuous and requires understanding YARN's deployment mechanism first.
>>
>>I recommend submitting jobs with the commands and tools Flink already provides; the engine has all of these steps wrapped up.
>>
>>Best regards,
>>Mang Zhang
>>
>>On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>>>hi,
>>>While testing Flink job submission via the Java API I ran into some problems and would like to ask for help:
>>>Flink version: 1.14.4
>>>Hadoop version: 3.0.0-cdh6.2.1
>>>Application mode; submitting with the CLI works fine, submitting via the API fails.
>>>YARN log of the failed submission:
>>> LogType:jobmanager.err
>>> LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>>> LogLength:107
>>> LogContents:
>>> Error: Could not find or load main class
>>> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>>> End of LogType:jobmanager.err
>>>The code is below:
>>>
>>>System.setProperty("HADOOP_USER_NAME", "hdfs");
>>>// local Flink configuration directory, used to load the Flink configuration
>>>String configurationDirectory =
>>>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>>>
>>>// HDFS directory holding the Flink cluster jars
>>>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>>>// user jar
>>>String userJarPath =
>>>        "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>>>String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>>>
>>>YarnClientService yarnClientService = new YarnClientService();
>>>// create the YarnClient
>>>YarnClient yarnClient = yarnClientService.getYarnClient();
>>>yarnClient.start();
>>>
>>>// needed for logs; without it no logs are visible
>>>YarnClusterInformationRetriever clusterInformationRetriever =
>>>        YarnClientYarnClusterInformationRetriever.create(yarnClient);
>>>
>>>// load the Flink configuration
>>>Configuration flinkConfiguration =
>>>        GlobalConfiguration.loadConfiguration(configurationDirectory);
>>>
>>>flinkConfiguration.setString(
>>>        ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>>>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>>>
>>>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
>>>
>>>flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
>>>
>>>Path remoteLib = new Path(flinkLibs);
>>>flinkConfiguration.set(
>>>        YarnConfigOptions.PROVIDED_LIB_DIRS,
>>>        Collections.singletonList(remoteLib.toString()));
>>>
>>>flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
>>>
>>>// use application mode
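
To make the $FLINK_HOME/bin/flink run suggestion above concrete: below is a minimal sketch of how a hosting platform could assemble the command line and invoke the Flink CLI from Java via ProcessBuilder. FlinkRunSubmitter and its parameters are hypothetical names used for illustration; "run-application -t yarn-application" is the CLI form for YARN application mode in Flink 1.14.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FlinkRunSubmitter {

    // Builds and runs a "flink run-application" command line.
    // flinkHome, userJar and mainClass are supplied by the hosting platform.
    public static int submit(String flinkHome, String userJar, String mainClass)
            throws Exception {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkHome + "/bin/flink");
        cmd.add("run-application");   // application mode
        cmd.add("-t");
        cmd.add("yarn-application");  // deploy target: YARN
        cmd.add("-c");
        cmd.add(mainClass);           // entry class of the user jar
        cmd.add(userJar);

        ProcessBuilder pb = new ProcessBuilder(cmd);
        pb.redirectErrorStream(true); // merge stderr into stdout

        Process process = pb.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // A real platform would parse the YARN application id out of
                // this output and persist it for later tracking/cancellation.
                System.out.println(line);
            }
        }
        return process.waitFor();     // 0 means the client exited cleanly
    }
}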
Re: yarn api submission error
This exception looks like the AM failed to start after the job was submitted to the YARN cluster; check the AM log on the cluster, e.g. the one referenced in the log: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
Also, judging from your code, you submit the job through the YARN API, so you have to register every jar the job depends on with YARN yourself -- for example the jars under the Flink lib directory, registered as dependencies via YarnLocalResourceDescriptor. The process can be quite tortuous and requires understanding YARN's deployment mechanism first.

I recommend submitting jobs with the commands and tools Flink already provides; the engine has all of these steps wrapped up.

--
Best regards,
Mang Zhang

On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>hi,
>While testing Flink job submission via the Java API I ran into some problems and would like to ask for help:
>Flink version: 1.14.4
>Hadoop version: 3.0.0-cdh6.2.1
>Application mode; submitting with the CLI works fine, submitting via the API fails.
>YARN log of the failed submission:
> LogType:jobmanager.err
> LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
> LogLength:107
> LogContents:
> Error: Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
> End of LogType:jobmanager.err
>The code is below:
>
>System.setProperty("HADOOP_USER_NAME", "hdfs");
>// local Flink configuration directory, used to load the Flink configuration
>String configurationDirectory =
>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>// HDFS directory holding the Flink cluster jars
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>// user jar
>String userJarPath =
>        "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>// create the YarnClient
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// needed for logs; without it no logs are visible
>YarnClusterInformationRetriever clusterInformationRetriever =
>        YarnClientYarnClusterInformationRetriever.create(yarnClient);
>
>// load the Flink configuration
>Configuration flinkConfiguration =
>        GlobalConfiguration.loadConfiguration(configurationDirectory);
>
>flinkConfiguration.setString(
>        ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
>
>flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>        YarnConfigOptions.PROVIDED_LIB_DIRS,
>        Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
>
>// use application mode
>flinkConfiguration.set(
>        DeploymentOptions.TARGET,
>        YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
>
>// set the user jar arguments and the main class
>ApplicationConfiguration appConfig =
>        new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");
>
>final int jobManagerMemoryMB =
>        JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
>                .getTotalProcessMemorySize()
>                .getMebiBytes();
>final int taskManagerMemoryMB =
>        TaskExecutorProcessUtils.processSpecFromConfig(
>                        TaskExecutorProcessUtils
>                                .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>                                        flinkConfiguration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>                .getTotalProcessMemorySize()
>                .getMebiBytes();
>ClusterSpecification clusterSpecification =
>        new ClusterSpecification.ClusterSpecificationBuilder()
>                .setMasterMemoryMB(jobManagerMemoryMB)
>                .setTaskManagerMemoryMB(taskManagerMemoryMB)
>                .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
>                .createClusterSpecification();
>YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
>        flinkConfiguration,
>        (YarnConfiguration) yarnClient.getConfig(),
>        yarnClient,
>        clusterInformationRetriever,
>        true);
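
The quoted code stops right after constructing the YarnClusterDescriptor (the archive cut the message there; the constructor arguments above are completed from the variables already declared). Assuming the objects are built as shown, the remaining step in Flink 1.14's application mode is a call to YarnClusterDescriptor#deployApplication. A minimal sketch, not the poster's original code:

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public class DeployStep {

    // Final deployment step: submit in application mode and return the
    // YARN application id, which a management platform would persist.
    public static ApplicationId deploy(
            YarnClusterDescriptor yarnClusterDescriptor,
            ClusterSpecification clusterSpecification,
            ApplicationConfiguration appConfig) throws Exception {
        try {
            ClusterClientProvider<ApplicationId> provider =
                    yarnClusterDescriptor.deployApplication(clusterSpecification, appConfig);
            ClusterClient<ApplicationId> clusterClient = provider.getClusterClient();
            return clusterClient.getClusterId();
        } finally {
            yarnClusterDescriptor.close();
        }
    }
}

As a side note, "Could not find or load main class ... YarnApplicationClusterEntryPoint" typically means flink-dist.jar never made it onto the AM classpath, so verifying that YarnConfigOptions.FLINK_DIST_JAR points at an HDFS path that actually exists is a reasonable first check.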
Re: flink sql job's JM BlobServer keeps reporting java.io.IOException: Unknown operation 71 in the early morning
The image didn't come through. If you want to include an image, upload it to a shared image host and paste the link into the mail; alternatively, paste the exception text directly into the message body.

--
Best regards,
Mang Zhang

On 2022-04-07 16:25:12, "su wenwen" wrote:
hi, all. I would like to ask whether anyone has run into this problem: on Flink 1.12, a Flink SQL job running in production keeps failing in the early morning with the following blobserver error: [screenshot missing]
My understanding is that the BlobServer transfers binary jar files from HDFS to the local working directory. I have not found problems in any other step, and the job's data was not affected.
Re: Error writing to Hive from Flink
In Flink, once you `use` a HiveCatalog, tables from connectors other than the hive connector do not work well under it for now. As I understand it, what you want is to write the data of a Flink table into a Hive table.

HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
tableEnv.registerCatalog("devHive", hiveCatalog);
// drop this part and keep using Flink's default catalog
//tableEnv.useCatalog("devHive");
//tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//tableEnv.useDatabase("default");
tableEnv.createTemporaryView("sourceTable", stepFr);
// rewrite the SQL as ${catalog name}.${db name}.${table name}
String sql = "insert into devHive.default.zyz select * from sourceTable";
tableEnv.executeSql(sql);

Give the above a try.

--
Best regards,
Mang Zhang

At 2022-03-15 14:13:56, "顾斌杰" wrote:
>Flink version: 1.13.3
>Hive version: 2.1.1
>
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>DataStream<UserBehavior> userBehaviorDataStream = source
>        .map(new UserBehaviorAddEventTimeTransform()).filter(Objects::nonNull);
>userBehaviorDataStream.print();
>SingleOutputStreamOperator<Row> stepFr =
>        userBehaviorDataStream.process(new ProcessFunction<UserBehavior, Row>() {
>            private static final long serialVersionUID = 6365847542902145255L;
>
>            @Override
>            public void processElement(UserBehavior value, Context ctx, Collector<Row> out)
>                    throws Exception {
>                Row row = new Row(2);
>                row.setField(0, value.getAppName());
>                row.setField(1, value.getAppName());
>                out.collect(row);
>            }
>        });
>
>HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
>tableEnv.registerCatalog("devHive", hiveCatalog);
>tableEnv.useCatalog("devHive");
>tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>tableEnv.useDatabase("default");
>tableEnv.createTemporaryView("sourceTable", stepFr);
>
>String sql = "insert into zyz select * from sourceTable";
>tableEnv.executeSql(sql);
>
>But it keeps failing. Could you tell me whether I wrote something wrong?
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1991] - class java.util.LinkedHashMap does not contain a getter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1994] - class java.util.LinkedHashMap does not contain a setter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2037] - Class class java.util.LinkedHashMap cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2093] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>2022-03-15 14:11:39.448 [main] INFO [HiveCatalog][createHiveConf][257] - Setting hive conf dir as null
>2022-03-15 14:11:39.449 [main] INFO [HiveCatalog][createHiveConf][278] - Found hive-site.xml in classpath: file:/D:/JetBrains/IdeaProject/paat_realtime_deal/target/classes/hive-site.xml
>2022-03-15 14:11:39.491 [main] INFO [HiveCatalog][][219] - Created HiveCatalog 'devHive'
>2022-03-15 14:11:40.063 [main] INFO [HiveCatalog][open][299] - Connected to Hive metastore
>2022-03-15 14:11:40.161 [main] INFO [CatalogManager][setCurrentCatalog][262] - Set the current default catalog as [devHive] and the current default database as [default].
>2022-03-15 14:11:41.158 [main] INFO [HiveParserCalcitePlanner][genLogicalPlan][251] - Starting generating logical plan
>2022-03-15 14:11:41.164 [main] INFO [HiveParserSemanticAnalyzer][genResolvedParseTree][2279] - Completed phase 1 of Semantic Analysis
>2022-03-15 14:11:41.164 [main] INFO [HiveParserSemanticAnalyzer][getMetaData][1508] - Get metadata for source tables
>2022-03-15 14:11:41.178 [main] ERROR [HiveParserSemanticAnalyzer][getMetaData][1489] - org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 'sourceTable'
>at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547)
>at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaDat
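
Pulling the reply's suggestion together, here is a compact, self-contained sketch of the recommended pattern: register the HiveCatalog without switching to it, keep the temporary view in the default in-memory catalog, and address the Hive table by its fully qualified ${catalog}.${db}.${table} name, which is consistent with the "Table not found 'sourceTable'" error the Hive dialect produced above. The names devHive, zyz and sourceTable come from the thread; the surrounding stream setup is assumed, and the backticks around `default` are a precaution added here, not part of the original reply.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

public class WriteToHiveExample {

    public static void writeRowsToHive(StreamTableEnvironment tableEnv, DataStream<Row> rows) {
        // Register the Hive catalog, but do NOT switch to it: the temporary
        // view stays in the default in-memory catalog, where the default SQL
        // dialect can resolve it.
        HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
        tableEnv.registerCatalog("devHive", hiveCatalog);

        tableEnv.createTemporaryView("sourceTable", rows);

        // Address the Hive table by its fully qualified name; backticks guard
        // against `default` being parsed as a reserved keyword.
        tableEnv.executeSql("insert into devHive.`default`.zyz select * from sourceTable");
    }
}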