Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/9468#discussion_r44105125 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala --- @@ -70,43 +76,297 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { } } - private val writer = testDF.write.option("dataSchema", dataSchema.json).format(dataSourceName) - private val reader = sqlContext.read.option("dataSchema", dataSchema.json).format(dataSourceName) - - test("unhandledFilters") { - withTempPath { dir => - - val path = dir.getCanonicalPath - writer.save(s"$path/p=0") - writer.save(s"$path/p=1") - - val isOdd = udf((_: Int) % 2 == 1) - val df = reader.load(path) - .filter( - // This filter is inconvertible - isOdd('a) && - // This filter is convertible but unhandled - 'a > 1 && - // This filter is convertible and handled - 'b > "val_1" && - // This filter references a partiiton column, won't be pushed down - 'p === 1 - ).select('a, 'p) - val rawScan = df.queryExecution.executedPlan collect { + private var tempPath: File = _ + + private var partitionedDF: DataFrame = _ + + private val partitionedDataSchema: StructType = StructType('a.int :: 'b.int :: 'c.string :: Nil) + + protected override def beforeAll(): Unit = { + this.tempPath = Utils.createTempDir() + + val df = sqlContext.range(10).select( + 'id cast IntegerType as 'a, + ('id cast IntegerType) * 2 as 'b, + concat(lit("val_"), 'id) as 'c + ) + + partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=0") + partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=1") + + partitionedDF = partitionedReader.load(tempPath.getCanonicalPath) + } + + override protected def afterAll(): Unit = { + Utils.deleteRecursively(tempPath) + } + + private def partitionedWriter(df: DataFrame) = + df.write.option("dataSchema", partitionedDataSchema.json).format(dataSourceName) + + private def partitionedReader = + sqlContext.read.option("dataSchema", partitionedDataSchema.json).format(dataSourceName) + + /** + * Constructs test cases that test column pruning and filter push-down. + * + * For filter push-down, the following filters are not pushed-down. + * + * 1. Partitioning filters don't participate filter push-down, they are handled separately in + * `DataSourceStrategy` + * + * 2. Catalyst filter `Expression`s that cannot be converted to data source `Filter`s are not + * pushed down (e.g. UDF and filters referencing multiple columns). + * + * 3. Catalyst filter `Expression`s that can be converted to data source `Filter`s but cannot be + * handled by the underlying data source are not pushed down (e.g. returned from + * `BaseRelation.unhandledFilters()`). + * + * Note that for [[SimpleTextRelation]], all data source [[Filter]]s other than [[GreaterThan]] + * are unhandled. We made this assumption in [[SimpleTextRelation.unhandledFilters()]] only + * for testing purposes. + * + * @param projections Projection list of the query + * @param filter Filter condition of the query + * @param requiredColumns Expected names of required columns + * @param pushedFilters Expected data source [[Filter]]s that are pushed down + * @param inconvertibleFilters Expected Catalyst filter [[Expression]]s that cannot be converted + * to data source [[Filter]]s + * @param unhandledFilters Expected Catalyst flter [[Expression]]s that can be converted to data + * source [[Filter]]s but cannot be handled by the data source relation + * @param partitioningFilters Expected Catalyst filter [[Expression]]s that reference partition + * columns + * @param expectedRawScanAnswer Expected query result of the raw table scan returned by the data + * source relation + * @param expectedAnswer Expected query result of the full query + */ + def testPruningAndFiltering( + projections: Seq[Column], + filter: Column, + requiredColumns: Seq[String], + pushedFilters: Seq[Filter], + inconvertibleFilters: Seq[Column], + unhandledFilters: Seq[Column], + partitioningFilters: Seq[Column])( + expectedRawScanAnswer: => Seq[Row])( + expectedAnswer: => Seq[Row]): Unit = { + test(s"pruning and filtering: df.select(${projections.mkString(", ")}).where($filter)") { + val df = partitionedDF.where(filter).select(projections: _*) + val queryExecution = df.queryExecution + val executedPlan = queryExecution.executedPlan + + val rawScan = executedPlan.collect { case p: PhysicalRDD => p } match { - case Seq(p) => p + case Seq(scan) => scan + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") } - val outputSchema = new StructType().add("a", IntegerType).add("p", IntegerType) + markup("Checking raw scan answer") + checkAnswer( + DataFrame(sqlContext, LogicalRDD(rawScan.output, rawScan.rdd)(sqlContext)), + expectedRawScanAnswer) + + markup("Checking full query answer") + checkAnswer(df, expectedAnswer) + + markup("Checking required columns") + assert(requiredColumns === SimpleTextRelation.requiredColumns) + + val nonPushedFilters = { + val boundFilters = executedPlan.collect { + case f: execution.Filter => f + } match { + case Nil => Nil + case Seq(f) => splitConjunctivePredicates(f.condition) + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } - assertResult(Set((2, 1), (3, 1))) { - rawScan.execute().collect() - .map { CatalystTypeConverters.convertToScala(_, outputSchema) } - .map { case Row(a, p) => (a, p) }.toSet + // Unbound these bound filters so that we can easily compare them with expected results. + boundFilters.map { + _.transform { case a: AttributeReference => UnresolvedAttribute(a.name) } + }.toSet } - checkAnswer(df, Row(3, 1)) + markup("Checking pushed filters") + assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet) + + val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet + val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet + val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet + + markup("Checking unhandled, inconvertible, and partitioning filters") + assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters) + // Partitioning filters are handled separately and don't participate filter push-down. So they + // shouldn't be part of non-pushed filters. + assert(expectedPartitioningFilters.intersect(nonPushedFilters).isEmpty) } } + + testPruningAndFiltering( + projections = Seq('*), + filter = 'p > 0, + requiredColumns = Seq("a", "b", "c"), + pushedFilters = Nil, + inconvertibleFilters = Nil, + unhandledFilters = Nil, + partitioningFilters = Nil + ) { + Seq( + Row(0, 0, "val_0", 1), + Row(1, 2, "val_1", 1), + Row(2, 4, "val_2", 1), + Row(3, 6, "val_3", 1), + Row(4, 8, "val_4", 1), + Row(5, 10, "val_5", 1), + Row(6, 12, "val_6", 1), + Row(7, 14, "val_7", 1), + Row(8, 16, "val_8", 1), + Row(9, 18, "val_9", 1)) + } { + Seq( + Row(0, 0, "val_0", 1), + Row(1, 2, "val_1", 1), + Row(2, 4, "val_2", 1), + Row(3, 6, "val_3", 1), + Row(4, 8, "val_4", 1), + Row(5, 10, "val_5", 1), + Row(6, 12, "val_6", 1), + Row(7, 14, "val_7", 1), + Row(8, 16, "val_8", 1), + Row(9, 18, "val_9", 1)) + } + + testPruningAndFiltering( + projections = Seq('c, 'p), + filter = 'a < 3 && 'p > 0, + requiredColumns = Seq("c", "a"), + pushedFilters = Nil, + inconvertibleFilters = Nil, + unhandledFilters = Seq('a < 3), + partitioningFilters = Nil --- End diff -- Should we have `'p > 0`?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org