Hi Aljoscha,

Thanks for looking into this! Yes, we rolled Flink 1.4 back to Flink 1.3.2
and it works. So it really does look like a behavioral difference between 1.3.2 and 1.4.

Best,
Yan


On Thu, Oct 4, 2018 at 6:36 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Another thing: when you retry this again with Flink 1.3.2 it works? I'm
> trying to rule out another problem in the setup.
>
>
> On 4. Oct 2018, at 15:17, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Yan,
>
> This seems to be a bug in the FileSystems and how they're initialized. I'm
> looking into this myself, but I'm also looping in Stephan and Stefan, who
> have worked on this the most in the past. Maybe they have some valuable
> input.
>
> Best,
> Aljoscha
>
>
> On 4. Oct 2018, at 01:18, Yan Yan <yanyan300...@g.mail.com> wrote:
>
> Hi,
>
> We recently bumped from Flink 1.3.2 to 1.4 and found an issue with the HDFS
> configuration.
>
> We are using *FlinkYarnSessionCli* to start the cluster and submit the job.
>
> In 1.3.2, we set the Flink properties below when using checkpoints:
> state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
> state.checkpoints.dir = hdfs://nameservice0/../..
>
> The mapping between the logical nameservice (nameservice0) and the actual
> namenode host:ports is passed to Flink via *yarnship/core-site.xml* (by
> providing the -yt option), and we set fs.hdfs.hadoopconf=yarnship/
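For reference, a rough sketch of the setup described above (all paths are illustrative placeholders, not the exact values from our deployment):

```shell
# flink-conf.yaml (1.3.2-era settings; paths are placeholders):
#   state.backend.fs.checkpointdir: hdfs://nameservice0/flink/checkpoints
#   state.checkpoints.dir: hdfs://nameservice0/flink/checkpoints-meta
#   fs.hdfs.hadoopconf: yarnship/
#
# yarnship/core-site.xml carries the nameservice0 -> namenode host:port
# mapping; the -yt option ships that directory to the YARN containers:
flink run -yt yarnship/ <job-jar>
```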
>
> However, after bumping to 1.4 we encountered the error below, which caused
> checkpointing to fail.
>
> 2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN  
> org.apache.flink.runtime.checkpoint.OperatorSubtaskState  - Error while 
> discarding operator states.
> java.io.IOException: Cannot instantiate file system for URI: 
> hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>     at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
>     at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
>     at 
> org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
>     at 
> org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>     at 
> org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>     at 
> org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
>     at 
> org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
>     at 
> org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>     at 
> org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: 
> nameservice0
>     at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>     at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>     at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>     at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>     ... 15 common frames omitted
> Caused by: java.net.UnknownHostException: nameservice0
>     ... 22 common frames omitted
>
>
> It does not recognize nameservice0 because the *core-site.xml *on the
> actual machine (read by Flink via $HADOOP_CONF_DIR) does not use
> nameservice0 but something else for *fs.defaultFS*.
>
> Digging a little, I found that the *hadoopConfig* (code
> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L159>)
> does not have the properties we set via *yarnship/core-site.xml*. In
> particular, I suspect that the cached *HadoopFsFactory* is initialized with
> a dummy Configuration (code
> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387>),
> which prevents the *flinkConfig* from being passed in later (code
> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84>
> ).
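The suspected caching behavior can be illustrated with a small standalone sketch (plain Java, not Flink's actual classes; all names here are made up): a factory that is created eagerly with an empty configuration will never see settings that are loaded afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a filesystem factory that captures its configuration
// at construction time (hypothetical, for illustration only).
class FsFactory {
    private final Map<String, String> conf;
    FsFactory(Map<String, String> conf) { this.conf = new HashMap<>(conf); }
    String lookup(String key) { return conf.getOrDefault(key, "<unset>"); }
}

public class CachedFactoryDemo {
    // The factory is cached eagerly with an empty ("dummy") configuration...
    private static final FsFactory CACHED = new FsFactory(new HashMap<>());

    public static void main(String[] args) {
        // ...so settings loaded later (e.g. from yarnship/core-site.xml)
        // never reach the cached instance.
        Map<String, String> loadedLater = new HashMap<>();
        loadedLater.put("dfs.nameservices", "nameservice0");

        // The cached factory still sees nothing:
        System.out.println(CACHED.lookup("dfs.nameservices")); // prints "<unset>"
    }
}
```

If this sketch matches what happens in FileSystem/HadoopFsFactory, it would explain why the nameservice mapping from the shipped core-site.xml is invisible at checkpoint-discard time.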
>
> I am not sure whether this is intentional or has been fixed in later
> releases. Has anyone encountered the same problem? I would appreciate
> any suggestions.
>
> Thanks,
> Yan
>
>
>
> --
Best,
Yan
