How should this be configured when there are two OSS or S3 buckets, each with its own accessKey/secret? For example, writing data to bucketA while checkpointing to bucketB.
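
A sketch of what I have in mind (untested; assuming the flink-s3-fs-hadoop plugin, which forwards flink-conf.yaml keys prefixed with s3. to the s3a connector, plus s3a's per-bucket configuration; bucket names and keys below are placeholders, and I am not sure the OSS connector supports a per-bucket override like this):

# flink-conf.yaml
# default credentials, used for bucketA (the data sink)
s3.access-key: ACCESS_KEY_FOR_BUCKET_A
s3.secret-key: SECRET_KEY_FOR_BUCKET_A
# per-bucket override for bucketB (maps to fs.s3a.bucket.<name>.* in s3a)
s3.bucket.bucketB.access.key: ACCESS_KEY_FOR_BUCKET_B
s3.bucket.bucketB.secret.key: SECRET_KEY_FOR_BUCKET_B
# point checkpoints at bucketB
state.checkpoints.dir: s3://bucketB/flink-checkpoints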

On 2021-12-06 18:59:46, "Yang Wang" <danrtsey...@gmail.com> wrote:
>I think you can try shipping the local Hadoop conf and setting the HADOOP_CONF_DIR environment variable:
>
>-yt /path/of/my-hadoop-conf
>-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
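>
>For example (an untested sketch; the job jar and paths are placeholders, and
>my-hadoop-conf should contain an hdfs-site.xml declaring both nameservices):
>
>flink run -m yarn-cluster \
>  -yt /path/of/my-hadoop-conf \
>  -yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf' \
>  -yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf' \
>  ./my-job.jar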
>
>
>Best,
>Yang
>
>chenqizhu <qizhu...@163.com> wrote on Tue, Nov 30, 2021 at 10:00 AM:
>
>> Hi all,
>>
>> Flink 1.13 supports setting Hadoop properties in flink-conf.yaml via the
>> flink.hadoop.* prefix. We want to write checkpoints to an HDFS cluster backed by
>> SSDs (call it cluster B) to speed up checkpoint writes, but that cluster is not the
>> Flink client's local default HDFS (the default is cluster A). So the idea was to
>> declare the nameservices of both clusters in flink-conf.yaml, similar to HDFS
>> federation, so that both HDFS clusters are reachable. The configuration is as follows:
>>
>> flink.hadoop.dfs.nameservices: ACluster,BCluster
>> flink.hadoop.fs.defaultFS: hdfs://BCluster
>>
>> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.xxxx:9000
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.xxxx:50070
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xxxxxx:9000
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xxxxxx:50070
>> flink.hadoop.dfs.client.failover.proxy.provider.ACluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xxxxxx:9000
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xxxxxx:50070
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xxxxxx:9000
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.xxxxx:50070
>> flink.hadoop.dfs.client.failover.proxy.provider.BCluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>> But the job fails at startup: the HA nameservice configuration for cluster B does
>> not seem to be recognized, and "BCluster" is instead resolved as a hostname.
>> (If the setting is changed back to the Flink client's local default HDFS cluster A,
>> i.e. flink.hadoop.fs.defaultFS: hdfs://ACluster, the job starts normally.)
>> The error is as follows:
>>
>> Caused by: BCluster
>> java.net.UnknownHostException: BCluster
>> at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>> at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
>> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>> at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Is there a solution to the above? The pain point is having Flink access two HDFS
>> clusters, ideally configured purely through flink-conf.yaml.
>>
>> My component versions:
>> flink : 1.13.3
>> hadoop : 3.3.0
>>
>> Looking forward to a reply. Thanks!
