Repository: spark
Updated Branches:
  refs/heads/master 2949a835f -> d36539741


[SPARK-24626][SQL] Improve location size calculation in Analyze Table command

## What changes were proposed in this pull request?

Currently, Analyze table calculates table size sequentially for each partition. 
We can parallelize size calculations over partitions.

Results : Tested on a table with 100 partitions and data stored in S3.
With changes :
- 10.429s
- 10.557s
- 10.439s
- 9.893s


Without changes :
- 110.034s
- 99.510s
- 100.743s
- 99.106s

## How was this patch tested?

Simple unit test.

Closes #21608 from Achuth17/improveAnalyze.

Lead-authored-by: Achuth17 <achuth.nara...@gmail.com>
Co-authored-by: arajagopal17 <arajago...@qubole.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3653974
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3653974
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3653974

Branch: refs/heads/master
Commit: d36539741ff6a12a6acde9274e9992a66cdd36e7
Parents: 2949a83
Author: Achuth17 <achuth.nara...@gmail.com>
Authored: Thu Aug 9 08:29:24 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Aug 9 08:29:24 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  2 ++
 .../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++
 .../command/AnalyzeColumnCommand.scala          |  2 +-
 .../execution/command/AnalyzeTableCommand.scala |  2 +-
 .../sql/execution/command/CommandUtils.scala    | 30 +++++++++++++++-----
 .../execution/datasources/DataSourceUtils.scala | 10 +++++++
 .../datasources/InMemoryFileIndex.scala         |  2 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 23 ++++++++++++++-
 8 files changed, 72 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index a1e019c..9adb86a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1892,6 +1892,8 @@ working with timestamps in `pandas_udf`s to get the best 
performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default 
but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. 
This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 
'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 
2.4, Spark respects Parquet/ORC specific table properties while converting 
Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS 
PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy 
parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would 
be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better 
performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. 
It means Spark uses its own ORC support by default instead of Hive SerDe. As an 
example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive 
SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC 
data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, File listing for compute statistics is done in parallel 
by default. This can be disabled by setting 
`spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
+  - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary 
files are not counted as data files when calculating table size during 
Statistics computation.
 
 ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 67c3abb..979a554 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1476,6 +1476,15 @@ object SQLConf {
         "are performed before any UNION, EXCEPT and MINUS operations.")
       .booleanConf
       .createWithDefault(false)
+
+  val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
+    buildConf("spark.sql.parallelFileListingInStatsComputation.enabled")
+      .internal()
+      .doc("When true, SQL commands use parallel file listing, " +
+        "as opposed to single thread listing." +
+        "This usually speeds up commands that need to list many directories.")
+      .booleanConf
+      .createWithDefault(true)
 }
 
 /**
@@ -1873,6 +1882,9 @@ class SQLConf extends Serializable with Logging {
 
   def setOpsPrecedenceEnforced: Boolean = 
getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
+  def parallelFileListingInStatsComputation: Boolean =
+    getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 640e013..3fea6d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -47,7 +47,7 @@ case class AnalyzeColumnCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
-    val sizeInBytes = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
 
     // Compute stats for each column
     val (rowCount, newColStats) = computeColumnStats(sparkSession, 
tableIdentWithDB, columnNames)

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 58b53e8..3076e91 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -39,7 +39,7 @@ case class AnalyzeTableCommand(
     }
 
     // Compute stats for the whole table
-    val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
     val newRowCount =
       if (noscan) None else 
Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index c270486..df71bc9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -21,12 +21,13 @@ import java.net.URI
 
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
InMemoryFileIndex}
 import org.apache.spark.sql.internal.SessionState
 
 
@@ -38,7 +39,7 @@ object CommandUtils extends Logging {
       val catalog = sparkSession.sessionState.catalog
       if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
         val newTable = catalog.getTableMetadata(table.identifier)
-        val newSize = 
CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
+        val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
         val newStats = CatalogStatistics(sizeInBytes = newSize)
         catalog.alterTableStats(table.identifier, Some(newStats))
       } else {
@@ -47,15 +48,29 @@ object CommandUtils extends Logging {
     }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+    val sessionState = spark.sessionState
     if (catalogTable.partitionColumnNames.isEmpty) {
       calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
     } else {
       // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
       val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-      partitions.map { p =>
-        calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-      }.sum
+      if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
+        val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+        val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+        val pathFilter = new PathFilter with Serializable {
+          override def accept(path: Path): Boolean = {
+            DataSourceUtils.isDataPath(path) && 
!path.getName.startsWith(stagingDir)
+          }
+        }
+        val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
+          paths, sessionState.newHadoopConf(), pathFilter, spark)
+        fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
+      } else {
+        partitions.map { p =>
+          calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
+        }.sum
+      }
     }
   }
 
@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
       val size = if (fileStatus.isDirectory) {
         fs.listStatus(path)
           .map { status =>
-            if (!status.getPath.getName.startsWith(stagingDir)) {
+            if (!status.getPath.getName.startsWith(stagingDir) &&
+              DataSourceUtils.isDataPath(path)) {
               getPathSize(fs, status.getPath)
             } else {
               0L

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index cccd6c0..90cec5e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.types._
 
@@ -49,4 +51,12 @@ object DataSourceUtils {
       }
     }
   }
+
+  // SPARK-24626: Metadata files and temporary files should not be
+  // counted as data files, so that they shouldn't participate in tasks like
+  // location size calculation.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") || name.startsWith("."))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 9d9f8bd..dc5c2ff 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging {
    *
    * @return for each input path, the set of discovered files for the path
    */
-  private def bulkListLeafFiles(
+  private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 61cec82..d8ffb29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -25,13 +25,14 @@ import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.common.StatsSetupConst
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, 
CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, 
HistogramSerializer}
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
-import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.command.{CommandUtils, DDLUtils}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
@@ -148,6 +149,26 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
     }
   }
 
+  test("SPARK-24626 parallel file listing in Stats computation") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2",
+      SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION.key -> "True") {
+      val checkSizeTable = "checkSizeTable"
+      withTable(checkSizeTable) {
+          sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) 
PARTITIONED BY (ds STRING)")
+          sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+          sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-02') 
SELECT * FROM src")
+          sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+          val tableMeta = spark.sessionState.catalog
+            .getTableMetadata(TableIdentifier(checkSizeTable))
+          HiveCatalogMetrics.reset()
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+          val size = CommandUtils.calculateTotalSize(spark, tableMeta)
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
+          assert(size === BigInt(17436))
+      }
+    }
+  }
+
   test("analyze non hive compatible datasource tables") {
     val table = "parquet_tab"
     withTable(table) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to