This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 979bb90  [SPARK-26936][SQL] Fix bug of insert overwrite local dir can 
not create temporary path in local staging directory
979bb90 is described below

commit 979bb905b77b95bc26dff33b7884b38d8dd869f5
Author: gengjiaan <gengji...@360.cn>
AuthorDate: Fri Apr 5 14:02:46 2019 -0500

    [SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create 
temporary path in local staging directory
    
    ## What changes were proposed in this pull request?
    The environment of my cluster is as follows:
    ```
    OS:Linux version 2.6.32-220.7.1.el6.x86_64 
(mockbuildc6b18n3.bsys.dev.centos.org) (gcc version 4.4.6 20110731 (Red Hat 
4.4.6-3) (GCC) ) #1 SMP Wed Mar 7 00:52:02 GMT 2012
    Hadoop: 2.7.2
    Spark: 2.3.0 or 3.0.0(master branch)
    Hive: 1.2.1
    ```
    
    My Spark application runs in yarn-client deploy mode.
    
    If I execute the SQL `insert overwrite local directory 
'/home/test/call_center/' select * from call_center`, a HiveException will 
appear as follows:
    `Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: 
java.io.IOException: Mkdirs failed to create 
file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3
 (exists=false, 
cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011)
    at 
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)`
    Currently, Spark SQL generates a local temporary path in the local staging 
directory. The scheme of the local temporary path starts with `file`, so the 
HiveException appears.
    This PR changes the local temporary path to an HDFS temporary path, and uses 
a DistributedFileSystem instance to copy the data from the HDFS temporary path 
to the local directory.
    If Spark runs in local deploy mode, 'insert overwrite local directory' works 
fine.
    ## How was this patch tested?
    
    Unit tests cannot cover yarn-client mode. The change was tested in my production environment.
    
    Closes #23841 from beliefer/fix-bug-of-insert-overwrite-local-dir.
    
    Authored-by: gengjiaan <gengji...@360.cn>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../hive/execution/InsertIntoHiveDirCommand.scala  | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 0c694910..1825af6 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -86,20 +86,21 @@ case class InsertIntoHiveDirCommand(
     val jobConf = new JobConf(hadoopConf)
 
     val targetPath = new Path(storage.locationUri.get)
-    val writeToPath =
+    val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
+    val (writeToPath: Path, fs: FileSystem) =
       if (isLocal) {
         val localFileSystem = FileSystem.getLocal(jobConf)
-        localFileSystem.makeQualified(targetPath)
+        (localFileSystem.makeQualified(targetPath), localFileSystem)
       } else {
-        val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
-        val dfs = qualifiedPath.getFileSystem(jobConf)
-        if (!dfs.exists(qualifiedPath)) {
-          dfs.mkdirs(qualifiedPath.getParent)
-        }
-        qualifiedPath
+        val dfs = qualifiedPath.getFileSystem(hadoopConf)
+        (qualifiedPath, dfs)
       }
+    if (!fs.exists(writeToPath)) {
+      fs.mkdirs(writeToPath)
+    }
 
-    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)
+    // The temporary path must be a HDFS path, not a local path.
+    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
     val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
       tmpPath.toString, tableDesc, false)
 
@@ -111,15 +112,20 @@ case class InsertIntoHiveDirCommand(
         fileSinkConf = fileSinkConf,
         outputLocation = tmpPath.toString)
 
-      val fs = writeToPath.getFileSystem(hadoopConf)
       if (overwrite && fs.exists(writeToPath)) {
         fs.listStatus(writeToPath).foreach { existFile =>
           if (Option(existFile.getPath) != createdTempDir) 
fs.delete(existFile.getPath, true)
         }
       }
 
-      fs.listStatus(tmpPath).foreach {
-        tmpFile => fs.rename(tmpFile.getPath, writeToPath)
+      val dfs = tmpPath.getFileSystem(hadoopConf)
+      dfs.listStatus(tmpPath).foreach {
+        tmpFile =>
+          if (isLocal) {
+            dfs.copyToLocalFile(tmpFile.getPath, writeToPath)
+          } else {
+            dfs.rename(tmpFile.getPath, writeToPath)
+          }
       }
     } catch {
       case e: Throwable =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to