Repository: spark
Updated Branches:
  refs/heads/branch-2.1 db37049da -> bf2f233e4


[SPARK-19092][SQL][BACKPORT-2.1] Save() API of DataFrameWriter should not scan all the saved files #16481

### What changes were proposed in this pull request?

#### This PR backports https://github.com/apache/spark/pull/16481 to Spark 2.1.
---
`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full filesystem scan of the files it has just written. Since save() is the most basic/core API in `DataFrameWriter`, this scan should be avoided.
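
To make the intent concrete, here is a minimal, self-contained sketch of the call path this patch optimizes. It is illustrative only: the application name, output path, and data are made up, and the comments describe the behavior introduced by the diff below (`DataFrameWriter.save()` routes to the new `DataSource.write`, which no longer resolves a `BaseRelation` for the output directory).

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveWithoutRescanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("save-without-rescan")   // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    // A small partitioned DataFrame, built the same way as in the test suite below.
    val df = spark.range(1000).selectExpr("id as value", "id % 10 as bucket")

    // save()/parquet() ends up in DataSource.write(mode, data). With this patch that
    // method only plans and runs the insert; it does not list the files it just wrote.
    df.write
      .mode(SaveMode.Overwrite)
      .partitionBy("bucket")
      .parquet("/tmp/save_without_rescan")   // hypothetical output path

    spark.stop()
  }
}
```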

### How was this patch tested?
Added new test cases and modified existing ones.
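
The core regression check is metric-based: after the write, the file-discovery counter must stay at zero, because `save()` no longer re-lists its own output. A condensed sketch of that check follows (the helper name is ours; the real assertions live in `PartitionedTablePerfStatsSuite`, which additionally creates Hive and data source tables over the directory and verifies partition-cache behavior; `HiveCatalogMetrics` is Spark-internal, so outside Spark's own test tree this is illustrative only).

```scala
import java.io.File

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession

object SaveScanCheck {
  // Hypothetical helper mirroring the suite's setup: write a small partitioned
  // dataset and assert that no files were "discovered" while doing so.
  def assertSaveDoesNotScanOutput(spark: SparkSession, dir: File): Unit = {
    HiveCatalogMetrics.reset()
    spark.range(10).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
      .partitionBy("partCol1", "partCol2")
      .mode("overwrite")
      .parquet(dir.getAbsolutePath)
    // With the fix, the save() path performs no filesystem scan of the written files.
    assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
  }
}
```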

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16588 from gatorsmile/backport-19092.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf2f233e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf2f233e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf2f233e

Branch: refs/heads/branch-2.1
Commit: bf2f233e49013da54a6accd96c471acafc24df15
Parents: db37049
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Jan 16 10:58:10 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jan 16 10:58:10 2017 +0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |   2 +-
 .../sql/execution/datasources/DataSource.scala  | 163 +++++++++++--------
 .../hive/PartitionedTablePerfStatsSuite.scala   |  29 +---
 3 files changed, 102 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf2f233e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 193a2a2..630adb0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -224,7 +224,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = Some(table))
 
     val result = try {
-      dataSource.write(mode, df)
+      dataSource.writeAndRead(mode, df)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table $tableName in $mode mode", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf2f233e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 31a491f..af70bf7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -413,10 +413,82 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path.  The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(
+      data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up.  If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val columns = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column).  This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitionKeys = Map.empty,
+        customPartitionLocations = Map.empty,
+        partitionColumns = columns,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        refreshFunction = _ => Unit, // No existing table needs to be refreshed.
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable)
+    sparkSession.sessionState.executePlan(plan).toRdd
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -425,74 +497,27 @@ case class DataSource(
       case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-
-        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up.  If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
-
-        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val columns = partitionColumns.map { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitionKeys = Map.empty,
-            customPartitionLocations = Map.empty,
-            partitionColumns = columns,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            refreshFunction = _ => Unit, // No existing table needs to be refreshed.
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable)
-        sparkSession.sessionState.executePlan(plan).toRdd
+        writeInFileFormat(format, mode, data)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
 
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf2f233e/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 55b72c6..5bca90b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedHiveTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedDatasourceTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          HiveCatalogMetrics.reset()
+          setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
       withTable("test") {
         withTempDir { dir =>
           HiveCatalogMetrics.reset()
-          setupPartitionedHiveTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
           })
           executorPool.shutdown()
           executorPool.awaitTermination(30, TimeUnit.SECONDS)
-          // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
-          // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
-          // only one thread can really do the build, so the listing job count is 2, the other
-          // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
-          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
+          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
         }
       }
     }

