[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760901#comment-16760901 ]
Andre Araujo commented on SPARK-26825:
--------------------------------------

Thanks a lot, [~gsomogyi]

> Spark Structure Streaming job failing when submitted in cluster mode
> --------------------------------------------------------------------
>
>                 Key: SPARK-26825
>                 URL: https://issues.apache.org/jira/browse/SPARK-26825
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Andre Araujo
>            Priority: Major
>
> I have a structured streaming job that runs successfully when launched in
> "client" mode. However, when launched in "cluster" mode it fails with the
> following errors in the log. Note that the path in the error message is
> actually a local filesystem path that has been mistakenly prefixed with a
> {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_000001/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadata
> org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
>     at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug into this a little and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} determines the checkpoint location
> [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216]. If neither the user nor the Spark conf specifies a checkpoint location, the location is returned by a call to {{Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath}}. Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a scheme ({{hdfs://}} or {{file://}}), so it's ambiguous what type of filesystem the path belongs to.
> #* The path returned by the {{Utils.createTempDir}} call is also a local path, not an HDFS path, unlike the paths returned by the other two conditions. I executed {{Utils.createTempDir}} in a test job, in both cluster and client modes, with these results:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
>
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/
> createTempDir(namePrefix = s"temporary") => /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276] of the {{MicroBatchExecution}} instance.
> # This is the point where [{{resolvedCheckpointRoot}} is calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
> Here is where things start to break: since the path returned by {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default filesystem, the code resolves the path as an HDFS path, rather than a local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
>
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = "/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
>
> scala> val checkpointPath = new Path(checkpointRoot)
> checkpointPath: org.apache.hadoop.fs.Path = /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
>
> scala> // The resolved FS is a DFS, rather than a local filesystem, due to the lack of a scheme in the path
> scala> val fs = checkpointPath.getFileSystem(spark.sessionState.newHadoopConf())
> fs: org.apache.hadoop.fs.FileSystem = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_632752661_1, ugi=root (auth:SIMPLE)]]
>
> scala> // The generated path is invalid: it's a local path prefixed with a hdfs: scheme
> scala> checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
> res1: String = hdfs://ns1/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # Then, when the job tries to save the metadata at the path above, it fails because the path doesn't exist (the actual message is a "permission denied" error because the user doesn't have permission to create the {{/data}} directory in the HDFS root).
> I believe
this could be fixed simply by:
> * Replacing the call to {{Utils.createTempDir}} with something that creates a temp dir on HDFS, rather than on the local filesystem.
> * Ensuring this method returns a path qualified with a scheme ({{hdfs:}}), to avoid later filesystem resolution mistakes.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
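As an aside (not part of the original report): the resolution mistake described above can be reproduced without a Hadoop cluster. The sketch below uses plain {{java.net.URI}} and {{java.nio.file}} to mimic the qualification behavior; the class name, the {{qualify}} helper, and {{createTempCheckpointDir}} are illustrative assumptions, not Hadoop's {{Path.makeQualified}} or Spark's actual patch.

```java
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointSchemeSketch {
    // Mimics the failure mode: a scheme-less path inherits the scheme and
    // authority of the default filesystem. (Illustrative only; Hadoop's
    // real qualification logic lives in Path.makeQualified.)
    static URI qualify(URI defaultFs, String rawPath) {
        URI u = URI.create(rawPath);
        return (u.getScheme() == null) ? defaultFs.resolve(u) : u;
    }

    // Sketch of the proposed fix: return a scheme-qualified URI so that
    // later qualification cannot silently default to HDFS.
    static String createTempCheckpointDir(String prefix) throws Exception {
        Path dir = Files.createTempDirectory(prefix);
        return dir.toUri().toString(); // e.g. "file:///tmp/temporary-.../"
    }

    public static void main(String[] args) throws Exception {
        URI defaultFs = URI.create("hdfs://ns1/");

        // Scheme-less local path: wrongly resolved under the default FS,
        // reproducing the invalid hdfs://ns1/... path from the bug report.
        URI broken = qualify(defaultFs, "/tmp/temporary-47c13b28");
        System.out.println(broken); // hdfs://ns1/tmp/temporary-47c13b28

        // Scheme-qualified path: left untouched by qualification.
        URI ok = qualify(defaultFs, createTempCheckpointDir("temporary"));
        System.out.println(ok.getScheme()); // file
    }
}
```

The second bullet of the proposed fix is what the {{qualify}} helper demonstrates: once the temp-dir path carries an explicit scheme, the default-filesystem fallback never triggers.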