Repository: spark
Updated Branches:
  refs/heads/branch-1.4 fe59a4a5f -> f0e404020


[SPARK-8379] [SQL] Avoid speculative tasks writing to the same file

JIRA issue: [SPARK-8379](https://issues.apache.org/jira/browse/SPARK-8379)

Currently, when we insert data into a dynamic partition with speculative tasks enabled, we get the following exception:
```
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
Lease mismatch on /tmp/hive-jeanlyn/hive_2015-06-15_15-20-44_734_8801220787219172413-1/-ext-10000/ds=2015-06-15/type=2/part-00301.lzo owned by DFSClient_attempt_201506031520_0011_m_000189_0_-1513487243_53 but is accessed by DFSClient_attempt_201506031520_0011_m_000042_0_-1275047721_57
```
This PR writes the data to a task-specific temporary directory when using dynamic partitioning, so that speculative task attempts never write to the same file.
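
For context, here is a minimal sketch of the change (assuming a `mapred` `JobConf`; `dynamicPartPath` and `outputName` stand in for the values held by the writer container, and the commented-out construction mirrors the removed code):

```
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Sketch only: `jobConf`, `dynamicPartPath` and `outputName` stand in for the
// values held by SparkHiveDynamicPartitionWriterContainer.
def buildWriterPath(jobConf: JobConf, dynamicPartPath: String, outputName: String): Path = {
  // Before: every attempt of a task computed the same final path, so a
  // speculative duplicate raced the original attempt for the HDFS lease:
  //   new Path(new Path(FileOutputFormat.getOutputPath(jobConf),
  //                     dynamicPartPath.stripPrefix("/")), outputName)

  // After: resolve the file under the task attempt's temporary directory, so
  // concurrent attempts write disjoint files and only the winning attempt's
  // output is promoted on commit.
  FileOutputFormat.getTaskOutputPath(
    jobConf, dynamicPartPath.stripPrefix("/") + "/" + outputName)
}
```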

Author: jeanlyn <jeanly...@gmail.com>

Closes #6833 from jeanlyn/speculation and squashes the following commits:

64bbfab [jeanlyn] use FileOutputFormat.getTaskOutputPath to get the path
8860af0 [jeanlyn] remove the unused code
e19a3bd [jeanlyn] avoid speculative tasks writing to the same file

(cherry picked from commit a1e3649c8775d71ca78796b6544284e942ac1331)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0e40402
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0e40402
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0e40402

Branch: refs/heads/branch-1.4
Commit: f0e4040202f77d16f04468b639abc2bb0d7257ec
Parents: fe59a4a
Author: jeanlyn <jeanly...@gmail.com>
Authored: Sun Jun 21 00:13:40 2015 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Sun Jun 21 00:13:55 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala   |  1 -
 .../org/apache/spark/sql/hive/hiveWriterContainers.scala | 11 +++++------
 2 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0e40402/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8613332..72c4448 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       table.hiveQlTable.getPartCols().foreach { entry =>
         orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse(""))
       }
-      val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
 
       // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
       // which is currently considered as a Hive native command.

http://git-wip-us.apache.org/repos/asf/spark/blob/f0e40402/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 2bb526b..e1e8e47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -228,12 +228,11 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
 
-      val path = {
-        val outputPath = FileOutputFormat.getOutputPath(conf.value)
-        assert(outputPath != null, "Undefined job output-path")
-        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-        new Path(workPath, getOutputName)
-      }
+      // Use a path like ${hive_tmp}/_temporary/${attemptId}/
+      // to avoid writing to the same file when `spark.speculation=true`.
+      val path = FileOutputFormat.getTaskOutputPath(
+        conf.value,
+        dynamicPartPath.stripPrefix("/") + "/" + getOutputName)
 
       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,
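
For intuition, a hedged illustration of the layout `getTaskOutputPath` resolves to under the old `mapred` `FileOutputCommitter` (the `_temporary/_${attemptId}` structure is an implementation detail assumed here, not an API guarantee; `taskScopedPath` is a hypothetical helper):

```
import org.apache.hadoop.fs.Path

// Hypothetical helper mimicking where getTaskOutputPath places a file:
// ${output}/_temporary/_${taskAttemptId}/${name}
def taskScopedPath(output: Path, taskAttemptId: String, name: String): Path =
  new Path(new Path(new Path(output, "_temporary"), s"_$taskAttemptId"), name)

val output = new Path("/tmp/hive-staging")
val original    = taskScopedPath(output, "attempt_201506031520_0011_m_000042_0",
                                 "ds=2015-06-15/type=2/part-00042")
val speculative = taskScopedPath(output, "attempt_201506031520_0011_m_000042_1",
                                 "ds=2015-06-15/type=2/part-00042")
// The two attempts now target distinct files, so there is no lease contention;
// on commit, only the successful attempt's directory is moved into the final
// partition location.
```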

