Repository: spark
Updated Branches:
  refs/heads/master 21c4450fb -> 8c7e19a37


[SPARK-4131] Merge HiveTmpFile.scala to SaveAsHiveFile.scala

## What changes were proposed in this pull request?

The code was already merged to master:
https://github.com/apache/spark/pull/18975

This is a follow-up PR that merges HiveTmpPath.scala into SaveAsHiveFile.scala.
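
For context, here is a minimal, hypothetical sketch of the shape of this refactoring (simplified names and bodies, not the real Spark signatures): the helpers that previously required mixing in a second trait now live on the base trait itself.

```scala
// Before: insert commands had to extend both traits.
trait SaveAsHiveFileBefore { def saveAsHiveFile(): Unit = () }
trait HiveTmpPathBefore { def getExternalTmpPath(): String = "/tmp/hive/staging" }
class InsertCommandBefore extends SaveAsHiveFileBefore with HiveTmpPathBefore

// After: one trait carries both the write path and the temp-path helpers,
// so callers drop the extra mix-in (see the diff below).
trait SaveAsHiveFileAfter {
  def saveAsHiveFile(): Unit = ()
  def getExternalTmpPath(): String = "/tmp/hive/staging"
}
class InsertCommandAfter extends SaveAsHiveFileAfter
```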

## How was this patch tested?

Builds successfully.

Author: Jane Wang <janew...@fb.com>

Closes #19221 from janewangfb/merge_savehivefile_hivetmpfile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c7e19a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c7e19a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c7e19a3

Branch: refs/heads/master
Commit: 8c7e19a37dc5af924be8b7af0c3607d5c7a4e96c
Parents: 21c4450
Author: Jane Wang <janew...@fb.com>
Authored: Wed Sep 13 15:12:36 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Sep 13 15:12:36 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/execution/HiveTmpPath.scala  | 203 -------------------
 .../execution/InsertIntoHiveDirCommand.scala    |   2 +-
 .../hive/execution/InsertIntoHiveTable.scala    |   2 +-
 .../sql/hive/execution/SaveAsHiveFile.scala     | 175 ++++++++++++++++
 4 files changed, 177 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8c7e19a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
deleted file mode 100644
index 15ca1df..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.{File, IOException}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Random}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.ql.exec.TaskRunner
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.client.HiveVersion
-
-// Base trait for getting a temporary location for writing data
-private[hive] trait HiveTmpPath extends Logging {
-
-  var createdTempDir: Option[Path] = None
-
-  def getExternalTmpPath(
-      sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      path: Path): Path = {
-    import org.apache.spark.sql.hive.client.hive._
-
-    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
-    // a common scratch directory. After the writing is finished, Hive will simply empty the table
-    // directory and move the staging directory to it.
-    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
-    // moving staging directory to table directory, Hive will still empty the table directory, but
-    // will exclude the staging directory there.
-    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
-    // staging directory under the table directory for Hive prior to 1.1, the staging directory will
-    // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
-    val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
-    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
-
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
-      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
-    } else {
-      throw new IllegalStateException("Unsupported hive version: " + 
hiveVersion.fullVersion)
-    }
-  }
-
-  def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
-    // Attempt to delete the staging directory and the included files. If this fails, the files
-    // are expected to be dropped at the normal termination of the VM since deleteOnExit is used.
-    try {
-      createdTempDir.foreach { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        if (fs.delete(path, true)) {
-          // If we successfully delete the staging directory, remove it from FileSystem's cache.
-          fs.cancelDeleteOnExit(path)
-        }
-      }
-    } catch {
-      case NonFatal(e) =>
-        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
-    }
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + 
dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw new RuntimeException("Cannot create staging directory: " + 
dirPath.toString, e)
-    }
-    dirPath
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val extURI: URI = path.toUri
-    if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
-    } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
-    }
-  }
-
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    getStagingDir(
-      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
-      stagingDir)
-  }
-
-  private def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val inputPathUri: URI = inputPath.toUri
-    val inputPathName: String = inputPathUri.getPath
-    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
-      if (inputPathName.indexOf(stagingDir) == -1) {
-        new Path(inputPathName, stagingDir).toString
-      } else {
-        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
-      }
-
-    // SPARK-20594: This is a workaround for a Hive bug: Hive requires that the
-    // staging directory not be deleted when users set hive.exec.stagingdir
-    // under the table directory.
-    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
-      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
-      logDebug(s"The staging dir '$stagingPathName' should be a child 
directory starts " +
-        "with '.' to avoid being deleted if we set hive.exec.stagingdir under 
the table " +
-        "directory.")
-      stagingPathName = new Path(inputPathName, ".hive-staging").toString
-    }
-
-    val dir: Path =
-      fs.makeQualified(
-        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
-    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
-    try {
-      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory  '" + 
dir.toString + "'")
-      }
-      createdTempDir = Some(dir)
-      fs.deleteOnExit(dir)
-    } catch {
-      case e: IOException =>
-        throw new RuntimeException(
-          "Cannot create staging directory '" + dir.toString + "': " + 
e.getMessage, e)
-    }
-    dir
-  }
-
-  private def executionId: String = {
-    val rand: Random = new Random
-    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
-    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/8c7e19a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 2110038..918c8be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -55,7 +55,7 @@ case class InsertIntoHiveDirCommand(
     isLocal: Boolean,
     storage: CatalogStorageFormat,
     query: LogicalPlan,
-    overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath {
+    overwrite: Boolean) extends SaveAsHiveFile {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8c7e19a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5bdc97a..e5b59ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -70,7 +70,7 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath {
+    ifPartitionNotExists: Boolean) extends SaveAsHiveFile {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8c7e19a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 7de9b42..ad86994 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -17,7 +17,17 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.{File, IOException}
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
@@ -25,11 +35,15 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.hive.client.HiveVersion
 
 // Base trait from which all hive insert statement physical execution extends.
 private[hive] trait SaveAsHiveFile extends DataWritingCommand {
 
+  var createdTempDir: Option[Path] = None
+
   protected def saveAsHiveFile(
       sparkSession: SparkSession,
       plan: SparkPlan,
@@ -69,5 +83,166 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = Map.empty)
   }
+
+  protected def getExternalTmpPath(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+    // staging directory under the table directory for Hive prior to 1.1, the staging directory will
+    // be removed by Hive when Hive is trying to empty the table directory.
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    val externalCatalog = sparkSession.sharedState.externalCatalog
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
+      newVersionExternalTempPath(path, hadoopConf, stagingDir)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + 
hiveVersion.fullVersion)
+    }
+  }
+
+  protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
+    // Attempt to delete the staging directory and the included files. If this fails, the files
+    // are expected to be dropped at the normal termination of the VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  private def oldVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      scratchDir: String): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + 
dirPath.toString)
+      }
+      createdTempDir = Some(dirPath)
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + 
dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  private def newVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
+    }
+  }
+
+  private def getExtTmpPathRelTo(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
+  }
+
+  private def getExternalScratchDir(
+      extURI: URI,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    getStagingDir(
+      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+      hadoopConf,
+      stagingDir)
+  }
+
+  private def getStagingDir(
+      inputPath: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    var stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+
+    // SPARK-20594: This is a workaround for a Hive bug: Hive requires that the
+    // staging directory not be deleted when users set hive.exec.stagingdir
+    // under the table directory.
+    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child 
directory starts " +
+        "with '.' to avoid being deleted if we set hive.exec.stagingdir under 
the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory  '" + 
dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + 
e.getMessage, e)
+    }
+    dir
+  }
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+  }
 }
 


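As an aside, the staging location produced by the merged helpers combines a timestamped execution id with the Hive task-runner id. Below is a self-contained sketch (illustrative only; it mirrors the `executionId` format in the diff above, with a fake stand-in for `TaskRunner.getTaskRunnerID`) of what the generated directory name looks like:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

object StagingNameSketch {
  // Mirrors the executionId helper in SaveAsHiveFile above.
  private def executionId: String = {
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(new Random().nextLong)
  }

  def main(args: Array[String]): Unit = {
    val fakeTaskRunnerId = 1 // stand-in for TaskRunner.getTaskRunnerID
    // Prints something like:
    //   .hive-staging_hive_2017-09-13_15-12-36_123_4567890123456789-1/-ext-10000
    println(s".hive-staging_${executionId}-$fakeTaskRunnerId/-ext-10000")
  }
}
```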