Github user merlintang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15819#discussion_r88778830
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -54,6 +61,61 @@ case class InsertIntoHiveTable(
       @transient private lazy val hiveContext = new Context(sc.hiveconf)
       @transient private lazy val catalog = sc.catalog
     
    +  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
    +
    +  private def executionId: String = {
    +    val rand: Random = new Random
    +    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
    +    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
    +    return executionId
    +  }
    +
    +  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
    +    val inputPathUri: URI = inputPath.toUri
    +    val inputPathName: String = inputPathUri.getPath
    +    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    +    val stagingPathName: String =
    +      if (inputPathName.indexOf(stagingDir) == -1) {
    +        new Path(inputPathName, stagingDir).toString
    +      } else {
    +        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
    +      }
    +    val dir: Path =
    +      fs.makeQualified(
    +        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    +    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    +    try {
    +      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
    +        throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
    +      }
    +      fs.deleteOnExit(dir)
    +    }
    +    catch {
    +      case e: IOException =>
    +        throw new RuntimeException(
    --- End diff --
    
    We use this code for the following reasons. (1) The old version relied on
    the Hive package to create the staging directory. In the Hive code, each
    staging directory is stored in a hash map, and these directories are
    removed when the session is closed. However, our Spark code does not
    trigger the Hive session close, so the directories are never removed.
    (2) The pushed code simply mimics the way Hive creates the staging
    directory, but does so inside Spark rather than going through Hive, so the
    staging directory will be removed. (3) I will fix the return type issue.
    Thanks for your comments, @srowen.
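
To make point (2) concrete, here is a minimal sketch of the naming logic only, using plain JVM classes so it runs without Hadoop or Hive on the classpath. It is an illustration under stated assumptions, not the code in the pull request: the object name `StagingDirSketch` is hypothetical, the `stagingDir` value is an assumed stand-in for what the diff reads from `HiveConf.ConfVars.STAGINGDIR`, and plain string concatenation approximates what the diff does with Hadoop's `Path`. Directory creation and `fs.deleteOnExit` are left out.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Random}

object StagingDirSketch {
  // Assumed value for illustration; the diff reads it from HiveConf.ConfVars.STAGINGDIR.
  val stagingDir: String = ".hive-staging"

  // Timestamp plus a random suffix, following the shape of executionId in the diff.
  def executionId(): String = {
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
    // Deviation from the diff: mask the sign bit rather than call Math.abs,
    // because Math.abs(Long.MinValue) is still negative.
    val suffix = new Random().nextLong() & Long.MaxValue
    "hive_" + format.format(new Date) + "_" + suffix
  }

  // If the path does not mention the staging dir yet, append it;
  // otherwise cut the path right after the first occurrence of its name.
  def stagingPathName(inputPathName: String): String = {
    val idx = inputPathName.indexOf(stagingDir)
    if (idx == -1) {
      inputPathName.stripSuffix("/") + "/" + stagingDir
    } else {
      inputPathName.substring(0, idx + stagingDir.length)
    }
  }
}
```

In the diff, the result of this step is then suffixed with the execution id and the task runner id, created on the file system, and registered with `fs.deleteOnExit`, which is what lets Spark clean the directory up without relying on the Hive session close.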


