Repository: spark
Updated Branches:
  refs/heads/branch-1.4 815e05654 -> cbaf59544


[SPARK-8014] [SQL] Avoid premature metadata discovery when writing a 
HadoopFsRelation with a save mode other than Append

The current code references the schema of the DataFrame to be written before 
checking the save mode. This triggers expensive metadata discovery 
prematurely. For save modes other than `Append`, this metadata discovery is 
useless, since we either ignore its result (for `Ignore` and `ErrorIfExists`) 
or delete existing files (for `Overwrite`) later.

This PR fixes the issue by deferring metadata discovery until after the save 
mode check.
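
The gist of the change, as a minimal standalone sketch (`save`, 
`outputExists`, and `expensiveSchemaDiscovery` below are illustrative 
stand-ins for the real write path, not actual Spark internals):

    import org.apache.spark.sql.SaveMode

    object SaveModeSketch {
      // Stand-in for schema resolution / partition discovery, which may scan
      // every partition directory and Parquet footer under the output path.
      def expensiveSchemaDiscovery(): String = {
        println("scanning metadata...")
        "discovered schema"
      }

      // Inspect the save mode first; only force schema discovery on the
      // code paths that actually write data.
      def save(mode: SaveMode, outputExists: Boolean): Unit =
        (mode, outputExists) match {
          case (SaveMode.ErrorIfExists, true) => sys.error("path already exists")
          case (SaveMode.Ignore, true)        => ()  // skip silently, no discovery
          case _ =>
            val schema = expensiveSchemaDiscovery()  // deferred until needed
            println(s"writing with $schema")
        }
    }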

Author: Cheng Lian <l...@databricks.com>

Closes #6583 from liancheng/spark-8014 and squashes the following commits:

1aafabd [Cheng Lian] Updates comments
088abaa [Cheng Lian] Avoids schema merging and partition discovery when data schema and partition schema are defined
8fbd93f [Cheng Lian] Fixes SPARK-8014

(cherry picked from commit 686a45f0b9c50ede2a80854ed6a155ee8a9a4f5c)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbaf5954
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbaf5954
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbaf5954

Branch: refs/heads/branch-1.4
Commit: cbaf595447ae42227516f9220f6a0ed2d9fec54f
Parents: 815e056
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Jun 2 13:32:13 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jun 2 13:32:34 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   |  2 +-
 .../org/apache/spark/sql/sources/commands.scala | 20 +++++--
 .../org/apache/spark/sql/sources/ddl.scala      | 16 ++----
 .../apache/spark/sql/sources/interfaces.scala   |  2 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 59 +++++++++++++++-----
 5 files changed, 67 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cbaf5954/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e439a18..824ae36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -190,7 +190,7 @@ private[sql] class ParquetRelation2(
     }
   }
 
-  override def dataSchema: StructType = metadataCache.dataSchema
+  override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
 
   override private[sql] def refresh(): Unit = {
     super.refresh()

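The override above relies on Option.getOrElse taking its default by name: when 
the user supplied a data schema, metadataCache.dataSchema is never forced. A 
standalone illustration of just that mechanism (the schemas are invented for 
the example):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val maybeDataSchema: Option[StructType] =
      Some(StructType(Seq(StructField("a", StringType))))

    // Stand-in for metadataCache.dataSchema; would fail if ever evaluated.
    def discoveredSchema: StructType = sys.error("metadata discovery should not run")

    // getOrElse's default is a by-name parameter, so discoveredSchema is
    // left unevaluated while maybeDataSchema is defined.
    val dataSchema = maybeDataSchema.getOrElse(discoveredSchema)
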
http://git-wip-us.apache.org/repos/asf/spark/blob/cbaf5954/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 3132067..71f016b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -30,9 +30,10 @@ import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
@@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
       // We create a DataFrame by applying the schema of relation to the data to make sure
       // we are writing data based on the expected schema.
-      val df = sqlContext.createDataFrame(
-        DataFrame(sqlContext, query).queryExecution.toRdd,
-        relation.schema,
-        needsConversion = false)
+      val df = {
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data columns). We
+        // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
+        // safely apply the schema of r.schema to the data.
+        val project = Project(
+          relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+
+        sqlContext.createDataFrame(
+          DataFrame(sqlContext, project).queryExecution.toRdd,
+          relation.schema,
+          needsConversion = false)
+      }
 
       val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {

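The Project above just reorders the incoming columns by name so the row layout 
matches relation.schema before rows are written positionally. A rough 
DataFrame-level equivalent (a sketch assuming a 1.4-era SQLContext with its 
implicits in scope; not the internal code path):

    import sqlContext.implicits._  // assumes: val sqlContext: SQLContext

    // Say the relation's schema is (b, c, a) because partition columns c
    // and a were moved after the data column b, while the incoming data is
    // laid out as (a, b, c).
    val data = Seq((1, "x", 2)).toDF("a", "b", "c")

    // Selecting by name performs the same reordering as the Project of
    // UnresolvedAttributes, so rows then line up with (b, c, a).
    val reordered = data.select("b", "c", "a")
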
http://git-wip-us.apache.org/repos/asf/spark/blob/cbaf5954/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 22587f5..20afd60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
@@ -322,19 +322,13 @@ private[sql] object ResolvedDataSource {
           Some(partitionColumnsSchema(data.schema, partitionColumns)),
           caseInsensitiveOptions)
 
-        // For partitioned relation r, r.schema's column ordering is different with the column
-        // ordering of data.logicalPlan. We need a Project to adjust the ordering.
-        // So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
-        // the data.
-        val project =
-          Project(
-            r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
-            data.logicalPlan)
-
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data columns). This
+        // will be adjusted within InsertIntoHadoopFsRelation.
         sqlContext.executePlan(
           InsertIntoHadoopFsRelation(
             r,
-            project,
+            data.logicalPlan,
             mode)).toRdd
         r
       case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cbaf5954/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index c4ffa8d..f5bd2d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -503,7 +503,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    */
   override lazy val schema: StructType = {
     val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
+    StructType(dataSchema ++ partitionColumns.filterNot { column =>
+    StructType(dataSchema ++ partitionColumns.filterNot { column =>
       dataSchemaColumnNames.contains(column.name.toLowerCase)
     })
   }

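For context, the lazy val above appends partition columns to the data schema 
while dropping, case-insensitively, any partition column already present in 
the data schema. A standalone rendering of that logic (schemas invented for 
the example):

    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(
      StructField("a", IntegerType), StructField("B", StringType)))
    val partitionColumns = StructType(Seq(
      StructField("b", StringType), StructField("c", IntegerType)))

    // Case-insensitive de-duplication: partition column "b" collides with
    // data column "B" and is dropped before concatenation.
    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
    val schema = StructType(dataSchema ++ partitionColumns.filterNot { column =>
      dataSchemaColumnNames.contains(column.name.toLowerCase)
    })
    // schema.fieldNames: Array(a, B, c)
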
http://git-wip-us.apache.org/repos/asf/spark/blob/cbaf5954/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index cf5ae88..501e179 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.sources
 
+import java.io.File
+
+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.scalatest.FunSuite
 
@@ -454,6 +457,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-7616: adjust column name order accordingly when saving 
partitioned table") {
+    val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
+
+    df.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .partitionBy("c", "a")
+      .saveAsTable("t")
+
+    withTable("t") {
+      checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
+    }
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -535,20 +552,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 
-  test("SPARK-7616: adjust column name order accordingly when saving 
partitioned table") {
-    val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
-
-    df.write
-      .format("parquet")
-      .mode(SaveMode.Overwrite)
-      .partitionBy("c", "a")
-      .saveAsTable("t")
-
-    withTable("t") {
-      checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
-    }
-  }
-
   test("SPARK-7868: _temporary directories should be ignored") {
     withTempPath { dir =>
       val df = Seq("a", "b", "c").zipWithIndex.toDF()
@@ -564,4 +567,32 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
     }
   }
+
+  test("SPARK-8014: Avoid scanning output directory when SaveMode isn't 
SaveMode.Append") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = Seq(1 -> "a").toDF()
+
+      // Creates an arbitrary file. If this directory gets scanned, ParquetRelation2 will throw
+      // since it's not a valid Parquet file.
+      val emptyFile = new File(path, "empty")
+      Files.createParentDirs(emptyFile)
+      Files.touch(emptyFile)
+
+      // This shouldn't throw anything.
+      df.write.format("parquet").mode(SaveMode.Ignore).save(path)
+
+      // This should only complain that the destination directory already exists, rather than
+      // that file "empty" is not a Parquet file.
+      assert {
+        intercept[RuntimeException] {
+          df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
+        }.getMessage.contains("already exists")
+      }
+
+      // This shouldn't throw anything.
+      df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
+      checkAnswer(read.format("parquet").load(path), df)
+    }
+  }
 }

