Repository: spark Updated Branches: refs/heads/master 8ddfa52c2 -> ce89ff477
[SPARK-9386] [SQL] Feature flag for metastore partition pruning Since we have been seeing a lot of failures related to this new feature, let's put it behind a flag and turn it off by default. Author: Michael Armbrust <mich...@databricks.com> Closes #7703 from marmbrus/optionalMetastorePruning and squashes the following commits: 6ad128c [Michael Armbrust] style 8447835 [Michael Armbrust] [SPARK-9386][SQL] Feature flag for metastore partition pruning fd37b87 [Michael Armbrust] add config flag Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce89ff47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce89ff47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce89ff47 Branch: refs/heads/master Commit: ce89ff477aea6def68265ed218f6105680755c9a Parents: 8ddfa52 Author: Michael Armbrust <mich...@databricks.com> Authored: Mon Jul 27 17:32:34 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Jul 27 17:32:34 2015 -0700 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 7 +++++++ .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 12 +++++++++++- .../apache/spark/sql/hive/client/ClientInterface.scala | 10 ++++------ 3 files changed, 22 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ce89ff47/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 9b2dbd7..40eba33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -301,6 +301,11 @@ private[spark] object SQLConf { defaultValue = Some(true), doc = "<TODO>") 
+ val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning", + defaultValue = Some(false), + doc = "When true, some predicates will be pushed down into the Hive metastore so that " + + "unmatching partitions can be eliminated earlier.") + val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord", defaultValue = Some("_corrupt_record"), doc = "<TODO>") @@ -456,6 +461,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH) + private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) + private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT) private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN) http://git-wip-us.apache.org/repos/asf/spark/blob/ce89ff47/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9c707a7..3180c05 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -678,8 +678,18 @@ private[hive] case class MetastoreRelation } ) + // When metastore partition pruning is turned off, we cache the list of all partitions to + // mimic the behavior of Spark < 1.5 + lazy val allPartitions = table.getAllPartitions + def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { - table.getPartitions(predicates).map { p => + val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { + table.getPartitions(predicates) + } else { + allPartitions + } + + rawPartitions.map { p => val tPartition = new 
org.apache.hadoop.hive.metastore.api.Partition tPartition.setDbName(databaseName) tPartition.setTableName(tableName) http://git-wip-us.apache.org/repos/asf/spark/blob/ce89ff47/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala index 1656587..d834b4e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala @@ -72,12 +72,10 @@ private[hive] case class HiveTable( def isPartitioned: Boolean = partitionColumns.nonEmpty - def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = { - predicates match { - case Nil => client.getAllPartitions(this) - case _ => client.getPartitionsByFilter(this, predicates) - } - } + def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this) + + def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = + client.getPartitionsByFilter(this, predicates) // Hive does not support backticks when passing names to the client. def qualifiedName: String = s"$database.$name" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org