Repository: spark
Updated Branches:
  refs/heads/master 47c1d5629 -> 0818fdec3


[SPARK-8406] [SQL] Adding UUID to output file name to avoid accidental 
overwriting

This PR fixes a Parquet output file name collision bug which may cause data 
loss.  Changes made:

1.  Identify each write job issued by `InsertIntoHadoopFsRelation` with a UUID

    All concrete data sources that extend `HadoopFsRelation` (Parquet and ORC 
for now) must use this UUID when generating task output file paths, to avoid 
name collisions.

2.  Make `TestHive` use a local mode `SparkContext` with 32 threads to increase 
parallelism

    The main reason is that the original parallelism of 2 is too low to 
reproduce the data loss issue.  Higher concurrency may also catch more 
concurrency bugs during testing.  (It did help us spot SPARK-8501.)

3. `OrcSourceSuite` was updated to work around SPARK-8501, which we detected 
along the way.
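
The comment added to `OrcSuite` in this commit explains that a 10-row RDD ends up with empty partitions once the default parallelism exceeds 10, and that those empty partitions become empty ORC files.  The arithmetic can be sketched in plain Scala, assuming a local collection is cut into `numSlices` contiguous, near-even ranges.  The `sliceSizes` and `emptySlices` helpers are hypothetical names for illustration, not Spark API:

```scala
object EmptyPartitionSketch {
  // Hypothetical helper: sizes of the contiguous ranges obtained when `length`
  // elements are split as evenly as possible into `numSlices` slices.
  // This even-split rule is an assumption made for illustration.
  def sliceSizes(length: Int, numSlices: Int): Seq[Int] =
    (0 until numSlices).map { i =>
      val start = ((i.toLong * length) / numSlices).toInt
      val end = (((i + 1).toLong * length) / numSlices).toInt
      end - start
    }

  // Number of slices that would hold no rows at all.
  def emptySlices(length: Int, numSlices: Int): Int =
    sliceSizes(length, numSlices).count(_ == 0)

  def main(args: Array[String]): Unit = {
    println(emptySlices(10, 2))   // 0: the old local[2] setup never produced empty slices
    println(emptySlices(10, 32))  // 22: most slices are empty with 32 threads
    println(emptySlices(100, 32)) // 0: 100 rows leave no slice empty
  }
}
```

Under this assumption, raising the row count from 10 to 100 is enough to keep every one of the 32 partitions non-empty.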

NOTE: This PR is a little more complicated than expected because we hit two 
other bugs along the way and had to work around them. See [SPARK-8501] 
[1] and [SPARK-8513] [2].

[1]: https://github.com/liancheng/spark/tree/spark-8501
[2]: https://github.com/liancheng/spark/tree/spark-8513

----

Some background and a summary of offline discussion with yhuai about this issue 
for better understanding:

In 1.4.0, we added `HadoopFsRelation` to abstract partition support for all 
data sources that are based on the Hadoop `FileSystem` interface.  Specifically, 
this makes partition discovery, partition pruning, and writing dynamic 
partitions much easier for these data sources.

To support appending, the Parquet data source tries to find out the max part 
number of part-files in the destination directory (i.e., `<id>` in output file 
name `part-r-<id>.gz.parquet`) at the beginning of the write job.  In 1.3.0, 
this step happens on driver side before any files are written.  However, in 
1.4.0, this is moved to task side.  Unfortunately, tasks scheduled later may 
see a wrong max part number, computed from files newly written by other 
finished tasks within the same job.  This is a race condition.  In most cases, 
it only causes nonconsecutive part numbers in output file names.  But when the 
DataFrame contains thousands of RDD partitions, it's likely that two tasks 
choose the same part number, and then one of them gets overwritten by the 
other.
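
The race can be replayed deterministically in plain Scala.  This is a minimal sketch, not the removed Spark code: `choosePart` mirrors the formula deleted from `ParquetOutputWriter` in this commit (task ID + max existing ID + 1), while the interleaving in `simulate` is a hand-picked assumption chosen to show a collision:

```scala
object PartNumberRaceSketch {
  // File name pattern from the description above.
  def fileName(part: Int): String = f"part-r-$part%05d.gz.parquet"

  // Task-side numbering: task ID + max part number visible to this task + 1.
  def choosePart(taskId: Int, visibleParts: Set[Int]): Int =
    taskId + visibleParts.foldLeft(0)(_ max _) + 1

  // Hypothetical interleaving of three tasks writing to an empty directory.
  def simulate(): Seq[(Int, String)] = {
    var written = Set.empty[Int]
    val p0 = choosePart(0, written) // task 0 sees an empty directory: part 1
    val p2 = choosePart(2, written) // task 2 also sees an empty directory: part 3
    written += p0                   // task 0 finishes and its file becomes visible
    val p1 = choosePart(1, written) // task 1 now sees part 1: 1 + 1 + 1 = part 3
    Seq(0 -> fileName(p0), 1 -> fileName(p1), 2 -> fileName(p2))
  }

  def main(args: Array[String]): Unit =
    simulate().foreach { case (task, name) => println(s"task $task -> $name") }
}
```

In this interleaving tasks 1 and 2 both pick `part-r-00003.gz.parquet`, so whichever finishes last silently replaces the other's output.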

Before `HadoopFsRelation`, Spark SQL already supported appending data to Hive 
tables.  From a user's perspective, these two look similar.  However, they 
differ a lot internally.  When data are inserted into Hive tables via Spark 
SQL, `InsertIntoHiveTable` simulates Hive's behavior:

1.  Write data to a temporary location

2.  Move data in the temporary location to the final destination location using

    -   `Hive.loadTable()` for non-partitioned table
    -   `Hive.loadPartition()` for static partitions
    -   `Hive.loadDynamicPartitions()` for dynamic partitions

The important part is that `Hive.copyFiles()` is invoked in step 2 to move the 
data to the destination directory (I found the name kinda confusing since no 
"copying" occurs here; we are just moving and renaming files).  If a file in 
the source directory and another file in the destination directory happen to 
have the same name, say `part-r-00001.parquet`, the former is moved to the 
destination directory and renamed with a `_copy_N` suffix 
(`part-r-00001_copy_1.parquet`).  That's how Hive handles appending and avoids 
name collisions between different write jobs.
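
The renaming rule described above can be sketched as follows.  This is only an illustration of the `_copy_N` idea, not Hive's implementation: the `resolveName` helper is hypothetical, and splitting the name at its first dot is an assumption made to keep the example short:

```scala
object CopySuffixSketch {
  // Hypothetical helper: returns the name a moved file would get in the
  // destination directory, given the names that already exist there.
  def resolveName(name: String, existing: Set[String]): String =
    if (!existing.contains(name)) {
      name
    } else {
      // Assumption: everything from the first dot onward is the extension.
      val dot = name.indexOf('.')
      val (base, ext) = if (dot < 0) (name, "") else name.splitAt(dot)
      // Try _copy_1, _copy_2, ... until an unused name is found.
      Iterator.from(1)
        .map(n => s"${base}_copy_$n$ext")
        .find(candidate => !existing.contains(candidate))
        .get
    }

  def main(args: Array[String]): Unit = {
    val destination = Set("part-r-00001.parquet")
    println(resolveName("part-r-00001.parquet", destination)) // part-r-00001_copy_1.parquet
    println(resolveName("part-r-00002.parquet", destination)) // part-r-00002.parquet
  }
}
```

Note that this scheme needs a listing of the destination directory and a rename per file, which is the kind of file metadata work the first alternative below tries to avoid on S3.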

Some alternative fixes considered for this issue:

1.  Use a similar approach as Hive

    This approach is not preferred in Spark 1.4.0 mainly because file metadata 
operations in S3 tend to be slow, especially for tables with lots of files 
and/or partitions.  That's why `InsertIntoHadoopFsRelation` just inserts to 
the destination directory directly, and is often used together with 
`DirectParquetOutputCommitter` to reduce latency when working with S3.  This 
means we don't have the chance to do renaming, and must avoid name collisions 
from the very beginning.

2.  Same as 1.3, just move max part number detection back to driver side

    This isn't doable because unlike 1.3, 1.4 also takes dynamic partitioning 
into account.  When inserting into dynamic partitions, we don't know which 
partition directories will be touched on driver side before issuing the write 
job.  Checking all partition directories is simply too expensive for tables 
with thousands of partitions.

3.  Add extra component to output file names to avoid name collision

    This seems to be the only reasonable solution for now.  To be more 
specific, we need a JOB level unique identifier to identify all write jobs 
issued by `InsertIntoHadoopFsRelation`.  Notice that TASK level unique 
identifiers can NOT be used, because a speculative task would then write to a 
different output file from the original task.  If both tasks succeed, duplicate 
output will be left behind.  Currently, the ORC data source adds 
`System.currentTimeMillis` to the output file name for uniqueness.  This 
doesn't work for exactly the same reason.

    That's why this PR adds a job level random UUID in `BaseWriterContainer` 
(which is used by `InsertIntoHadoopFsRelation` to issue write jobs).  The 
drawback is that record order is no longer preserved (output files of a 
later job may be listed before those of an earlier job).  However, we never 
promise to preserve record order when writing data, and Hive doesn't promise 
this either because the `_copy_N` trick breaks the order.
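
A minimal sketch of the resulting naming scheme, in plain Scala with no Spark dependency.  The pattern `part-r-<task-id>-<job-uuid><extension>` is the one used in this commit; the `outputFileName` helper and the task and job values are hypothetical:

```scala
import java.util.UUID

object JobUuidNamingSketch {
  // File name built from the task ID and the job level UUID only.
  // Nothing attempt-specific (attempt ID, timestamp) goes into the name.
  def outputFileName(taskId: Int, jobUuid: String, extension: String): String =
    f"part-r-$taskId%05d-$jobUuid$extension"

  def main(args: Array[String]): Unit = {
    // One UUID per write job, generated once on the driver side.
    val jobA = UUID.randomUUID().toString
    val jobB = UUID.randomUUID().toString

    // The original and the speculative attempt of task 7 share both inputs,
    // so they target the same file and cannot leave duplicate output behind.
    val original = outputFileName(7, jobA, ".gz.parquet")
    val speculative = outputFileName(7, jobA, ".gz.parquet")
    println(original == speculative)

    // Task 7 of a different job appending to the same directory gets a
    // different name, because the job UUIDs differ.
    println(outputFileName(7, jobB, ".gz.parquet") == original)
  }
}
```

A per-attempt component such as a timestamp or a fresh UUID per call would make the first comparison false, which is the duplicate-output problem described above.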

Author: Cheng Lian <l...@databricks.com>

Closes #6864 from liancheng/spark-8406 and squashes the following commits:

db7a46a [Cheng Lian] More comments
f5c1133 [Cheng Lian] Addresses comments
85c478e [Cheng Lian] Workarounds SPARK-8513
088c76c [Cheng Lian] Adds comment about SPARK-8501
99a5e7e [Cheng Lian] Uses job level UUID in SimpleTextRelation and avoids 
double task abortion
4088226 [Cheng Lian] Works around SPARK-8501
1d7d206 [Cheng Lian] Adds more logs
8966bbb [Cheng Lian] Fixes Scala style issue
18b7003 [Cheng Lian] Uses job level UUID to take speculative tasks into account
3806190 [Cheng Lian] Lets TestHive use all cores by default
748dbd7 [Cheng Lian] Adding UUID to output file name to avoid accidental 
overwriting


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0818fdec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0818fdec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0818fdec

Branch: refs/heads/master
Commit: 0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13
Parents: 47c1d56
Author: Cheng Lian <l...@databricks.com>
Authored: Mon Jun 22 10:03:57 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jun 22 10:03:57 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 43 +++----------
 .../org/apache/spark/sql/sources/commands.scala | 64 ++++++++++++++++----
 .../spark/sql/hive/orc/OrcFileOperator.scala    |  9 +--
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  5 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 28 +++++----
 .../spark/sql/sources/SimpleTextRelation.scala  |  4 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 37 +++++++++--
 8 files changed, 120 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c9de45e..e049d54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, 
context: TaskAttemptContext
   extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, InternalRow] = {
-    val conf = context.getConfiguration
     val outputFormat = {
-      // When appending new Parquet files to an existing Parquet file 
directory, to avoid
-      // overwriting existing data files, we need to find out the max task ID 
encoded in these data
-      // file names.
-      // TODO Make this snippet a utility function for other data source 
developers
-      val maxExistingTaskId = {
-        // Note that `path` may point to a temporary location.  Here we 
retrieve the real
-        // destination path from the configuration
-        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
-        val fs = outputPath.getFileSystem(conf)
-
-        if (fs.exists(outputPath)) {
-          // Pattern used to match task ID in part file names, e.g.:
-          //
-          //   part-r-00001.gz.parquet
-          //          ^~~~~
-          val partFilePattern = """part-.-(\d{1,}).*""".r
-
-          fs.listStatus(outputPath).map(_.getPath.getName).map {
-            case partFilePattern(id) => id.toInt
-            case name if name.startsWith("_") => 0
-            case name if name.startsWith(".") => 0
-            case name => throw new AnalysisException(
-              s"Trying to write Parquet files to directory $outputPath, " +
-                s"but found items with illegal name '$name'.")
-          }.reduceOption(_ max _).getOrElse(0)
-        } else {
-          0
-        }
-      }
-
       new ParquetOutputFormat[InternalRow]() {
         // Here we override `getDefaultWorkFile` for two reasons:
         //
-        //  1. To allow appending.  We need to generate output file name based 
on the max available
-        //     task ID computed above.
+        //  1. To allow appending.  We need to generate unique output file 
names to avoid
+        //     overwriting existing files (either exist before the write job, 
or are just written
+        //     by other tasks within the same write job).
         //
         //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
         //     `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-          val split = context.getTaskAttemptID.getTaskID.getId + 
maxExistingTaskId + 1
-          new Path(path, f"part-r-$split%05d$extension")
+          val uniqueWriteJobId = 
context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+          val split = context.getTaskAttemptID.getTaskID.getId
+          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c16bd9a..215e53c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.sources
 
-import java.util.Date
+import java.util.{Date, UUID}
 
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, 
FileOutputCommitter => MapReduceFileOutputCommitter}
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => 
MapReduceFileOutputCommitter, FileOutputFormat}
 
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
   }
 }
 
+/**
+ * A command for writing data to a [[HadoopFsRelation]].  Supports both 
overwriting and appending.
+ * Writing to dynamic partitions is also supported.  Each 
[[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job.  Each concrete 
implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate 
unique file path for
+ * each task output file.  This UUID is passed to executor side via a property 
named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and 
[[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ *   1. Driver side setup, including output committer initialization and data 
source specific
+ *      preparation work for the write job to be issued.
+ *   2. Issues a write job consists of one or more executor side tasks, each 
of which writes all
+ *      rows within an RDD partition.
+ *   3. If no exception is thrown in a task, commits that task, otherwise 
aborts that task;  If any
+ *      exception is thrown during task commitment, also aborts that task.
+ *   4. If all tasks are committed, commit the job, otherwise aborts the job;  
If any exception is
+ *      thrown during job commitment, also aborts the job.
+ */
 private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
@@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
   with Logging
   with Serializable {
 
-  protected val serializableConf = new 
SerializableConfiguration(ContextUtil.getConfiguration(job))
+  protected val serializableConf = new 
SerializableConfiguration(job.getConfiguration)
+
+  // This UUID is used to avoid output file name collision between different 
appending write jobs.
+  // These jobs may belong to different SparkContext instances. Concrete data 
source implementations
+  // may use this UUID to generate unique file names (e.g., 
`part-r-<task-id>-<job-uuid>.parquet`).
+  //  The reason why this ID is used to identify a job rather than a single 
task output file is
+  // that, speculative tasks must generate the same output file name as the 
original task.
+  private val uniqueWriteJobId = UUID.randomUUID()
 
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job
@@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer(
     setupIDs(0, 0, 0)
     setupConf()
 
+    // This UUID is sent to executor side together with the serialized 
`Configuration` object within
+    // the `Job` instance.  `OutputWriters` on the executor side should use 
this UUID to generate
+    // unique task output files.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", 
uniqueWriteJobId.toString)
+
     // Order of the following two lines is important.  For Hadoop 1, 
TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the 
TaskAttemptContext first,
     // configurations made in prepareJobForWrite(job) are not populated into 
the TaskAttemptContext.
@@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
       assert(writer != null, "OutputWriter instance should have been 
initialized")
       writer.close()
       super.commitTask()
-    } catch {
-      case cause: Throwable =>
-        super.abortTask()
-        throw new RuntimeException("Failed to commit task", cause)
+    } catch { case cause: Throwable =>
+      // This exception will be handled in 
`InsertIntoHadoopFsRelation.insert$writeRows`, and will
+      // cause `abortTask()` to be invoked.
+      throw new RuntimeException("Failed to commit task", cause)
     }
   }
 
   override def abortTask(): Unit = {
     try {
+      // It's possible that the task fails before `writer` gets initialized
       if (writer != null) {
         writer.close()
       }
@@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
     })
   }
 
-  override def commitTask(): Unit = {
-    try {
+  private def clearOutputWriters(): Unit = {
+    if (outputWriters.nonEmpty) {
       outputWriters.values.foreach(_.close())
       outputWriters.clear()
+    }
+  }
+
+  override def commitTask(): Unit = {
+    try {
+      clearOutputWriters()
       super.commitTask()
     } catch { case cause: Throwable =>
-      super.abortTask()
       throw new RuntimeException("Failed to commit task", cause)
     }
   }
 
   override def abortTask(): Unit = {
     try {
-      outputWriters.values.foreach(_.close())
-      outputWriters.clear()
+      clearOutputWriters()
     } finally {
       super.abortTask()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 1e51173..e3ab944 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
-private[orc] object OrcFileOperator extends Logging{
+private[orc] object OrcFileOperator extends Logging {
   def getFileReader(pathStr: String, config: Option[Configuration] = None ): 
Reader = {
     val conf = config.getOrElse(new Configuration)
     val fspath = new Path(pathStr)
     val fs = fspath.getFileSystem(conf)
     val orcFiles = listOrcFiles(pathStr, conf)
-
+    logDebug(s"Creating ORC Reader from ${orcFiles.head}")
     // TODO Need to consider all files when schema evolution is taken into 
account.
     OrcFile.createReader(fs, orcFiles.head)
   }
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
     val reader = getFileReader(path, conf)
     val readerInspector = 
reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
+    logDebug(s"Reading schema from file $path, got Hive schema string: 
$schema")
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)
     val fs = origPath.getFileSystem(conf)
-    val path = origPath.makeQualified(fs)
+    val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
     val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
       .filterNot(_.isDir)
       .map(_.getPath)
       .filterNot(_.getName.startsWith("_"))
       .filterNot(_.getName.startsWith("."))
 
-    if (paths == null || paths.size == 0) {
+    if (paths == null || paths.isEmpty) {
       throw new IllegalArgumentException(
         s"orcFileOperator: path $path does not have valid orc files matching 
the pattern")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index dbce39f..705f48f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => 
MapRedInputFormat, JobConf, Reco
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, 
HiveInspectors, HiveMetastoreType
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging}
 import org.apache.spark.util.SerializableConfiguration
 
 /* Implicit conversions */
@@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter(
     recordWriterInstantiated = true
 
     val conf = context.getConfiguration
+    val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val partition = context.getTaskAttemptID.getTaskID.getId
-    val filename = 
f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
+    val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
 
     new OrcOutputFormat().getRecordWriter(
       new Path(path, filename).getFileSystem(conf),

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f901bd8..ea325cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      System.getProperty("spark.sql.test.master", "local[2]"),
+      System.getProperty("spark.sql.test.master", "local[32]"),
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 82e08ca..a0cdd0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with 
BeforeAndAfterAll {
     orcTableDir.mkdir()
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
+    // Originally we were using a 10-row RDD for testing.  However, when 
default parallelism is
+    // greater than 10 (e.g., running on a node with 32 cores), this RDD 
contains empty partitions,
+    // which result in empty ORC files.  Unfortunately, ORC doesn't handle 
empty files properly and
+    // causes build failure on Jenkins, which happens to have 32 cores. Please 
refer to SPARK-8501
+    // for more details.  To workaround this issue before fixing SPARK-8501, 
we simply increase row
+    // number in this RDD to avoid empty partitions.
     sparkContext
-      .makeRDD(1 to 10)
+      .makeRDD(1 to 100)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -70,35 +76,35 @@ abstract class OrcSuite extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 10).map(i => Row(i, s"part-$i")))
+      (1 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY 
stringField"),
-      (1 to 10).map(i => Row(1, s"part-$i")))
+      (1 to 100).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 10).map(i => Row(i, s"part-$i")))
+      (1 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY 
stringField"),
-      (1 to 10).map(i => Row(1, s"part-$i")))
+      (1 to 100).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
@@ -106,7 +112,7 @@ abstract class OrcSuite extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
         Seq.fill(2)(Row(i, s"part-$i"))
       })
   }
@@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0f959b3..5d7cd16 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends 
TextOutputFormat[NullW
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
+    val uniqueWriteJobId = 
context.getConfiguration.get("spark.sql.sources.writeJobUUID")
     val split = context.getTaskAttemptID.getTaskID.getId
     val name = FileOutputFormat.getOutputName(context)
-    new Path(outputFile, 
s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+    new Path(outputFile, 
s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
   }
 }
 
@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
         context: TaskAttemptContext): OutputWriter = {
       new SimpleTextOutputWriter(path, context) {
         override def close(): Unit = {
+          super.close()
           sys.error("Intentional task commitment failure for testing purpose.")
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 76469d7..e0d8277 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils {
   import sqlContext.sql
   import sqlContext.implicits._
 
-  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+  val dataSourceName: String
 
   val dataSchema =
     StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils {
       checkAnswer(sqlContext.table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  // NOTE: This test suite is not super deterministic.  On nodes with only 
relatively few cores
+  // (4 or even 1), it's hard to reproduce the data loss issue.  But on nodes 
with for example 8 or
+  // more cores, the issue can be reproduced steadily.  Fortunately our 
Jenkins builder meets this
+  // requirement.  We probably want to move this test case to 
spark-integration-tests or spark-perf
+  // later.
+  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext
+        .range(10000)
+        .repartition(250)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .save(path)
+
+      assertResult(10000) {
+        sqlContext
+          .read
+          .format(dataSourceName)
+          .option("dataSchema", StructType(StructField("id", LongType) :: 
Nil).json)
+          .load(path)
+          .count()
+      }
+    }
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest {
 }
 
 class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
-  import TestHive.implicits._
-
   override val sqlContext = TestHive
 
+  // When committing a task, `CommitFailureTestSource` throws an exception for 
testing purpose.
   val dataSourceName: String = 
classOf[CommitFailureTestSource].getCanonicalName
 
   test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
     withTempPath { file =>
-      val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+      // Here we coalesce partition number to 1 to ensure that only a single 
task is issued.  This
+      // prevents race condition happened when FileOutputCommitter tries to 
remove the `_temporary`
+      // directory while committing/aborting the job.  See SPARK-8513 for more 
details.
+      val df = sqlContext.range(0, 10).coalesce(1)
       intercept[SparkException] {
         df.write.format(dataSourceName).save(file.getCanonicalPath)
       }

