Hi Wei,

Did you build Flink with Maven 3.2.5, as recommended in the documentation
[1]?
Also, did you use the -Pvendor-repos flag to add the Cloudera repository
when building [2]?
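
In case it helps, here is a rough sketch of the kind of build I have in mind.
The function names are only illustrative, and the Hadoop version is the one
from your message. As far as I recall from the build docs, Maven 3.3.x can
build Flink but may not shade some dependencies properly, so the sketch warns
when the first line of `mvn -version` does not report 3.2.5.

```shell
# Returns 0 when the given first line of `mvn -version` reports Maven 3.2.5.
is_recommended_maven() {
  case "$1" in
    "Apache Maven 3.2.5"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Hypothetical helper: builds Flink for a vendor-specific Hadoop version.
# Run it from the root of the Flink source tree.
build_flink() {
  hadoop_version="$1"
  if ! is_recommended_maven "$(mvn -version 2>/dev/null | head -n 1)"; then
    echo "warning: Maven is not 3.2.5; dependency shading may differ" >&2
  fi
  # -Pvendor-repos adds the vendor (e.g. Cloudera) repositories to the build.
  mvn clean install -DskipTests -Pvendor-repos \
    -Dhadoop.version="${hadoop_version}"
}

# Usage:
#   build_flink 2.6.0-cdh5.5.0
```

The only difference from the command in your message is the added
-Pvendor-repos profile; the version check is just a convenience.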

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#vendor-specific-versions

On Tue, Jan 8, 2019 at 5:17 AM Wei Sun <rdn...@w3sun.com> wrote:

> Hi Timo,
>
> Good day!
>
> Thank you for your help! This issue has been solved with the rebuilt Flink
> version. However, I found that it does not work with the
> 'Apache Flink 1.7.1 only' version, even if I configure the classpath like this:
> export HADOOP_CLASSPATH=`hadoop classpath`
> I will check it later.
> Thanks again.
>
> Best Regards
> Wei
>
> ------------------ Original ------------------
> *From: * "Timo Walther";<twal...@apache.org>;
> *Date: * Jan 8, 2019
> *To: * "user"<user@flink.apache.org>;
> *Cc: * "gary"<g...@apache.org>;
> *Subject: * Re: Building Flink from source according to vendor-specific
> version but causes protobuf conflict
>
> Hi Wei,
>
> did you play around with the classloading options mentioned here [1]? The -d
> option might impact how classes are loaded when the job is deployed on the
> cluster.
>
> I will loop in Gary, who might know more about the YARN behavior.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#user-jars--classpath
>
>
> On 07.01.19 at 10:33, Wei Sun wrote:
>
> Hi guys,
>
> Good day.
>
> I rebuilt Flink from source and specified the vendor-specific Hadoop
> version. It works well when I submit a streaming application without the
> '-d' (--detached) option, as follows:
> bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm
> 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter
> ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
>
> But if I add the '-d' (--detached) option, an
> *org.apache.flink.client.deployment.ClusterDeploymentException* is
> thrown to the CLI:
> bin/flink run *-d* -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048
> -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter
> ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
>
> *--------------------------------Exception
> start--------------------------------------------------------------------------------------------------------------------------------------------*
>  The program finished with the following exception:
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment.
> Diagnostics from YARN: Application application_1544777537685_0068 failed 2
> times due to AM Container for appattempt_1544777537685_0068_000002 exited
> with  exitCode: 1
> For more detailed output, check application tracking page:
> http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/Then,
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e03_1544777537685_0068_02_000001
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
> at org.apache.hadoop.util.Shell.run(Shell.java:460)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Container exited with a non-zero exit code 1
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further
> investigate the issue:
> yarn logs -applicationId application_1544777537685_0068
> at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
> at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
> ... 9 more
> 2019-01-07 17:08:55,463 INFO
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cancelling
> deployment from Deployment Failure Hook
> 2019-01-07 17:08:55,464 INFO
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Killing
> YARN application
> 2019-01-07 17:08:55,471 INFO
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deleting
> files in
>
> *-------------------------------------------End----------------------End----------------------------End-------------------------------------------------*
>
> My cluster has log aggregation enabled, so I ran the following command:
> yarn logs -applicationId application_1544777537685_0068
> *---------------------------------------------Exception
> start----------------------Exception
> start-----------------------------------------------------------------------------*
> The log shows the following:
> 2019-01-07 17:08:49,385 INFO  akka.remote.Remoting
>                  - Remoting started; listening on addresses :[
> akka.tcp://fl...@120-14-20-sh-1037-b06.yidian.com:20998]
> 2019-01-07 17:08:49,391 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor
> system started at akka.tcp://fl...@120-14-20-sh-1037-b06.yidian.com:20998
> 2019-01-07 17:08:51,108 INFO
>  org.apache.flink.runtime.blob.FileSystemBlobStore             - Creating
> highly available BLOB storage directory at
> hdfs://amethyst/flink/flink17/ha//application_1544777537685_0068/blob
> 2019-01-07 17:08:51,138 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting
> YarnJobClusterEntrypoint down with application status FAILED. Diagnostics
> java.lang.ClassCastException:
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto
> cannot be cast to com.google.protobuf.Message
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
> at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
> at
> org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
> at
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
> at
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
> .
> 2019-01-07 17:08:51,142 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping
> Akka RPC service.
> 2019-01-07 17:08:51,149 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting
> down remote daemon.
> 2019-01-07 17:08:51,151 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote
> daemon shut down; proceeding with flushing remote transports.
> 2019-01-07 17:08:51,170 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
> shut down.
> 2019-01-07 17:08:51,196 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped
> Akka RPC service.
> 2019-01-07 17:08:51,196 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not
> start cluster entrypoint YarnJobClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:181)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
> Caused by: java.lang.ClassCastException:
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto
> cannot be cast to com.google.protobuf.Message
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
> at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
> at
> org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
> at
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
> at
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
> ... 2 more
>
>
> *---------------------------------------------------------End----------------------End----------------------------End-------------------------------------------------*
>
> When I rebuilt Flink from source, I used this command:
> mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.5.0
>
> My cluster details:
> Hadoop 2.6.0-cdh5.5.0
> Subversion http://github.com/cloudera/hadoop -r
> fd21232cef7b8c1f536965897ce20f50b83ee7b2
> Compiled by jenkins on 2015-11-09T20:39Z
> Compiled with protoc 2.5.0
> From source with checksum 98e07176d1787150a6a9c087627562c
> This command was run using
> /opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/hadoop-common-2.6.0-cdh5.5.0.jar
>
> Since I specified the vendor-specific Hadoop version, I am wondering how
> this could happen. Are there any extra parameters that should be
> added?
>
> It would be appreciated if anybody could help me figure it out.
>
>
> Thank you
> Best Regards
>
>
>
