Repository: spark
Updated Branches:
  refs/heads/master 2949a835f -> d36539741
[SPARK-24626][SQL] Improve location size calculation in Analyze Table command

## What changes were proposed in this pull request?

Currently, the Analyze Table command calculates the table size sequentially, one partition at a time. We can parallelize the size calculation over the partitions.

Results: tested on a table with 100 partitions and data stored in S3.

With changes:
- 10.429s
- 10.557s
- 10.439s
- 9.893s

Without changes:
- 110.034s
- 99.510s
- 100.743s
- 99.106s

## How was this patch tested?

Simple unit test.

Closes #21608 from Achuth17/improveAnalyze.

Lead-authored-by: Achuth17 <achuth.nara...@gmail.com>
Co-authored-by: arajagopal17 <arajago...@qubole.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3653974
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3653974
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3653974

Branch: refs/heads/master
Commit: d36539741ff6a12a6acde9274e9992a66cdd36e7
Parents: 2949a83
Author: Achuth17 <achuth.nara...@gmail.com>
Authored: Thu Aug 9 08:29:24 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Aug 9 08:29:24 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  2 ++
 .../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++
 .../command/AnalyzeColumnCommand.scala          |  2 +-
 .../execution/command/AnalyzeTableCommand.scala |  2 +-
 .../sql/execution/command/CommandUtils.scala    | 30 +++++++++++++++-----
 .../execution/datasources/DataSourceUtils.scala | 10 +++++++
 .../datasources/InMemoryFileIndex.scala         |  2 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 23 ++++++++++++++-
 8 files changed, 72 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
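The speedup comes from overlapping the per-partition listings instead of issuing them one after another. The patch itself reuses `InMemoryFileIndex.bulkListLeafFiles` (see the `CommandUtils.scala` diff below), which runs the listing as a Spark job. The snippet below is only a minimal, self-contained sketch of the same idea using a plain thread pool and Hadoop `FileSystem` calls; `totalSizeInParallel` and `partitionPaths` are hypothetical names, and the listing is non-recursive for brevity.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sum the file sizes under a set of partition directories, listing them concurrently.
def totalSizeInParallel(partitionPaths: Seq[Path], hadoopConf: Configuration): Long = {
  val pool = Executors.newFixedThreadPool(math.min(math.max(partitionPaths.size, 1), 8))
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val perPartition = partitionPaths.map { p =>
      Future {
        val fs = p.getFileSystem(hadoopConf)
        // One remote listing per partition; these calls now overlap instead of
        // running back to back, which is where the time goes on stores like S3.
        fs.listStatus(p).filter(_.isFile).map(_.getLen).sum
      }
    }
    Await.result(Future.sequence(perPartition), Duration.Inf).sum
  } finally {
    pool.shutdown()
  }
}
```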

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index a1e019c..9adb86a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1892,6 +1892,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
+  - Since Spark 2.4, file listing for computing statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `false`.
+  - Since Spark 2.4, metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during statistics computation.
 
 ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 67c3abb..979a554 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1476,6 +1476,15 @@
       "are performed before any UNION, EXCEPT and MINUS operations.")
     .booleanConf
     .createWithDefault(false)
+
+  val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
+    buildConf("spark.sql.parallelFileListingInStatsComputation.enabled")
+      .internal()
+      .doc("When true, SQL commands use parallel file listing, " +
+        "as opposed to single thread listing. " +
+        "This usually speeds up commands that need to list many directories.")
+      .booleanConf
+      .createWithDefault(true)
 }
 
 /**
@@ -1873,6 +1882,9 @@ class SQLConf extends Serializable with Logging {
 
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
+  def parallelFileListingInStatsComputation: Boolean =
+    getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
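The flag above is internal, but it can be toggled like any other SQL conf when the parallel listing needs to be turned off. A hypothetical spark-shell snippet (the table name is a placeholder):

```scala
// Fall back to the sequential, per-partition listing; the conf defaults to true.
spark.conf.set("spark.sql.parallelFileListingInStatsComputation.enabled", "false")
spark.sql("ANALYZE TABLE some_db.some_table COMPUTE STATISTICS NOSCAN")
```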

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 640e013..3fea6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -47,7 +47,7 @@ case class AnalyzeColumnCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
-    val sizeInBytes = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
 
     // Compute stats for each column
     val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 58b53e8..3076e91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -39,7 +39,7 @@ case class AnalyzeTableCommand(
     }
 
     // Compute stats for the whole table
-    val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
     val newRowCount =
       if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index c270486..df71bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -21,12 +21,13 @@ import java.net.URI
 
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
 import org.apache.spark.sql.internal.SessionState
 
@@ -38,7 +39,7 @@ object CommandUtils extends Logging {
     val catalog = sparkSession.sessionState.catalog
     if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
       val newTable = catalog.getTableMetadata(table.identifier)
-      val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
+      val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
       val newStats = CatalogStatistics(sizeInBytes = newSize)
       catalog.alterTableStats(table.identifier, Some(newStats))
     } else {
@@ -47,15 +48,29 @@ object CommandUtils extends Logging {
     }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
+    val sessionState = spark.sessionState
     if (catalogTable.partitionColumnNames.isEmpty) {
       calculateLocationSize(sessionState, catalogTable.identifier,
         catalogTable.storage.locationUri)
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
-      partitions.map { p =>
-        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
-      }.sum
+      if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
+        val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+        val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+        val pathFilter = new PathFilter with Serializable {
+          override def accept(path: Path): Boolean = {
+            DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
+          }
+        }
+        val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
+          paths, sessionState.newHadoopConf(), pathFilter, spark)
+        fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
+      } else {
+        partitions.map { p =>
+          calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
+        }.sum
+      }
     }
   }
 
@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
       val size = if (fileStatus.isDirectory) {
         fs.listStatus(path)
           .map { status =>
-            if (!status.getPath.getName.startsWith(stagingDir)) {
+            if (!status.getPath.getName.startsWith(stagingDir) &&
+              DataSourceUtils.isDataPath(path)) {
               getPathSize(fs, status.getPath)
             } else {
               0L
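To make the filtering above concrete, the hypothetical snippet below inlines the two name checks that the `PathFilter` combines (`DataSourceUtils.isDataPath` and the `hive.exec.stagingdir` prefix) and applies them to a few example file names. It is only an illustration of the rule, not the patch's API:

```scala
// A leaf file contributes to the computed size only if its name does not start
// with "_" or "." (isDataPath) and does not carry the Hive staging-dir prefix.
val stagingDir = ".hive-staging" // default value of hive.exec.stagingdir
def counted(name: String): Boolean =
  !(name.startsWith("_") || name.startsWith(".")) && !name.startsWith(stagingDir)

Seq(
  "part-00000-c000.snappy.parquet",    // data file, counted
  "_SUCCESS",                          // job marker, skipped
  "_common_metadata",                  // Parquet summary file, skipped
  ".part-00000.crc",                   // checksum file, skipped
  ".hive-staging_hive_2018-08-09_0001" // staging directory, skipped
).foreach(name => println(s"$name -> ${counted(name)}"))
```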

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index cccd6c0..90cec5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.types._
 
@@ -49,4 +51,12 @@ object DataSourceUtils {
       }
     }
   }
+
+  // SPARK-24626: Metadata files and temporary files should not be
+  // counted as data files, so that they shouldn't participate in tasks like
+  // location size calculation.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") || name.startsWith("."))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 9d9f8bd..dc5c2ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging {
    *
    * @return for each input path, the set of discovered files for the path
    */
-  private def bulkListLeafFiles(
+  private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,

http://git-wip-us.apache.org/repos/asf/spark/blob/d3653974/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 61cec82..d8ffb29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -25,13 +25,14 @@ import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.common.StatsSetupConst
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer}
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
-import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.command.{CommandUtils, DDLUtils}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
@@ -148,6 +149,26 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-24626 parallel file listing in Stats computation") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2",
+      SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION.key -> "True") {
+      val checkSizeTable = "checkSizeTable"
+      withTable(checkSizeTable) {
+        sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+        sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-01') SELECT * FROM src")
+        sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-02') SELECT * FROM src")
+        sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-03') SELECT * FROM src")
+        val tableMeta = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(checkSizeTable))
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        val size = CommandUtils.calculateTotalSize(spark, tableMeta)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
+        assert(size === BigInt(17436))
+      }
+    }
+  }
+
   test("analyze non hive compatible datasource tables") {
     val table = "parquet_tab"
     withTable(table) {
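Outside of this unit test, the recorded statistic can also be eyeballed from a shell after running the command. A hypothetical snippet, reusing the test's table name only as a placeholder and assuming the size is reported in the `Statistics` row of the `DESC EXTENDED` output:

```scala
spark.sql("ANALYZE TABLE checkSizeTable COMPUTE STATISTICS NOSCAN")
// Show only the statistics row of the detailed table information.
spark.sql("DESC EXTENDED checkSizeTable")
  .filter("col_name = 'Statistics'")
  .show(truncate = false)
```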

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org