Hi,
I ran into some problems while testing Flink job submission through the Java API and would like to ask for help.
Flink version: 1.14.4
Hadoop version: 3.0.0-cdh6.2.1
Application mode. Submitting with the CLI works fine; submitting through the API fails.
YARN log from the failed submission:
LogType:jobmanager.err
LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
LogLength:107
LogContents:
Error: Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
End of LogType:jobmanager.err
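Not part of the original report, but one debugging step for this error is to confirm that the `flink-dist.jar` referenced by the configuration really contains `org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint`. A minimal stdlib-only sketch (the class name `JarClassCheck` and the local jar path are hypothetical; download the jar from HDFS first, e.g. with `hdfs dfs -get`):

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

public class JarClassCheck {

    // Returns true if the given jar contains a .class entry for the named class.
    static boolean containsClass(Path jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            return jar.getEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local copy of the dist jar; adjust the path as needed.
        Path dist = Paths.get(args.length > 0 ? args[0] : "flink-dist.jar");
        System.out.println(containsClass(dist,
                "org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint"));
    }
}
```

If the class is present in the jar, the problem is more likely the classpath assembled for the AM container than the jar itself.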
Here is the code:
System.setProperty("HADOOP_USER_NAME", "hdfs");
// Local Flink configuration directory, used to load the Flink configuration
String configurationDirectory =
        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
// Directory on HDFS holding the Flink cluster jars
String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
// User jar
String userJarPath =
        "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";

// Create and start the YARN client
YarnClientService yarnClientService = new YarnClientService();
YarnClient yarnClient = yarnClientService.getYarnClient();
yarnClient.start();
YarnClusterInformationRetriever clusterInformationRetriever =
        YarnClientYarnClusterInformationRetriever.create(yarnClient);

// Load the Flink configuration
Configuration flinkConfiguration =
        GlobalConfiguration.loadConfiguration(configurationDirectory);
flinkConfiguration.setString(
        ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
Path remoteLib = new Path(flinkLibs);
flinkConfiguration.set(
        YarnConfigOptions.PROVIDED_LIB_DIRS,
        Collections.singletonList(remoteLib.toString()));
flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
// Use application mode
flinkConfiguration.set(
        DeploymentOptions.TARGET,
        YarnDeploymentTarget.APPLICATION.getName());
// YARN application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
// Set the log configuration; without it the job logs are not visible
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

// Program arguments and main class of the user jar
ApplicationConfiguration appConfig =
        new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");

final int jobManagerMemoryMB =
        JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration,
                        JobManagerOptions.TOTAL_PROCESS_MEMORY)
                .getTotalProcessMemorySize()
                .getMebiBytes();
final int taskManagerMemoryMB =
        TaskExecutorProcessUtils.processSpecFromConfig(
                        TaskExecutorProcessUtils
                                .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                        flinkConfiguration,
                                        TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                .getTotalProcessMemorySize()
                .getMebiBytes();
ClusterSpecification clusterSpecification =
        new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMB)
                .setTaskManagerMemoryMB(taskManagerMemoryMB)
                .setSlotsPerTaskManager(
                        flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
                .createClusterSpecification();

YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
        flinkConfiguration,
        (YarnConfiguration) yarnClient.getConfig(),
        yarnClient,
        clusterInformationRetriever,
        true);
try {
    ClusterClientProvider<ApplicationId> clusterClientProvider =
            yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig);
    ClusterClient<ApplicationId> clusterClient =
            clusterClientProvider.getClusterClient();
    ApplicationId applicationId = clusterClient.getClusterId();
    String webInterfaceURL = clusterClient.getWebInterfaceURL();
    log.error("applicationId is {}", applicationId);
    log.error("webInterfaceURL is {}", webInterfaceURL);
    // Shut the cluster down
    // yarnClusterDescriptor.killCluster(applicationId);
} catch (Exception e) {
    log.error(e.getMessage(), e);
} finally {
    // yarnClient.close();
}
Here is part of the submission log:
09:24:01.288 [IPC Parameter Sending Thread #0] DEBUG
org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to
bigdata-beta2/192.168.15.185:8032 from hdfs sending #31
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport
09:24:01.305 [IPC Client (1948810915) connection to
bigdata-beta2/192.168.15.185:8032 from hdfs] DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1948810915) connection to bigdata-beta2/192.168.15.185:8032 from
hdfs got value #31
09:24:01.305 [main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call:
getApplicationReport took 18ms
09:24:01.305 [main] DEBUG org.apache.flink.yarn.YarnClusterDescriptor -
Application State: FAILED
09:24:01.321 [main] ERROR com.yaduo.flink.DeployTest - Couldn't deploy Yarn
Application Cluster
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy
Yarn Application Cluster
at
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:466)
at com.yaduo.flink.DeployTest.main(DeployTest.java:122)
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1647875542261_0079 failed 2
times due to AM Container for appattempt_1647875542261_0079_000002 exited with
exitCode: 1
Failing this attempt.Diagnostics: [2022-04-08 09:24:00.480]Exception from
container-launch.
Container id: container_1647875542261_0079_02_000002
Exit code: 1
[2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
[2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
For more detailed output, check the application tracking page:
http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 Then click
on links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further
investigate the issue:
yarn logs -applicationId application_1647875542261_0079
at
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1219)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:459)
... 1 common frames omitted
09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor -
Cancelling deployment from Deployment Failure Hook
09:24:01.321 [Thread-9] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting
to ResourceManager at bigdata-beta2/192.168.15.185:8032
09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.security.UserGroupInformation -
PrivilegedAction as:hdfs (auth:SIMPLE)
from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:147)
09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC - Creating
YarnRPC for null
09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC -
Creating a HadoopYarnProtoRpc proxy for protocol interface
org.apache.hadoop.yarn.api.ApplicationClientProtocol
09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.ipc.Client - getting client out
of cache: org.apache.hadoop.ipc.Client@6692b6c6
09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.service.AbstractService -
Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor -
Killing YARN application
09:24:01.321 [Thread-4] DEBUG org.apache.hadoop.util.ShutdownHookManager -
ShutdownHookManger complete shutdown.
That is the full description of the problem. I have been troubleshooting it for several days without finding the cause, so I am turning to the community for advice. Any guidance would be much appreciated.
Looking forward to your reply!
Best regards