[ 
https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760702#comment-16760702
 ] 

Gabor Somogyi commented on SPARK-26825:
---------------------------------------

[~asdaraujo] excellent analysis! One minor correction:
 * Replacing the call to Utils.createTempDir with something that creates a temp 
dir on default FS, rather than local filesystem

This is correct:
{quote}Ensuring this method returns a path qualified with a scheme (hdfs://, to 
avoid later fs resolution mistakes.{quote}


> Spark Structure Streaming job failing when submitted in cluster mode
> --------------------------------------------------------------------
>
>                 Key: SPARK-26825
>                 URL: https://issues.apache.org/jira/browse/SPARK-26825
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Andre Araujo
>            Priority: Major
>
> I have a structured streaming job that runs successfully when launched in 
> "client" mode. However, when launched in "cluster" mode it fails with the 
> following weird messages on the error log. Note that the path in the error 
> message is actually a local filesystem path that has been mistakenly prefixed 
> with a {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream 
> metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to 
> hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_000001/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadataorg.apache.hadoop.security.AccessControlException:
>  Permission denied: user=root, access=WRITE, 
> inode="/":hdfs:supergroup:drwxr-xr-x
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think it's going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} 
> determines the checkpoint location 
> [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216].
>  If neither the user nor the Spark conf specify a checkpoint location, the 
> location is returned by a call to {{Utils.createTempDir(namePrefix = 
> s"temporary").getCanonicalPath}}. 
>    Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a 
> scheme ({{hdfs://}} or {{file://}}), so, it's ambiguous as to what type of 
> file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a 
> local path, not a HDFS path, as the paths returned by the other two 
> conditions. I executed {{Utils.createTempDir}} in a test job, both in cluster 
> and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => 
> /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/
> createTempDir(namePrefix = s"temporary") => 
> /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the 
> constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276]
>  of the {{MicroBatchExecution}} instance
> # This is the point where [{{resolvedCheckpointRoot}} is 
> calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
>  Here, it's where things start to break: since the path returned by 
> {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default 
> filesystem, the code resolves the path as being a HDFS path, rather than a 
> local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = 
> "/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = 
> /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> val checkpointPath = new Path(checkpointRoot)
> checkpointPath: org.apache.hadoop.fs.Path = 
> /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> // The resolved FS is a DFS, rather than a local filesystem, due to 
> the lack of a scheme in the path
> scala> val fs = 
> checkpointPath.getFileSystem(spark.sessionState.newHadoopConf())
> fs: org.apache.hadoop.fs.FileSystem = 
> DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_632752661_1, ugi=root 
> (auth:SIMPLE)]]
> scala> // The generated path is invalid: it's a local path prefixed with a 
> hdfs: scheme
> scala> checkpointPath.makeQualified(fs.getUri, 
> fs.getWorkingDirectory).toUri.toString
> res1: String = 
> hdfs://ns1/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # Then, when the job tries to save the metadata in the path above it fails 
> because the path doesn't exist (the actual message is a "permission denied" 
> message because the user doesn't have permission to create the "/data" 
> directory in the HDFS root)
> I believe this could be fixed by simply:
> * Replacing the call to {{Utils.createTempDir}} with something that creates a 
> temp dir on HDFS, rather than local filesystem
> * Ensuring this method returns a path qualified with a scheme (hdfs:), to 
> avoid later fs resolution mistakes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to