This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 485ae6d [SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables 485ae6d is described below commit 485ae6d1818e8756a86da38d6aefc8f1dbde49c2 Author: shahid <shahidk...@gmail.com> AuthorDate: Sun Jul 28 15:35:01 2019 -0700 [SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables For a datasource table backed by CatalogFileIndex, sizeInBytes is always reported as the default size in bytes, which is 8.0EB (even when the user sets fallBackToHdfsForStatsEnabled=true). As a result, a datasource table that uses CatalogFileIndex always prefers SortMergeJoin over BroadcastJoin, even though its size is below the broadcast join threshold. In this PR, for a CatalogFileIndex table, if "fallBackToHdfsForStatsEnabled=true" is enabled, then computeStatistics gets the sizeInBytes from HDFS, so we get the actual size of the table. Hence, during a join operation, when the table size is below the broadcast threshold, it will prefer BroadcastHashJoin instead of SortMergeJoin. Added a unit test. Closes #22502 from shahidki31/SPARK-25474. 
Authored-by: shahid <shahidk...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/execution/command/CommandUtils.scala | 11 ++++++ .../execution/datasources/HadoopFsRelation.scala | 13 ++++--- .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++------ .../apache/spark/sql/hive/StatisticsSuite.scala | 40 ++++++++++++++++++++++ 4 files changed, 63 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index b644e6d..9a9d66b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -344,4 +344,15 @@ object CommandUtils extends Logging { private def isDataPath(path: Path, stagingDir: String): Boolean = { !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path) } + + def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = { + try { + val hadoopConf = session.sessionState.newHadoopConf() + path.getFileSystem(hadoopConf).getContentSummary(path).getLength + } catch { + case NonFatal(e) => + logWarning(s"Failed to get table size from hdfs. 
Using the default size, $defaultSize.", e) + defaultSize + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index d278802..f7d2315 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.execution.datasources -import java.util.Locale - -import scala.collection.mutable +import org.apache.hadoop.fs.Path import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.execution.FileRelation +import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} import org.apache.spark.sql.types.{StructField, StructType} @@ -71,7 +70,13 @@ case class HadoopFsRelation( override def sizeInBytes: Long = { val compressionFactor = sqlContext.conf.fileCompressionFactor - (location.sizeInBytes * compressionFactor).toLong + val defaultSize = (location.sizeInBytes * compressionFactor).toLong + location match { + case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled => + CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location), + defaultSize) + case _ => defaultSize + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 7b28e4f..d09c0ab 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab ScriptTransformation} import 
org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} +import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -118,16 +118,8 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { - try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength - } catch { - case e: IOException => - logWarning("Failed to get table size from hdfs.", e) - session.sessionState.conf.defaultSizeInBytes - } + CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location), + session.sessionState.conf.defaultSizeInBytes) } else { session.sessionState.conf.defaultSizeInBytes } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b4e5058..e20099a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1484,4 +1484,44 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } } + + test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") { + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { + withTable("t1", "t2") { + sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)") + sql("INSERT INTO t1 VALUES (1, 
'a')") + checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB") + sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)") + sql("INSERT INTO t2 VALUES (1, 'a')") + checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"), + "BroadcastHashJoin") + } + } + } + + test("SPARK-25474: should not fall back to hdfs when table statistics exists" + + " for CatalogFileIndex dataSourceTable") { + + var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L + Seq(true, false).foreach { fallBackToHdfs => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) { + withTable("t1") { + sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)") + sql("INSERT INTO t1 VALUES (1, 'a')") + // Analyze command updates the statistics of table `t1` + sql("ANALYZE TABLE t1 COMPUTE STATISTICS") + val catalogTable = getCatalogTable("t1") + assert(catalogTable.stats.isDefined) + + if (!fallBackToHdfs) { + sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong + } else { + sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong + } + checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB") + } + } + } + assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org