Repository: spark
Updated Branches:
  refs/heads/master c983267b0 -> 3356b8b6a


[SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved 
files

### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() 
API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207)
 is performing a unnecessary full filesystem scan for the saved files. The 
save() API is the most basic/core API in `DataFrameWriter`. We should avoid it.

The related PR: https://github.com/apache/spark/pull/16090

### How was this patch tested?
Updated the existing test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16481 from gatorsmile/saveFileScan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3356b8b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3356b8b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3356b8b6

Branch: refs/heads/master
Commit: 3356b8b6a9184fcab8d0fe993f3545c3beaa4d99
Parents: c983267
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Fri Jan 13 13:05:53 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Jan 13 13:05:53 2017 +0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |   2 +-
 .../sql/execution/datasources/DataSource.scala  | 172 +++++++++++--------
 .../hive/PartitionedTablePerfStatsSuite.scala   |  29 +---
 3 files changed, 106 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 73b2153..90aeebd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.write(mode, Dataset.ofRows(session, query))
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table 
${table.identifier.unquotedString}", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b7f3559..29afe57 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -413,10 +413,85 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: 
DataFrame): Unit = {
+    // Don't glob path for the write path.  The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be 
specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, 
caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the 
partitioning matches
+    // up.  If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = 
true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == 
partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so 
InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized 
query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, 
data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given 
[${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different 
from the column
+    // ordering of data.logicalPlan (partition columns are all moved after 
data column).  This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+      sparkSession.sessionState.executePlan(plan).toRdd
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a 
[[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
     if 
(data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into 
external storage.")
     }
@@ -425,78 +500,27 @@ case class DataSource(
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(sparkSession.sqlContext, mode, 
caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = 
path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be 
specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-
-        val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-
-        // If we are appending to a table that already exists, make sure the 
partitioning matches
-        // up.  If we fail to load the table for whatever reason, ignore the 
check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = 
true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == 
partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
-
-        // SPARK-17230: Resolve the partition columns so 
InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an 
optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val partitionAttributes = partitionColumns.map { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, 
data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given 
[${plan.output.map(_.name).mkString(", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
-          sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-            case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
-          }.head
-        }
-        // For partitioned relation r, r.schema's column ordering can be 
different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after 
data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitions = Map.empty,
-            partitionColumns = partitionAttributes,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable,
-            fileIndex = fileIndex)
-        sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to 
avoid re-inferring it.
+        writeInFileFormat(format, mode, data)
+        // Replace the schema with that of the DataFrame we just wrote out to 
avoid re-inferring
         copy(userSpecifiedSchema = 
Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create 
table as select.")
+    }
+  }
 
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if 
(data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into 
external storage.")
+    }
+
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, 
caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create 
table as select.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 70750c4..b792a16 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedHiveTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit 
= {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit 
= {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as 
partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit 
= {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit 
= {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as 
partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedDatasourceTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = 
false)
+          HiveCatalogMetrics.reset()
+          setupPartitionedDatasourceTable("test", dir, scale = 10, repair = 
false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
       withTable("test") {
         withTempDir { dir =>
           HiveCatalogMetrics.reset()
-          setupPartitionedHiveTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = 
false)
+          setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
           })
           executorPool.shutdown()
           executorPool.awaitTermination(30, TimeUnit.SECONDS)
-          // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED 
and
-          // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock 
take effect,
-          // only one thread can really do the build, so the listing job count 
is 2, the other
-          // one is cache.load func. Also METRIC_FILES_DISCOVERED is 
$partition_num * 2
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
-          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to