答复: Re:Re:yarn api 提交报错

2022-04-08 文章 Geng Biao
Hi 周涛,
Mang的建议很好,封装拼接参数是比较常见的实现。如果你没有什么特殊需求的话,推荐考虑他的建议,之后Flink版本升级之类的也一般会方便一些。
如果你因为某些原因,要继续走你目前的方式的话,我看到你的代码和YARNApplicationITCase中的代码比较接近了,你可以注意下,ITCase中的代码是在本地运行的,并且通过类似YARNApplicationITCase#startYARNWithConfig这样的方法设置好了HADOOP和Flink相关的环境变量。
在实际作业中,最终在Server侧,YARN在AM运行Flink作业的命令类似这样:
/bin/bash -c /usr/lib/jvm/java-1.8.0/bin/java -Xmx1073741824 -Xms1073741824 
-XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint -D 
jobmanager.memory.off-heap.size=134217728b …
我觉得你可以结合startYARNWithConfig判断一下你是否有哪里没有设置好相应环境变量,导致AM中的classpath等环境变量不对,进而导致找不到class。

发件人: 周涛 <06160...@163.com>
日期: 星期五, 2022年4月8日 下午8:57
收件人: Mang Zhang 
抄送: user-zh@flink.apache.org 
主题: Re:Re:yarn api 提交报错
非常感谢Mang Zhang的回复
AM的log全部内容如下,主要信息还是未找到YarnApplicationClusterEntryPoint类:

Log Type: jobmanager.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 107

Error: Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint


Log Type: jobmanager.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 70

Setting up env variables
Setting up job resources
Launching container


我在代码中已经将flink所依赖的lib包,都注册给了yarn了,jar包已提前上传至hdfs,在debug中,也都看到获取到了jar包信息,也包含flink-dist.jar
提交到yarn后,启动jm时抛错。一直未找到原因




另外,flink提供的工具是指?我这个开发,主要是想自己开发一个管理平台,提交任务和管理任务














在 2022-04-08 17:57:48,"Mang Zhang"  写道:




这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log
比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN
比如flink lib下的jar,使用 YarnLocalResourceDescriptor 
注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制;


建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了







祝好







--

Best regards,
Mang Zhang





在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道:
>
>
>
>
>
>
>hi,
>我在测试使用java api提交flink任务时,遇到了一些问题,需要请教:
>flink版本1.14.4
>Hadoop版本:3.0.0-cdh6.2.1
>application模式,使用命令提交正常运行,api提交失败
>提交失败,yarn日志:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>以下是代码:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>//flink的本地配置目录,为了得到flink的配置
>String configurationDirectory = 
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>//存放flink集群相关的jar包目录
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>//用户jar
>String userJarPath = 
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = 
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>//yarnclient创建
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// 设置日志的,没有的话看不到日志
>YarnClusterInformationRetriever clusterInformationRetriever = 
>YarnClientYarnClusterInformationRetriever
>.create(yarnClient);
>
>//获取flink的配置
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
> true);
>
>flinkConfiguration.set(PipelineOptions.JARS, 
> Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>YarnConfigOptions.PROVIDED_LIB_DIRS,
>Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>YarnConfigOptions.FLINK_DIST_JAR,
>flinkDistJar);
>
>// 设置为application模式
>flinkConfiguration.set(
>DeploymentOptions.TARGET,
>YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>"flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
> configurationDirectory);
>
>// 设置用户jar的参数和主类
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>flinkConfiguration, 
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>.getTotalProcessMemorySize()
>  

Re:Re:Re:yarn api 提交报错

2022-04-08 文章 Mang Zhang



这个信息比较少,第一感觉是因为YARN任务提交相关信息没有设置正确,如果感兴趣可以看看https://github.com/hortonworks/simple-yarn-app
这个项目理解清楚YARN APP的机制和原理;


回到你本质诉求上来,你是想开发一个任务托管平台,一个简单,正常的思路是,你通过封装拼接参数,然后通过调用 $FLINK_HOME/bin/flink run 
相关命令来提交任务
你现在的思路有点跑偏,也可能是因为你的场景下有其他我不知道的需求点;


另外调度平台,Apache DolphinScheduler 也是一个不错的选择,也是国内开源的优秀项目,功能完善,也可以参考
https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/flink-call.html




希望对你有所帮助




--

Best regards,
Mang Zhang





在 2022-04-08 20:56:33,"周涛" <06160...@163.com> 写道:
>非常感谢Mang Zhang的回复
>AM的log全部内容如下,主要信息还是未找到YarnApplicationClusterEntryPoint类:
>
>Log Type: jobmanager.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 107
>
>Error: Could not find or load main class 
>org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>
>
>Log Type: jobmanager.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 70
>
>Setting up env variables
>Setting up job resources
>Launching container
>
>
>我在代码中已经将flink所依赖的lib包,都注册给了yarn了,jar包已提前上传至hdfs,在debug中,也都看到获取到了jar包信息,也包含flink-dist.jar
>提交到yarn后,启动jm时抛错。一直未找到原因
>
>
>
>
>另外,flink提供的工具是指?我这个开发,主要是想自己开发一个管理平台,提交任务和管理任务
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2022-04-08 17:57:48,"Mang Zhang"  写道:
>
>
>
>
>这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log
>比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
>另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN
>比如flink lib下的jar,使用 YarnLocalResourceDescriptor 
>注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制;
>
>
>建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了
>
>
>
>
>
>
>
>祝好
>
>
>
>
>
>
>
>--
>
>Best regards,
>Mang Zhang
>
>
>
>
>
>在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道:
>>
>>
>>
>>
>>
>>
>>hi,
>>我在测试使用java api提交flink任务时,遇到了一些问题,需要请教:
>>flink版本1.14.4
>>Hadoop版本:3.0.0-cdh6.2.1
>>application模式,使用命令提交正常运行,api提交失败
>>提交失败,yarn日志:
>>   LogType:jobmanager.err
>>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>>   LogLength:107
>>   LogContents:
>>   Error: Could not find or load main class 
>> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>>   End of LogType:jobmanager.err
>>以下是代码:
>>
>>
>> System.setProperty("HADOOP_USER_NAME", "hdfs");
>>//flink的本地配置目录,为了得到flink的配置
>>String configurationDirectory = 
>>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>>
>>//存放flink集群相关的jar包目录
>>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>>//用户jar
>>String userJarPath = 
>>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>>String flinkDistJar = 
>> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>>
>>YarnClientService yarnClientService = new YarnClientService();
>>//yarnclient创建
>>YarnClient yarnClient = yarnClientService.getYarnClient();
>>yarnClient.start();
>>
>>// 设置日志的,没有的话看不到日志
>>YarnClusterInformationRetriever clusterInformationRetriever = 
>>YarnClientYarnClusterInformationRetriever
>>.create(yarnClient);
>>
>>//获取flink的配置
>>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>>configurationDirectory);
>>
>>
>> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>>
>>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
>> true);
>>
>>flinkConfiguration.set(PipelineOptions.JARS, 
>> Collections.singletonList(userJarPath));
>>
>>Path remoteLib = new Path(flinkLibs);
>>flinkConfiguration.set(
>>YarnConfigOptions.PROVIDED_LIB_DIRS,
>>Collections.singletonList(remoteLib.toString()));
>>
>>flinkConfiguration.set(
>>YarnConfigOptions.FLINK_DIST_JAR,
>>flinkDistJar);
>>
>>// 设置为application模式
>>flinkConfiguration.set(
>>DeploymentOptions.TARGET,
>>YarnDeploymentTarget.APPLICATION.getName());
>>
>>// yarn application name
>>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>>"flink-application");
>>
>>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
>> configurationDirectory);
>>
>>// 设置用户jar的参数和主类
>>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>>String[]{}, "com.zt.FlinkTest1");
>>
>>
>>final int jobManagerMemoryMB =
>>
>> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>>flinkConfiguration, 
>> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>>.getTotalProcessMemorySize()
>>.getMebiBytes();
>>final 

退订

2022-04-08 文章 swessia



Re:Re:yarn api 提交报错

2022-04-08 文章 周涛
非常感谢Mang Zhang的回复
AM的log全部内容如下,主要信息还是未找到YarnApplicationClusterEntryPoint类:

Log Type: jobmanager.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 107

Error: Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint


Log Type: jobmanager.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 70

Setting up env variables
Setting up job resources
Launching container


我在代码中已经将flink所依赖的lib包,都注册给了yarn了,jar包已提前上传至hdfs,在debug中,也都看到获取到了jar包信息,也包含flink-dist.jar
提交到yarn后,启动jm时抛错。一直未找到原因




另外,flink提供的工具是指?我这个开发,主要是想自己开发一个管理平台,提交任务和管理任务














在 2022-04-08 17:57:48,"Mang Zhang"  写道:




这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log
比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN
比如flink lib下的jar,使用 YarnLocalResourceDescriptor 
注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制;


建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了







祝好







--

Best regards,
Mang Zhang





在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道:
>
>
>
>
>
>
>hi,
>我在测试使用java api提交flink任务时,遇到了一些问题,需要请教:
>flink版本1.14.4
>Hadoop版本:3.0.0-cdh6.2.1
>application模式,使用命令提交正常运行,api提交失败
>提交失败,yarn日志:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>以下是代码:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>//flink的本地配置目录,为了得到flink的配置
>String configurationDirectory = 
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>//存放flink集群相关的jar包目录
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>//用户jar
>String userJarPath = 
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = 
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>//yarnclient创建
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// 设置日志的,没有的话看不到日志
>YarnClusterInformationRetriever clusterInformationRetriever = 
>YarnClientYarnClusterInformationRetriever
>.create(yarnClient);
>
>//获取flink的配置
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
> true);
>
>flinkConfiguration.set(PipelineOptions.JARS, 
> Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>YarnConfigOptions.PROVIDED_LIB_DIRS,
>Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>YarnConfigOptions.FLINK_DIST_JAR,
>flinkDistJar);
>
>// 设置为application模式
>flinkConfiguration.set(
>DeploymentOptions.TARGET,
>YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>"flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
> configurationDirectory);
>
>// 设置用户jar的参数和主类
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>flinkConfiguration, 
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>.getTotalProcessMemorySize()
>.getMebiBytes();
>final int taskManagerMemoryMB =
>TaskExecutorProcessUtils.processSpecFromConfig(
>TaskExecutorProcessUtils
>
> .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>flinkConfiguration,
>
> TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>.getTotalProcessMemorySize()
>.getMebiBytes();
>ClusterSpecification clusterSpecification = new 
> ClusterSpecification.ClusterSpecificationBuilder()
>.setMasterMemoryMB(jobManagerMemoryMB)
>

Re:yarn api 提交报错

2022-04-08 文章 Mang Zhang



这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log
比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN
比如flink lib下的jar,使用 YarnLocalResourceDescriptor 
注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制;


建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了










--

Best regards,
Mang Zhang





在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道:
>
>
>
>
>
>
>hi,
>我在测试使用java api提交flink任务时,遇到了一些问题,需要请教:
>flink版本1.14.4
>Hadoop版本:3.0.0-cdh6.2.1
>application模式,使用命令提交正常运行,api提交失败
>提交失败,yarn日志:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>以下是代码:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>//flink的本地配置目录,为了得到flink的配置
>String configurationDirectory = 
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>//存放flink集群相关的jar包目录
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>//用户jar
>String userJarPath = 
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = 
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>//yarnclient创建
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// 设置日志的,没有的话看不到日志
>YarnClusterInformationRetriever clusterInformationRetriever = 
>YarnClientYarnClusterInformationRetriever
>.create(yarnClient);
>
>//获取flink的配置
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
> true);
>
>flinkConfiguration.set(PipelineOptions.JARS, 
> Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>YarnConfigOptions.PROVIDED_LIB_DIRS,
>Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>YarnConfigOptions.FLINK_DIST_JAR,
>flinkDistJar);
>
>// 设置为application模式
>flinkConfiguration.set(
>DeploymentOptions.TARGET,
>YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>"flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
> configurationDirectory);
>
>// 设置用户jar的参数和主类
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>flinkConfiguration, 
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>.getTotalProcessMemorySize()
>.getMebiBytes();
>final int taskManagerMemoryMB =
>TaskExecutorProcessUtils.processSpecFromConfig(
>TaskExecutorProcessUtils
>
> .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>flinkConfiguration,
>
> TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>.getTotalProcessMemorySize()
>.getMebiBytes();
>ClusterSpecification clusterSpecification = new 
> ClusterSpecification.ClusterSpecificationBuilder()
>.setMasterMemoryMB(jobManagerMemoryMB)
>.setTaskManagerMemoryMB(taskManagerMemoryMB)
>
> .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
>.createClusterSpecification();
>YarnClusterDescriptor yarnClusterDescriptor = new 
> YarnClusterDescriptor(
>flinkConfiguration,
>(YarnConfiguration) yarnClient.getConfig(),
>yarnClient,
>clusterInformationRetriever,
>true);
>
>try {
>ClusterClientProvider clusterClientProvider = 
> yarnClusterDescriptor.deployApplicationCluster(
>clusterSpecification,
>appConfig);
>
>ClusterClient clusterClient = 
> clusterClientProvider.getClusterClient();
>
>ApplicationId applicationId = clusterClient.getClusterId();
>String webInterfaceURL = 

Re:flink sql 任务中jm Blob server 总是在凌晨报 java.io.exception :unknow opreation 71

2022-04-08 文章 Mang Zhang



图片挂了,如果要发图片,可以将图片上传到共享图床,然后把链接贴在邮件里;
或者是把异常信息直接贴在邮件内容里










--

Best regards,
Mang Zhang




在 2022-04-07 16:25:12,"su wenwen"  写道:

hi,all.想问大家下,是否有遇到过这个问题,flink 1.12 的版本
在线上运行的flink sql 作业,总是在凌晨报错如下:




blobserver 我理解是传输二进制jar 包,从hdfs 到 本地工作目录。但没发现其他环节出现问题,对任务数据未产生影响。。

Re: k8s session cluster flink1.13.6创建后提示的地址啥用。

2022-04-08 文章 Zhanghao Chen
Standalone K8s 和 Native K8s 部署模式主要的区别是 Native K8s 模式下的 Flink 具备和 K8s API Server 
直接通信来申请所需的资源和感知集群状态的能力,而 Standalone K8s 对底层的 K8s 集群没有直接感知,这带来了两个主要区别:


  1.  在部署上,Standalone K8s 需要你手动去创建集群所需要的 deployment、configmap、service,而 Native 
K8s 你只需要调用 Flink CLI 就行。
  2.  在资源申请上,Standalone K8s 使用被动资源管理 - 需要你或者其他外部系统分配好资源,Flink 
被动接受这些分配好的资源;Native K8s 使用主动资源管理 - Flink 集群启动后自己会根据提交上来的作业的属性去跟 K8s 申请所需要的资源。

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Friday, April 8, 2022 10:52
To: user-zh 
Subject: Re: k8s session cluster flink1.13.6创建后提示的地址啥用。

貌似官网对flink k8s情况有2个入口,分别为:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#session-mode
和
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/。

分别对应 Resource Providers/Standalone/Kubernetes 和 Kubernetes Resource
Providers/Native
Kubernetes。有人知道啥区别吗。从文档来看,貌似前者是给了具体的service、deployment等yml描述,然后自己创建集群。后者是脚本一键创建。但如果仅仅是这个区别,为啥有“standalone/kubernetes”和“native
kubernetes”这种区分呢?

>
> 集群是3台物理机搭建,非minikube。
> 不清楚是否和网卡有关,init搭建时就有网络问题,k8s要根据默认路由网卡ip决定监听的地址。
> 但是我感觉这个场景不应该,因为既然是clusterIp,创建后提示信息就应该提示用clusterIp吧,为啥提示的用了本机的网卡ip呢。
>
> yidan zhao  于2022年4月8日周五 10:38写道:
> >
> > 如下是 describe svc my-first-flink-cluster-rest 的结果:
> > Name: my-first-flink-cluster-rest
> > Namespace:default
> > Labels:   app=my-first-flink-cluster
> >   type=flink-native-kubernetes
> > Annotations:  
> > Selector:
> > app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
> > Type: LoadBalancer
> > IP Family Policy: SingleStack
> > IP Families:  IPv4
> > IP:   192.168.127.57
> > IPs:  192.168.127.57
> > Port: rest  8081/TCP
> > TargetPort:   8081/TCP
> > NodePort: rest  31419/TCP
> > Endpoints:192.168.130.11:8081
> > Session Affinity: None
> > External Traffic Policy:  Cluster
> > Events:   
> >
> > 如上,其中IP为192.168.127.57,这个是ClusterIp是可以访问的。我是不知道为啥创建之后提示的地址不是这个,而且通过
> > -Dkubernetes.cluster-id=my-first-flink-cluster检索到的地址也不是192那个,导致无法提交任务等。
> >
> > yu'an huang  于2022年4月8日周五 02:11写道:
> > >
> > > 理论上cluster ip是不可能在集群外访问的,你的Kubernetes环境是怎么搭建的呢?Minikube吗?
> > >
> > > 方便的话可以分享你运行这个命令的结果吗?
> > > 》kubectl describe svc  my-first-flink-cluster-rest
> > >
> > >
> > >
> > > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  
> > > > wrote:
> > > >
> > > > 你 kubernetes.rest-service.exposed.type 这个参数设置的是什么呢?
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: yidan zhao 
> > > > Sent: Thursday, April 7, 2022 11:41
> > > > To: user-zh 
> > > > Subject: k8s session cluster flink1.13.6创建后提示的地址啥用。
> > > >
> > > > 参考 
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > >
> > > > 基于命令创建k8s flink session集群(flink1.13.6):./bin/kubernetes-session.sh
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster 。创建成功,最后提示一句 Create
> > > > flink session cluster my-first-flink-cluster successfully, JobManager
> > > > Web Interface: http://10.227.137.154:8081。但是这个地址访问不通。
> > > >
> > > > 并且通过如下命令提交任务也一样,会检索到如上地址,然后提交任务不成功。
> > > > ./bin/flink run \
> > > >--target kubernetes-session \
> > > >-Dkubernetes.cluster-id=my-first-flink-cluster \
> > > >./examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- 然后如下方式是可以的,不清楚是啥问题呢。
> > > > 1 通过 kubectl get svc 拿到 my-first-flink-cluster-rest
> > > > 的clusterIp:port为192.168.127.57:8081。
> > > > 2 查看任务
> > > > flink list  -m 192.168.127.57:8081
> > > > 3 提交任务
> > > > flink run  -m 192.168.127.57:8081
> > > > /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- 区别:192这个是clusterIp虚拟ip。  10.x那个是我机器ip。
> > >