This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new dcd37f9 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" dcd37f9 is described below commit dcd37f963906fd57a706ea25cb5893be2559d788 Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Tue Jul 27 19:11:42 2021 +0900 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" This reverts commit 634f96dde40639df5a2ef246884bedbd48b3dc69. Closes #33533 from viirya/revert-SPARK-36136. Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 22ac98dcbf48575af7912dab2583e38a2a1b751d) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../PruneFileSourcePartitionsSuite.scala | 61 ++++++++++++---------- .../execution/PruneHiveTablePartitionsSuite.scala | 9 +--- .../hive/execution}/PrunePartitionSuiteBase.scala | 17 +++--- 3 files changed, 41 insertions(+), 46 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala similarity index 80% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 510281a..a669b80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.hive.execution import org.scalatest.matchers.should.Matchers._ @@ -24,19 +24,18 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType -class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with SharedSparkSession { +class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase { override def format: String = "parquet" @@ -46,27 +45,35 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared test("PruneFileSourcePartitions should not change the output of LogicalRelation") { withTable("test") { - spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("test") - val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) - - val dataSchema = StructType(tableMeta.schema.filterNot { f => - tableMeta.partitionColumnNames.contains(f.name) - }) - val relation = HadoopFsRelation( - location = catalogFileIndex, - partitionSchema = tableMeta.partitionSchema, - dataSchema = dataSchema, - bucketSpec = None, - fileFormat = new ParquetFileFormat(), - options = Map.empty)(sparkSession = spark) - - val logicalRelation = LogicalRelation(relation, tableMeta) - val query = Project(Seq(Symbol("id"), Symbol("p")), - Filter(Symbol("p") === 1, logicalRelation)).analyze - - val optimized = Optimize.execute(query) - assert(optimized.missingInput.isEmpty) + withTempDir { dir => + sql( + s""" + |CREATE EXTERNAL TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS parquet + |LOCATION '${dir.toURI}'""".stripMargin) + + val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") + val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) + + val dataSchema = StructType(tableMeta.schema.filterNot { f => + tableMeta.partitionColumnNames.contains(f.name) + }) + val relation = HadoopFsRelation( + location = catalogFileIndex, + partitionSchema = tableMeta.partitionSchema, + dataSchema = dataSchema, + bucketSpec = None, + fileFormat = new ParquetFileFormat(), + options = Map.empty)(sparkSession = spark) + + val logicalRelation = LogicalRelation(relation, tableMeta) + val query = Project(Seq(Symbol("i"), Symbol("p")), + Filter(Symbol("p") === 1, logicalRelation)).analyze + + val optimized = Optimize.execute(query) + assert(optimized.missingInput.isEmpty) + } } } @@ -135,10 +142,6 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared } } - protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = { - case scan: FileSourceScanExec => scan.partitionFilters - } - override def getScanExecPartitionSize(plan: SparkPlan): Long = { plan.collectFirst { case p: FileSourceScanExec => p.selectedPartitions.length diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala index df3acab..677b250 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala @@ -18,16 +18,13 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.datasources.PrunePartitionSuiteBase -import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiveSingleton { +class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase { override def format(): String = "hive" @@ -134,10 +131,6 @@ class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiv } } - protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = { - case scan: HiveTableScanExec => scan.partitionPruningPred - } - override def getScanExecPartitionSize(plan: SparkPlan): Long = { plan.collectFirst { case p: HiveTableScanExec => p diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala similarity index 90% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala index 9909996..2a690a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala @@ -15,15 +15,16 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.hive.execution import org.apache.spark.sql.StatisticsCollectionTestBase import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryOperator, Expression, IsNotNull, Literal} -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED -abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase { +abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with TestHiveSingleton { protected def format: String @@ -94,11 +95,11 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase { val plan = qe.sparkPlan assert(getScanExecPartitionSize(plan) == expectedPartitionCount) - val collectFn: PartialFunction[SparkPlan, Seq[Expression]] = collectPartitionFiltersFn orElse { + val pushedDownPartitionFilters = plan.collectFirst { + case scan: FileSourceScanExec => scan.partitionFilters + case scan: HiveTableScanExec => scan.partitionPruningPred case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters - } - val pushedDownPartitionFilters = plan.collectFirst(collectFn) - .map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull])) + }.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull])) val pushedFilters = pushedDownPartitionFilters.map(filters => { filters.foldLeft("")((currentStr, exp) => { if (currentStr == "") { @@ -112,7 +113,5 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase { assert(pushedFilters == Some(expectedPushedDownFilters)) } - protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] - protected def getScanExecPartitionSize(plan: SparkPlan): Long } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org