答复: Re:Re:yarn api 提交报错
Hi 周涛, Mang的建议很好,封装拼接参数是比较常见的实现。如果你没有什么特殊需求的话,推荐考虑他的建议,之后Flink版本升级之类的也一般会方便一些。 如果你因为某些原因,要继续走你目前的方式的话,我看到你的代码和YARNApplicationITCase中的代码比较接近了,你可以注意下,ITCase中的代码是在本地运行的,并且通过类似YARNApplicationITCase#startYARNWithConfig这样的方法设置好了HADOOP和Flink相关的环境变量。 在实际作业中,最终在Server侧,YARN在AM运行Flink作业的命令类似这样: /bin/bash -c /usr/lib/jvm/java-1.8.0/bin/java -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b … 我觉得你可以结合startYARNWithConfig判断一下你是否有哪里没有设置好相应环境变量,导致AM中的classpath等环境变量不对,进而导致找不到class。 发件人: 周涛 <06160...@163.com> 日期: 星期五, 2022年4月8日 下午8:57 收件人: Mang Zhang 抄送: user-zh@flink.apache.org 主题: Re:Re:yarn api 提交报错 非常感谢Mang Zhang的回复 AM的log全部内容如下,主要信息还是未找到YarnApplicationClusterEntryPoint类: Log Type: jobmanager.err Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 107 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint Log Type: jobmanager.out Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 0 Log Type: prelaunch.err Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 0 Log Type: prelaunch.out Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 70 Setting up env variables Setting up job resources Launching container 我在代码中已经将flink所依赖的lib包,都注册给了yarn了,jar包已提前上传至hdfs,在debug中,也都看到获取到了jar包信息,也包含flink-dist.jar 提交到yarn后,启动jm时抛错。一直未找到原因 另外,flink提供的工具是指?我这个开发,主要是想自己开发一个管理平台,提交任务和管理任务 在 2022-04-08 17:57:48,"Mang Zhang" 写道: 这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log 比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN 比如flink lib下的jar,使用 YarnLocalResourceDescriptor 注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制; 建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了 祝好 -- Best regards, Mang Zhang 在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道: > > > > > > >hi, >我在测试使用java api提交flink任务时,遇到了一些问题,需要请教: >flink版本1.14.4 >Hadoop版本:3.0.0-cdh6.2.1 >application模式,使用命令提交正常运行,api提交失败 >提交失败,yarn日志: > LogType:jobmanager.err > LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022 > LogLength:107 > LogContents: > Error: Could not find or load main class > org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint > End of LogType:jobmanager.err >以下是代码: > > > System.setProperty("HADOOP_USER_NAME", "hdfs"); >//flink的本地配置目录,为了得到flink的配置 >String configurationDirectory = >"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\"; > >//存放flink集群相关的jar包目录 >String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/"; >//用户jar >String userJarPath = >"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar"; >String flinkDistJar = > "hdfs://nameservice1/flink/jar/libs/flink-dist.jar"; > >YarnClientService yarnClientService = new YarnClientService(); >//yarnclient创建 >YarnClient yarnClient = yarnClientService.getYarnClient(); >yarnClient.start(); > >// 设置日志的,没有的话看不到日志 >YarnClusterInformationRetriever clusterInformationRetriever = >YarnClientYarnClusterInformationRetriever >.create(yarnClient); > >//获取flink的配置 >Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration( >configurationDirectory); > > > flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(), >"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\"); > >flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, > true); > >flinkConfiguration.set(PipelineOptions.JARS, > Collections.singletonList(userJarPath)); > >Path remoteLib = new Path(flinkLibs); >flinkConfiguration.set( >YarnConfigOptions.PROVIDED_LIB_DIRS, >Collections.singletonList(remoteLib.toString())); > >flinkConfiguration.set( >YarnConfigOptions.FLINK_DIST_JAR, >flinkDistJar); > >// 设置为application模式 >flinkConfiguration.set( >DeploymentOptions.TARGET, >YarnDeploymentTarget.APPLICATION.getName()); > >// yarn application name >flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, >"flink-application"); > >YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, > configurationDirectory); > >// 设置用户jar的参数和主类 >ApplicationConfiguration appConfig = new ApplicationConfiguration(new >String[]{}, "com.zt.FlinkTest1"); > > >final int jobManagerMemoryMB = > > JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( >flinkConfiguration, > JobManagerOptions.TOTAL_PROCESS_MEMORY) >.getTotalProcessMemorySize() >
Re:Re:Re:yarn api 提交报错
这个信息比较少,第一感觉是因为YARN任务提交相关信息没有设置正确,如果感兴趣可以看看https://github.com/hortonworks/simple-yarn-app 这个项目理解清楚YARN APP的机制和原理; 回到你本质诉求上来,你是想开发一个任务托管平台,一个简单,正常的思路是,你通过封装拼接参数,然后通过调用 $FLINK_HOME/bin/flink run 相关命令来提交任务 你现在的思路有点跑偏,也可能是因为你的场景下有其他我不知道的需求点; 另外调度平台,Apache DolphinScheduler 也是一个不错的选择,也是国内开源的优秀项目,功能完善,也可以参考 https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/flink-call.html 希望对你有所帮助 -- Best regards, Mang Zhang 在 2022-04-08 20:56:33,"周涛" <06160...@163.com> 写道: >非常感谢Mang Zhang的回复 >AM的log全部内容如下,主要信息还是未找到YarnApplicationClusterEntryPoint类: > >Log Type: jobmanager.err > >Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 > >Log Length: 107 > >Error: Could not find or load main class >org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint > > >Log Type: jobmanager.out > >Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 > >Log Length: 0 > >Log Type: prelaunch.err > >Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 > >Log Length: 0 > >Log Type: prelaunch.out > >Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 > >Log Length: 70 > >Setting up env variables >Setting up job resources >Launching container > > >我在代码中已经将flink所依赖的lib包,都注册给了yarn了,jar包已提前上传至hdfs,在debug中,也都看到获取到了jar包信息,也包含flink-dist.jar >提交到yarn后,启动jm时抛错。一直未找到原因 > > > > >另外,flink提供的工具是指?我这个开发,主要是想自己开发一个管理平台,提交任务和管理任务 > > > > > > > > > > > > > > >在 2022-04-08 17:57:48,"Mang Zhang" 写道: > > > > >这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log >比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 >另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN >比如flink lib下的jar,使用 YarnLocalResourceDescriptor >注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制; > > >建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了 > > > > > > > >祝好 > > > > > > > >-- > >Best regards, >Mang Zhang > > > > > >在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道: >> >> >> >> >> >> >>hi, >>我在测试使用java api提交flink任务时,遇到了一些问题,需要请教: >>flink版本1.14.4 >>Hadoop版本:3.0.0-cdh6.2.1 >>application模式,使用命令提交正常运行,api提交失败 >>提交失败,yarn日志: >> LogType:jobmanager.err >> LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022 >> LogLength:107 >> LogContents: >> Error: Could not find or load main class >> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint >> End of LogType:jobmanager.err >>以下是代码: >> >> >> System.setProperty("HADOOP_USER_NAME", "hdfs"); >>//flink的本地配置目录,为了得到flink的配置 >>String configurationDirectory = >>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\"; >> >>//存放flink集群相关的jar包目录 >>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/"; >>//用户jar >>String userJarPath = >>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar"; >>String flinkDistJar = >> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar"; >> >>YarnClientService yarnClientService = new YarnClientService(); >>//yarnclient创建 >>YarnClient yarnClient = yarnClientService.getYarnClient(); >>yarnClient.start(); >> >>// 设置日志的,没有的话看不到日志 >>YarnClusterInformationRetriever clusterInformationRetriever = >>YarnClientYarnClusterInformationRetriever >>.create(yarnClient); >> >>//获取flink的配置 >>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration( >>configurationDirectory); >> >> >> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(), >>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\"); >> >>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, >> true); >> >>flinkConfiguration.set(PipelineOptions.JARS, >> Collections.singletonList(userJarPath)); >> >>Path remoteLib = new Path(flinkLibs); >>flinkConfiguration.set( >>YarnConfigOptions.PROVIDED_LIB_DIRS, >>Collections.singletonList(remoteLib.toString())); >> >>flinkConfiguration.set( >>YarnConfigOptions.FLINK_DIST_JAR, >>flinkDistJar); >> >>// 设置为application模式 >>flinkConfiguration.set( >>DeploymentOptions.TARGET, >>YarnDeploymentTarget.APPLICATION.getName()); >> >>// yarn application name >>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, >>"flink-application"); >> >>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, >> configurationDirectory); >> >>// 设置用户jar的参数和主类 >>ApplicationConfiguration appConfig = new ApplicationConfiguration(new >>String[]{}, "com.zt.FlinkTest1"); >> >> >>final int jobManagerMemoryMB = >> >> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( >>flinkConfiguration, >> JobManagerOptions.TOTAL_PROCESS_MEMORY) >>.getTotalProcessMemorySize() >>.getMebiBytes(); >>final
退订
Re:Re:yarn api 提交报错
非常感谢Mang Zhang的回复 AM的log全部内容如下,主要信息还是未找到YarnApplicationClusterEntryPoint类: Log Type: jobmanager.err Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 107 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint Log Type: jobmanager.out Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 0 Log Type: prelaunch.err Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 0 Log Type: prelaunch.out Log Upload Time: Fri Apr 08 09:24:01 +0800 2022 Log Length: 70 Setting up env variables Setting up job resources Launching container 我在代码中已经将flink所依赖的lib包,都注册给了yarn了,jar包已提前上传至hdfs,在debug中,也都看到获取到了jar包信息,也包含flink-dist.jar 提交到yarn后,启动jm时抛错。一直未找到原因 另外,flink提供的工具是指?我这个开发,主要是想自己开发一个管理平台,提交任务和管理任务 在 2022-04-08 17:57:48,"Mang Zhang" 写道: 这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log 比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN 比如flink lib下的jar,使用 YarnLocalResourceDescriptor 注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制; 建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了 祝好 -- Best regards, Mang Zhang 在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道: > > > > > > >hi, >我在测试使用java api提交flink任务时,遇到了一些问题,需要请教: >flink版本1.14.4 >Hadoop版本:3.0.0-cdh6.2.1 >application模式,使用命令提交正常运行,api提交失败 >提交失败,yarn日志: > LogType:jobmanager.err > LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022 > LogLength:107 > LogContents: > Error: Could not find or load main class > org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint > End of LogType:jobmanager.err >以下是代码: > > > System.setProperty("HADOOP_USER_NAME", "hdfs"); >//flink的本地配置目录,为了得到flink的配置 >String configurationDirectory = >"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\"; > >//存放flink集群相关的jar包目录 >String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/"; >//用户jar >String userJarPath = >"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar"; >String flinkDistJar = > "hdfs://nameservice1/flink/jar/libs/flink-dist.jar"; > >YarnClientService yarnClientService = new YarnClientService(); >//yarnclient创建 >YarnClient yarnClient = yarnClientService.getYarnClient(); >yarnClient.start(); > >// 设置日志的,没有的话看不到日志 >YarnClusterInformationRetriever clusterInformationRetriever = >YarnClientYarnClusterInformationRetriever >.create(yarnClient); > >//获取flink的配置 >Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration( >configurationDirectory); > > > flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(), >"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\"); > >flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, > true); > >flinkConfiguration.set(PipelineOptions.JARS, > Collections.singletonList(userJarPath)); > >Path remoteLib = new Path(flinkLibs); >flinkConfiguration.set( >YarnConfigOptions.PROVIDED_LIB_DIRS, >Collections.singletonList(remoteLib.toString())); > >flinkConfiguration.set( >YarnConfigOptions.FLINK_DIST_JAR, >flinkDistJar); > >// 设置为application模式 >flinkConfiguration.set( >DeploymentOptions.TARGET, >YarnDeploymentTarget.APPLICATION.getName()); > >// yarn application name >flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, >"flink-application"); > >YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, > configurationDirectory); > >// 设置用户jar的参数和主类 >ApplicationConfiguration appConfig = new ApplicationConfiguration(new >String[]{}, "com.zt.FlinkTest1"); > > >final int jobManagerMemoryMB = > > JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( >flinkConfiguration, > JobManagerOptions.TOTAL_PROCESS_MEMORY) >.getTotalProcessMemorySize() >.getMebiBytes(); >final int taskManagerMemoryMB = >TaskExecutorProcessUtils.processSpecFromConfig( >TaskExecutorProcessUtils > > .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( >flinkConfiguration, > > TaskManagerOptions.TOTAL_PROCESS_MEMORY)) >.getTotalProcessMemorySize() >.getMebiBytes(); >ClusterSpecification clusterSpecification = new > ClusterSpecification.ClusterSpecificationBuilder() >.setMasterMemoryMB(jobManagerMemoryMB) >
Re:yarn api 提交报错
这个异常看起来是提交到YARN集群,启动AM的时候报错了,可以到集群上看AM的log 比如log里提示的 http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 另外就是,从你的代码看,你是用YARN的API方式提交任务,那么就需要自己将任务依赖的所有jar都注册给YARN 比如flink lib下的jar,使用 YarnLocalResourceDescriptor 注册依赖等,过程可能会比较曲折,需要先了解YARN的任务部署机制; 建议还是直接使用Flink提供好的命令和工具来提交任务,引擎已经将这些步骤都封装好了 -- Best regards, Mang Zhang 在 2022-04-08 09:53:45,"周涛" <06160...@163.com> 写道: > > > > > > >hi, >我在测试使用java api提交flink任务时,遇到了一些问题,需要请教: >flink版本1.14.4 >Hadoop版本:3.0.0-cdh6.2.1 >application模式,使用命令提交正常运行,api提交失败 >提交失败,yarn日志: > LogType:jobmanager.err > LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022 > LogLength:107 > LogContents: > Error: Could not find or load main class > org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint > End of LogType:jobmanager.err >以下是代码: > > > System.setProperty("HADOOP_USER_NAME", "hdfs"); >//flink的本地配置目录,为了得到flink的配置 >String configurationDirectory = >"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\"; > >//存放flink集群相关的jar包目录 >String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/"; >//用户jar >String userJarPath = >"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar"; >String flinkDistJar = > "hdfs://nameservice1/flink/jar/libs/flink-dist.jar"; > >YarnClientService yarnClientService = new YarnClientService(); >//yarnclient创建 >YarnClient yarnClient = yarnClientService.getYarnClient(); >yarnClient.start(); > >// 设置日志的,没有的话看不到日志 >YarnClusterInformationRetriever clusterInformationRetriever = >YarnClientYarnClusterInformationRetriever >.create(yarnClient); > >//获取flink的配置 >Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration( >configurationDirectory); > > > flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(), >"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\"); > >flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, > true); > >flinkConfiguration.set(PipelineOptions.JARS, > Collections.singletonList(userJarPath)); > >Path remoteLib = new Path(flinkLibs); >flinkConfiguration.set( >YarnConfigOptions.PROVIDED_LIB_DIRS, >Collections.singletonList(remoteLib.toString())); > >flinkConfiguration.set( >YarnConfigOptions.FLINK_DIST_JAR, >flinkDistJar); > >// 设置为application模式 >flinkConfiguration.set( >DeploymentOptions.TARGET, >YarnDeploymentTarget.APPLICATION.getName()); > >// yarn application name >flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, >"flink-application"); > >YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, > configurationDirectory); > >// 设置用户jar的参数和主类 >ApplicationConfiguration appConfig = new ApplicationConfiguration(new >String[]{}, "com.zt.FlinkTest1"); > > >final int jobManagerMemoryMB = > > JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( >flinkConfiguration, > JobManagerOptions.TOTAL_PROCESS_MEMORY) >.getTotalProcessMemorySize() >.getMebiBytes(); >final int taskManagerMemoryMB = >TaskExecutorProcessUtils.processSpecFromConfig( >TaskExecutorProcessUtils > > .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( >flinkConfiguration, > > TaskManagerOptions.TOTAL_PROCESS_MEMORY)) >.getTotalProcessMemorySize() >.getMebiBytes(); >ClusterSpecification clusterSpecification = new > ClusterSpecification.ClusterSpecificationBuilder() >.setMasterMemoryMB(jobManagerMemoryMB) >.setTaskManagerMemoryMB(taskManagerMemoryMB) > > .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS)) >.createClusterSpecification(); >YarnClusterDescriptor yarnClusterDescriptor = new > YarnClusterDescriptor( >flinkConfiguration, >(YarnConfiguration) yarnClient.getConfig(), >yarnClient, >clusterInformationRetriever, >true); > >try { >ClusterClientProvider clusterClientProvider = > yarnClusterDescriptor.deployApplicationCluster( >clusterSpecification, >appConfig); > >ClusterClient clusterClient = > clusterClientProvider.getClusterClient(); > >ApplicationId applicationId = clusterClient.getClusterId(); >String webInterfaceURL =
Re:flink sql 任务中jm Blob server 总是在凌晨报 java.io.exception :unknow opreation 71
图片挂了,如果要发图片,可以将图片上传到共享图床,然后把链接贴在邮件里; 或者是把异常信息直接贴在邮件内容里 -- Best regards, Mang Zhang 在 2022-04-07 16:25:12,"su wenwen" 写道: hi,all.想问大家下,是否有遇到过这个问题,flink 1.12 的版本 在线上运行的flink sql 作业,总是在凌晨报错如下: blobserver 我理解是传输二进制jar 包,从hdfs 到 本地工作目录。但没发现其他环节出现问题,对任务数据未产生影响。。
Re: k8s session cluster flink1.13.6创建后提示的地址啥用。
Standalone K8s 和 Native K8s 部署模式主要的区别是 Native K8s 模式下的 Flink 具备和 K8s API Server 直接通信来申请所需的资源和感知集群状态的能力,而 Standalone K8s 对底层的 K8s 集群没有直接感知,这带来了两个主要区别: 1. 在部署上,Standalone K8s 需要你手动去创建集群所需要的 deployment、configmap、service,而 Native K8s 你只需要调用 Flink CLI 就行。 2. 在资源申请上,Standalone K8s 使用被动资源管理 - 需要你或者其他外部系统分配好资源,Flink 被动接受这些分配好的资源;Native K8s 使用主动资源管理 - Flink 集群启动后自己会根据提交上来的作业的属性去跟 K8s 申请所需要的资源。 Best, Zhanghao Chen From: yidan zhao Sent: Friday, April 8, 2022 10:52 To: user-zh Subject: Re: k8s session cluster flink1.13.6创建后提示的地址啥用。 貌似官网对flink k8s情况有2个入口,分别为: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#session-mode 和 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/。 分别对应 Resource Providers/Standalone/Kubernetes 和 Kubernetes Resource Providers/Native Kubernetes。有人知道啥区别吗。从文档来看,貌似前者是给了具体的service、deployment等yml描述,然后自己创建集群。后者是脚本一键创建。但如果仅仅是这个区别,为啥有“standalone/kubernetes”和“native kubernetes”这种区分呢? > > 集群是3台物理机搭建,非minikube。 > 不清楚是否和网卡有关,init搭建时就有网络问题,k8s要根据默认路由网卡ip决定监听的地址。 > 但是我感觉这个场景不应该,因为既然是clusterIp,创建后提示信息就应该提示用clusterIp吧,为啥提示的用了本机的网卡ip呢。 > > yidan zhao 于2022年4月8日周五 10:38写道: > > > > 如下是 describe svc my-first-flink-cluster-rest 的结果: > > Name: my-first-flink-cluster-rest > > Namespace:default > > Labels: app=my-first-flink-cluster > > type=flink-native-kubernetes > > Annotations: > > Selector: > > app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes > > Type: LoadBalancer > > IP Family Policy: SingleStack > > IP Families: IPv4 > > IP: 192.168.127.57 > > IPs: 192.168.127.57 > > Port: rest 8081/TCP > > TargetPort: 8081/TCP > > NodePort: rest 31419/TCP > > Endpoints:192.168.130.11:8081 > > Session Affinity: None > > External Traffic Policy: Cluster > > Events: > > > > 如上,其中IP为192.168.127.57,这个是ClusterIp是可以访问的。我是不知道为啥创建之后提示的地址不是这个,而且通过 > > -Dkubernetes.cluster-id=my-first-flink-cluster检索到的地址也不是192那个,导致无法提交任务等。 > > > > yu'an huang 于2022年4月8日周五 02:11写道: > > > > > > 理论上cluster ip是不可能在集群外访问的,你的Kubernetes环境是怎么搭建的呢?Minikube吗? > > > > > > 方便的话可以分享你运行这个命令的结果吗? > > > 》kubectl describe svc my-first-flink-cluster-rest > > > > > > > > > > > > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen > > > > wrote: > > > > > > > > 你 kubernetes.rest-service.exposed.type 这个参数设置的是什么呢? > > > > > > > > Best, > > > > Zhanghao Chen > > > > > > > > From: yidan zhao > > > > Sent: Thursday, April 7, 2022 11:41 > > > > To: user-zh > > > > Subject: k8s session cluster flink1.13.6创建后提示的地址啥用。 > > > > > > > > 参考 > > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes > > > > > > > > 基于命令创建k8s flink session集群(flink1.13.6):./bin/kubernetes-session.sh > > > > -Dkubernetes.cluster-id=my-first-flink-cluster 。创建成功,最后提示一句 Create > > > > flink session cluster my-first-flink-cluster successfully, JobManager > > > > Web Interface: http://10.227.137.154:8081。但是这个地址访问不通。 > > > > > > > > 并且通过如下命令提交任务也一样,会检索到如上地址,然后提交任务不成功。 > > > > ./bin/flink run \ > > > >--target kubernetes-session \ > > > >-Dkubernetes.cluster-id=my-first-flink-cluster \ > > > >./examples/streaming/TopSpeedWindowing.jar > > > > > > > > --- 然后如下方式是可以的,不清楚是啥问题呢。 > > > > 1 通过 kubectl get svc 拿到 my-first-flink-cluster-rest > > > > 的clusterIp:port为192.168.127.57:8081。 > > > > 2 查看任务 > > > > flink list -m 192.168.127.57:8081 > > > > 3 提交任务 > > > > flink run -m 192.168.127.57:8081 > > > > /home/work/flink/examples/streaming/TopSpeedWindowing.jar > > > > > > > > --- 区别:192这个是clusterIp虚拟ip。 10.x那个是我机器ip。 > > >