[ https://issues.apache.org/jira/browse/FLINK-25099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17451554#comment-17451554 ]

chenqizhu edited comment on FLINK-25099 at 12/1/21, 7:01 AM:
-------------------------------------------------------------

This is my submission CLI example. As you can see, it's very simple:

{code:java}
bin/flink run -t yarn-per-job -Dexecution.checkpointing.interval=60s examples/streaming/TopSpeedWindowing.jar
{code}

If the NodeManager is using the default YARN config, why would it throw an exception
like 'java.net.UnknownHostException: flinkcluster'?
After all, 'flinkcluster' is client-side configuration. It seems contradictory.
[~zuston]
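
My understanding (an assumption, not confirmed in this ticket): the exception is raised while the NodeManager itself resolves the HDFS path during resource localization, using its own Hadoop configuration rather than the submitting client's. A hypothetical way to probe this on a NodeManager host:

{code:java}
# If the 'flinkcluster' nameservice is missing from the NodeManager's own
# hdfs-site.xml, this fails with java.net.UnknownHostException: flinkcluster
# (possibly wrapped in an IllegalArgumentException).
hadoop fs -ls hdfs://flinkcluster/
{code}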




> flink on yarn Accessing two HDFS Clusters
> -----------------------------------------
>
>                 Key: FLINK-25099
>                 URL: https://issues.apache.org/jira/browse/FLINK-25099
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN, FileSystems, Runtime / State Backends
>    Affects Versions: 1.13.3
>         Environment: flink : 1.13.3
> hadoop : 3.3.0
>            Reporter: chenqizhu
>            Priority: Major
>         Attachments: flink-chenqizhu-client-hdfsn21n163.log
>
>
> Flink 1.13 supports configuring Hadoop properties in flink-conf.yaml via the 
> flink.hadoop.* prefix. There is a requirement to write checkpoints to an HDFS 
> cluster backed by SSDs (called cluster B) to speed up checkpoint writing, but 
> this HDFS cluster is not the default HDFS of the Flink client (the default, 
> called cluster A). flink-conf.yaml is configured with nameservices for both 
> cluster A and cluster B, similar to HDFS federation mode.
> The configuration is as follows:
>  
> {code:java}
> flink.hadoop.dfs.nameservices: ACluster,BCluster
> flink.hadoop.fs.defaultFS: hdfs://BCluster
> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.xxxx:9000
> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.xxxx:50070
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xxxxxx:9000
> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xxxxxx:50070
> flink.hadoop.dfs.client.failover.proxy.provider.ACluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xxxxxx:9000
> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xxxxxx:50070
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xxxxxx:9000
> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.xxxxx:50070
> flink.hadoop.dfs.client.failover.proxy.provider.BCluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
> {code}
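> 
> The checkpoint target itself is not shown above; presumably it points at 
> cluster B through a fully qualified URI, along the lines of the following 
> (the path is hypothetical; state.checkpoints.dir is the standard Flink option):
> {code:java}
> state.checkpoints.dir: hdfs://BCluster/flink/checkpoints
> {code}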
>  
> However, an error occurred during job startup, reported as follows.
> (When the configuration item is switched back to the Flink client's local 
> default HDFS cluster, i.e. flink.hadoop.fs.defaultFS: hdfs://ACluster, the job 
> starts normally.)
> {noformat}
> Caused by: BCluster
> java.net.UnknownHostException: BCluster
> at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
> at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
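> 
> Note that the failing frames (FSDownload, ContainerLocalizer) belong to the 
> NodeManager's resource localization, which runs before any Flink code and so 
> presumably never sees the flink.hadoop.* settings; it resolves the nameservice 
> from the YARN host's own Hadoop configuration. A hypothetical check on a 
> NodeManager host (assuming the stock $HADOOP_CONF_DIR layout):
> {code:java}
> # Does the YARN side know about BCluster at all?
> grep -A 1 "dfs.nameservices" $HADOOP_CONF_DIR/hdfs-site.xml
> {code}
> 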
> Is there a solution to the above problem? The pain point is enabling Flink to 
> access two HDFS clusters, preferably through configuration in flink-conf.yaml 
> alone.
>  
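> One possible direction, assuming the YARN cluster itself only knows cluster A 
> (a sketch, not verified on this setup): keep the default filesystem on 
> cluster A so that resource localization on the NodeManagers keeps working, and 
> address cluster B only through fully qualified URIs (the checkpoint path here 
> is hypothetical):
> {code:java}
> flink.hadoop.fs.defaultFS: hdfs://ACluster
> state.checkpoints.dir: hdfs://BCluster/flink/checkpoints
> {code}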



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
