Andre Araujo created SPARK-26825:
------------------------------------

             Summary: Spark Structured Streaming job failing when submitted in 
cluster mode
                 Key: SPARK-26825
                 URL: https://issues.apache.org/jira/browse/SPARK-26825
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Andre Araujo


I have a Structured Streaming job that runs successfully when launched in 
"client" mode. However, when launched in "cluster" mode, it fails with the 
following messages in the error log. Note that the path in the error 
message is actually a local filesystem path that has been mistakenly prefixed 
with an {{hdfs://}} scheme.

{code}
19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_000001/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadata
org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
{code}

I dug into this a little, and here's what I think is going on:

# When a new streaming query is created, the {{StreamingQueryManager}} 
determines the checkpoint location 
[here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216].
 If neither the user nor the Spark conf specifies a checkpoint location, the 
location is the value returned by {{Utils.createTempDir(namePrefix = 
s"temporary").getCanonicalPath}}. 
   Here, I see two issues:
#* The canonical path returned by {{Utils.createTempDir}} does *not* have a 
scheme ({{hdfs://}} or {{file://}}), so it is ambiguous which type of file 
system the path belongs to.
#* The path returned by the {{Utils.createTempDir}} call is a local path, not 
an HDFS path, unlike the paths returned in the other two cases. 
I executed {{Utils.createTempDir}} in a test job, in both cluster and client 
modes, with these results:
{code}
*Client mode:*
java.io.tmpdir=/tmp
createTempDir(namePrefix = s"temporary") => 
/tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25

*Cluster mode:*
java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/
createTempDir(namePrefix = s"temporary") => 
/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
{code}
# This temporary checkpoint location is then [passed to the 
constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276]
 of the {{MicroBatchExecution}} instance
# This is the point where [{{resolvedCheckpointRoot}} is 
calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
 This is where things start to break: since the path returned by 
{{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default 
filesystem, the code resolves the path as an HDFS path rather than a 
local one, as shown below:
{code}
scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> // value returned by the Utils.createTempDir method
scala> val checkpointRoot = 
"/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
checkpointRoot: String = 
/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e

scala> val checkpointPath = new Path(checkpointRoot)
checkpointPath: org.apache.hadoop.fs.Path = 
/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e

scala> // The resolved FS is a DFS, rather than a local filesystem, due to the lack of a scheme in the path
scala> val fs = checkpointPath.getFileSystem(spark.sessionState.newHadoopConf())
fs: org.apache.hadoop.fs.FileSystem = 
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_632752661_1, ugi=root 
(auth:SIMPLE)]]

scala> // The generated path is invalid: it's a local path prefixed with an hdfs: scheme
scala> checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
res1: String = 
hdfs://ns1/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
{code}
# Then, when the job tries to save the metadata to the path above, it fails 
because the path doesn't exist on HDFS (the actual message is "permission 
denied" because the user doesn't have permission to create the "/data" 
directory in the HDFS root).
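
The scheme resolution described in steps 1 and 3 can be illustrated without a cluster. The sketch below is only an approximation under stated assumptions: it uses {{java.net.URI}} alone, {{SchemeDemo.qualify}} is a hypothetical helper that mimics (but does not call) Hadoop's path qualification, and {{hdfs://ns1}} is the default filesystem seen in the log above.

```scala
import java.net.URI

object SchemeDemo {
  // Hypothetical helper, NOT Hadoop's implementation: approximates how a
  // path is qualified against the default filesystem URI.
  def qualify(path: String, defaultFs: URI): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) {
      // The path already names its filesystem, so it is kept unchanged.
      uri
    } else {
      // No scheme: the path inherits the default filesystem's scheme and
      // authority, even if it was produced as a local path.
      new URI(defaultFs.getScheme, defaultFs.getAuthority, uri.getPath, null, null)
    }
  }
}
```

For example, {{SchemeDemo.qualify("/tmp/temporary-abc", new URI("hdfs://ns1"))}} yields {{hdfs://ns1/tmp/temporary-abc}}, whereas an input of {{file:///tmp/temporary-abc}} keeps its {{file}} scheme.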

I believe this could be fixed by simply:
* Replacing the call to {{Utils.createTempDir}} with something that creates a 
temp dir on HDFS, rather than on the local filesystem
* Ensuring this method returns a path qualified with a scheme ({{hdfs://}}), to 
avoid later filesystem resolution mistakes.
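
The second point can be sketched with the JDK alone. This is a hypothetical illustration, not a proposed Spark patch: {{QualifiedTempDir.create}} is an invented name, and it uses the local {{file:}} scheme as a stand-in because creating the directory on HDFS would need a Hadoop client. The idea is only that a helper returning a URI string, rather than a bare canonical path, leaves no room for the default filesystem to be applied later.

```scala
import java.nio.file.Files

object QualifiedTempDir {
  // Hypothetical helper: creates a local temp directory and returns it as a
  // scheme-qualified URI string instead of a scheme-less canonical path.
  def create(prefix: String): String = {
    val dir = Files.createTempDirectory(prefix)
    // Path.toUri always carries an explicit scheme ("file" for local paths).
    dir.toUri.toString
  }
}
```

A caller that later parses the returned string finds an explicit scheme and has no reason to fall back to the default filesystem.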




