Repository: spark
Updated Branches:
  refs/heads/master 3aec9f4e2 -> 431ca39be
diff --git 
new file mode 100644
index 0000000..e976125
--- /dev/null
@@ -0,0 +1,47 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources
+import org.apache.hadoop.fs.Path
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.test.SQLTestUtils
+class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
+  override val sqlContext = TestHive
+  // When committing a task, `CommitFailureTestSource` throws an exception for 
testing purpose.
+  val dataSourceName: String = 
+  test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
+    withTempPath { file =>
+      // Here we coalesce partition number to 1 to ensure that only a single 
task is issued.  This
+      // prevents race condition happened when FileOutputCommitter tries to 
remove the `_temporary`
+      // directory while committing/aborting the job.  See SPARK-8513 for more 
+      val df = sqlContext.range(0, 10).coalesce(1)
+      intercept[SparkException] {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
+      val fs = new 
+      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+    }
+  }
diff --git 
new file mode 100644
index 0000000..d280543
--- /dev/null
@@ -0,0 +1,139 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources
+import org.apache.hadoop.fs.Path
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{AnalysisException, SaveMode, parquet}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = 
+  import sqlContext._
+  import sqlContext.implicits._
+  test("save()/load() - partitioned table - simple queries - partition columns 
in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
+          .toDF("a", "b", "p1")
+          .write.parquet(partitionDir.toString)
+      }
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, 
nullable = true))
+      checkQueries(
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .load(file.getCanonicalPath))
+    }
+  }
+  test("SPARK-7868: _temporary directories should be ignored") {
+    withTempPath { dir =>
+      val df = Seq("a", "b", "c").zipWithIndex.toDF()
+      df.write
+        .format("parquet")
+        .save(dir.getCanonicalPath)
+      df.write
+        .format("parquet")
+        .save(s"${dir.getCanonicalPath}/_temporary")
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), 
+    }
+  }
+  test("SPARK-8014: Avoid scanning output directory when SaveMode isn't 
SaveMode.Append") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = Seq(1 -> "a").toDF()
+      // Creates an arbitrary file.  If this directory gets scanned, 
ParquetRelation2 will throw
+      // since it's not a valid Parquet file.
+      val emptyFile = new File(path, "empty")
+      Files.createParentDirs(emptyFile)
+      Files.touch(emptyFile)
+      // This shouldn't throw anything.
+      df.write.format("parquet").mode(SaveMode.Ignore).save(path)
+      // This should only complain that the destination directory already 
exists, rather than file
+      // "empty" is not a Parquet file.
+      assert {
+        intercept[AnalysisException] {
+          df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
+        }.getMessage.contains("already exists")
+      }
+      // This shouldn't throw anything.
+      df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
+      checkAnswer(read.format("parquet").load(path), df)
+    }
+  }
+  test("SPARK-8079: Avoid NPE thrown from BaseWriterContainer.abortJob") {
+    withTempPath { dir =>
+      intercept[AnalysisException] {
+        // Parquet doesn't allow field names with spaces.  Here we are 
intentionally making an
+        // exception thrown from the `ParquetRelation2.prepareForWriteJob()` 
method to trigger
+        // the bug.  Please refer to spark-8079 for more details.
+        range(1, 10)
+          .withColumnRenamed("id", "a b")
+          .write
+          .format("parquet")
+          .save(dir.getCanonicalPath)
+      }
+    }
+  }
+  test("SPARK-8604: Parquet data source should write summary file while doing 
appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(, df.unionAll(df))
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
diff --git 
new file mode 100644
index 0000000..d761909
--- /dev/null
@@ -0,0 +1,57 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources
+import org.apache.hadoop.fs.Path
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+This is commented out due a bug in the data source API (SPARK-9291).
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = 
+  import sqlContext._
+  test("save()/load() - partitioned table - simple queries - partition columns 
in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+          .saveAsTextFile(partitionDir.toString)
+      }
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, 
nullable = true))
+      checkQueries(
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .load(file.getCanonicalPath))
+    }
+  }
diff --git 
index 2a8748d..dd27402 100644
@@ -17,18 +17,14 @@
 package org.apache.spark.sql.sources
 import scala.collection.JavaConversions._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.parquet.hadoop.ParquetOutputCommitter
-import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -581,165 +577,3 @@ class AlwaysFailParquetOutputCommitter(
     sys.error("Intentional job commitment failure for testing purpose.")
-class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
-  override val dataSourceName: String = 
-  import sqlContext._
-  test("save()/load() - partitioned table - simple queries - partition columns 
in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
-          .saveAsTextFile(partitionDir.toString)
-      }
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, 
nullable = true))
-      checkQueries(
-        read.format(dataSourceName)
-          .option("dataSchema", dataSchemaWithPartition.json)
-          .load(file.getCanonicalPath))
-    }
-  }
-class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
-  override val sqlContext = TestHive
-  // When committing a task, `CommitFailureTestSource` throws an exception for 
testing purpose.
-  val dataSourceName: String = 
-  test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
-    withTempPath { file =>
-      // Here we coalesce partition number to 1 to ensure that only a single 
task is issued.  This
-      // prevents race condition happened when FileOutputCommitter tries to 
remove the `_temporary`
-      // directory while committing/aborting the job.  See SPARK-8513 for more 
-      val df = sqlContext.range(0, 10).coalesce(1)
-      intercept[SparkException] {
-        df.write.format(dataSourceName).save(file.getCanonicalPath)
-      }
-      val fs = new 
-      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
-    }
-  }
-class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
-  override val dataSourceName: String = 
-  import sqlContext._
-  import sqlContext.implicits._
-  test("save()/load() - partitioned table - simple queries - partition columns 
in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
-          .toDF("a", "b", "p1")
-          .write.parquet(partitionDir.toString)
-      }
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, 
nullable = true))
-      checkQueries(
-        read.format(dataSourceName)
-          .option("dataSchema", dataSchemaWithPartition.json)
-          .load(file.getCanonicalPath))
-    }
-  }
-  test("SPARK-7868: _temporary directories should be ignored") {
-    withTempPath { dir =>
-      val df = Seq("a", "b", "c").zipWithIndex.toDF()
-      df.write
-        .format("parquet")
-        .save(dir.getCanonicalPath)
-      df.write
-        .format("parquet")
-        .save(s"${dir.getCanonicalPath}/_temporary")
-      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), 
-    }
-  }
-  test("SPARK-8014: Avoid scanning output directory when SaveMode isn't 
SaveMode.Append") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val df = Seq(1 -> "a").toDF()
-      // Creates an arbitrary file.  If this directory gets scanned, 
ParquetRelation2 will throw
-      // since it's not a valid Parquet file.
-      val emptyFile = new File(path, "empty")
-      Files.createParentDirs(emptyFile)
-      Files.touch(emptyFile)
-      // This shouldn't throw anything.
-      df.write.format("parquet").mode(SaveMode.Ignore).save(path)
-      // This should only complain that the destination directory already 
exists, rather than file
-      // "empty" is not a Parquet file.
-      assert {
-        intercept[AnalysisException] {
-          df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
-        }.getMessage.contains("already exists")
-      }
-      // This shouldn't throw anything.
-      df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
-      checkAnswer(read.format("parquet").load(path), df)
-    }
-  }
-  test("SPARK-8079: Avoid NPE thrown from BaseWriterContainer.abortJob") {
-    withTempPath { dir =>
-      intercept[AnalysisException] {
-        // Parquet doesn't allow field names with spaces.  Here we are 
intentionally making an
-        // exception thrown from the `ParquetRelation2.prepareForWriteJob()` 
method to trigger
-        // the bug.  Please refer to spark-8079 for more details.
-        range(1, 10)
-          .withColumnRenamed("id", "a b")
-          .write
-          .format("parquet")
-          .save(dir.getCanonicalPath)
-      }
-    }
-  }
-  test("SPARK-8604: Parquet data source should write summary file while doing 
appending") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      val df = sqlContext.range(0, 5)
-      df.write.mode(SaveMode.Overwrite).parquet(path)
-      val summaryPath = new Path(path, "_metadata")
-      val commonSummaryPath = new Path(path, "_common_metadata")
-      val fs = summaryPath.getFileSystem(configuration)
-      fs.delete(summaryPath, true)
-      fs.delete(commonSummaryPath, true)
-      df.write.mode(SaveMode.Append).parquet(path)
-      checkAnswer(, df.unionAll(df))
-      assert(fs.exists(summaryPath))
-      assert(fs.exists(commonSummaryPath))
-    }
-  }

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to