Thanks for figuring this out, Shuyi!
> On 9. Oct 2018, at 09:09, Shuyi Chen <suez1...@gmail.com> wrote:
>
> I think the bug was introduced in FLINK-7643 (Rework FileSystem loading to use
> factories). In YarnApplicationMasterRunner, after that change, FileSystem is
> no longer properly initialized with the correct Flink configuration before
> runApplicationMaster() is called. Without that initialization, a call to
> FileSystem.get("hdfs://nameservice0/") causes the HadoopFsFactory to be
> initialized with an empty Flink Configuration, which is then unable to
> recognize nameservice0.
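(In other words, the fix amounts to handing the Flink configuration to the
FileSystem registry before the first FileSystem.get() call. A rough,
non-compilable sketch against the 1.4 API — the surrounding runner code is
paraphrased; only FileSystem.initialize() and FileSystem.get() are actual
Flink methods:)

```java
// Paraphrased sketch, NOT the actual YarnApplicationMasterRunner code:
// hand the loaded Flink configuration to the FileSystem registry...
FileSystem.initialize(flinkConfig);

// ...BEFORE the first FileSystem.get(), so that HadoopFsFactory is created
// with the real configuration and can resolve the logical nameservice:
FileSystem fs = FileSystem.get(new URI("hdfs://nameservice0/"));
```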
>
> I've composed unit tests for YarnApplicationMasterRunner,
> YarnTaskManagerRunnerFactory and YarnTaskExecutorRunner, and verified that
> YarnApplicationMasterRunner fails to initialize the FileSystem properly,
> while YarnTaskManagerRunnerFactory and YarnTaskExecutorRunner both do the
> right thing.
>
> I'll create a JIRA to track the fix.
>
>
> On Thu, Oct 4, 2018 at 11:23 AM Yan Yan <yanyan300...@gmail.com> wrote:
> Hi Aljoscha,
>
> Thanks for looking into this! Yes, we toggled Flink 1.4 back to Flink 1.3.2
> and it works. So it does seem to be a genuine behavior difference between
> 1.3.2 and 1.4.
>
> Best,
> Yan
>
>
> On Thu, Oct 4, 2018 at 6:36 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> Another thing: when you retry this with Flink 1.3.2, does it work? I'm
> trying to rule out another problem in the setup.
>
>
>> On 4. Oct 2018, at 15:17, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi Yan,
>>
>> This seems to be a bug in the FileSystems and how they are initialized. I'm
>> looking into this myself, but I'm also looping in Stephan and Stefan, who
>> have worked on this the most in the past. Maybe they have some valuable input.
>>
>> Best,
>> Aljoscha
>>
>>
>>> On 4. Oct 2018, at 01:18, Yan Yan <yanyan300...@g.mail.com> wrote:
>>>
>>> Hi,
>>>
>>> We recently bumped from Flink 1.3.2 to 1.4 and found an issue with the
>>> HDFS configuration.
>>>
>>> We are using FlinkYarnSessionCli to start the cluster and submit job.
>>>
>>> In 1.3.2, we set below Flink properties when using checkpoints:
>>> state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
>>> state.checkpoints.dir = hdfs://nameservice0/../..
>>>
>>> The mapping between the logical nameservice (nameservice0) and the actual
>>> namenode host:ports is passed to Flink via yarnship/core-site.xml (using
>>> the -yt option), and we set fs.hdfs.hadoopconf=yarnship/
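(For context, the nameservice mapping shipped this way is the standard Hadoop
HA client configuration; a minimal sketch below — the nn1/nn2 hostnames are
placeholders, not the actual cluster values:)

```xml
<!-- yarnship/core-site.xml, shipped with -yt (placeholder hostnames) -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://nameservice0</value>
  </property>
  <!-- logical nameservice -> actual namenodes -->
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice0</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameservice0</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice0</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

Without these properties in the Hadoop configuration the client sees,
"nameservice0" is treated as a literal hostname, which is exactly the
UnknownHostException in the trace below.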
>>>
>>> However, we encountered below error after bumping to 1.4, which caused the
>>> checkpointing to fail.
>>>
>>> 2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN
>>> org.apache.flink.runtime.checkpoint.OperatorSubtaskState - Error while
>>> discarding operator states.
>>> java.io.IOException: Cannot instantiate file system for URI:
>>> hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>> at
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>> at
>>> org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
>>> at
>>> org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
>>> at
>>> org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
>>> at
>>> org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>>> at
>>> org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>>> at
>>> org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
>>> at
>>> org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
>>> at
>>> org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>>> at
>>> org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>>> at
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalArgumentException:
>>> java.net.UnknownHostException: nameservice0
>>> at
>>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>>> at
>>> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>>> at
>>> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>> ... 15 common frames omitted
>>> Caused by: java.net.UnknownHostException: nameservice0
>>> ... 22 common frames omitted
>>>
>>> It does not recognize nameservice0 because the core-site.xml on the actual
>>> machine (read by Flink via $HADOOP_CONF_DIR) does not use nameservice0
>>> but something else for fs.defaultFS.
>>>
>>> Digging a little bit, I found that the hadoopConfig (code
>>> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L159>)
>>> does not have the properties we set via yarnship/core-site.xml. In
>>> particular, I suspect the cached HadoopFsFactory is initialized with a
>>> dummy Configuration (code
>>> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387>),
>>> which prevents the Flink configuration from being passed in later (code
>>> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84>).
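(The failure mode is easier to see in isolation. Below is a self-contained
analogy — NOT Flink's actual code; all class and method names here are made
up — of a factory that is created and cached on first use, so whatever
configuration it saw first, even an empty one, sticks:)

```java
import java.util.HashMap;
import java.util.Map;

// Simplified analogy of the caching behavior described above: the factory
// is created and cached on first use, so the configuration seen at that
// moment wins.
class CachedFsFactory {
    private static CachedFsFactory instance;
    private final Map<String, String> nameservices;

    private CachedFsFactory(Map<String, String> config) {
        this.nameservices = new HashMap<>(config);
    }

    // First call wins: later calls with a richer config are silently ignored.
    static CachedFsFactory getOrCreate(Map<String, String> config) {
        if (instance == null) {
            instance = new CachedFsFactory(config);
        }
        return instance;
    }

    String resolve(String nameservice) {
        String hostports = nameservices.get(nameservice);
        if (hostports == null) {
            throw new IllegalArgumentException("UnknownHost: " + nameservice);
        }
        return hostports;
    }
}

public class CachedConfigDemo {
    public static void main(String[] args) {
        // The factory is first touched with an empty configuration...
        CachedFsFactory.getOrCreate(new HashMap<>());

        // ...so the proper mapping passed in later never takes effect.
        Map<String, String> proper = new HashMap<>();
        proper.put("nameservice0", "namenode1.example.com:8020");

        try {
            CachedFsFactory.getOrCreate(proper).resolve("nameservice0");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints: UnknownHost: nameservice0
        }
    }
}
```

In Flink terms: once HadoopFsFactory has been created from an empty
Configuration and cached, the properly populated Flink configuration supplied
afterwards has no effect, matching the UnknownHostException above.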
>>>
>>> I am not sure whether this is intentional, or whether it has been fixed in
>>> later releases. Has anyone encountered the same problem? I would
>>> appreciate any suggestions.
>>>
>>> Thanks,
>>> Yan
>>
>
> --
> Best,
> Yan
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."