ulysses-you commented on code in PR #42318: URL: https://github.com/apache/spark/pull/42318#discussion_r1283886691
########## sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala: ########## @@ -371,49 +373,47 @@ trait FileSourceScanLike extends DataSourceScanExec { } } - private def translatePushedDownFilters(dataFilters: Seq[Expression]): Seq[Filter] = { + private def translateToV1Filters( + dataFilters: Seq[Expression], + scalarSubqueryToLiteral: execution.ScalarSubquery => Literal): Seq[Filter] = { + val scalarSubqueryReplaced = dataFilters.map(_.transform { + // Replace scalar subquery to literal so that `DataSourceStrategy.translateFilter` can + // support translating it. + case scalarSubquery: execution.ScalarSubquery => scalarSubqueryToLiteral(scalarSubquery) + }) + val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) // `dataFilters` should not include any constant metadata col filters // because the metadata struct has been flatted in FileSourceStrategy // and thus metadata col filters are invalid to be pushed down. Metadata that is generated // during the scan can be used for filters. - dataFilters.filterNot(_.references.exists { + scalarSubqueryReplaced.filterNot(_.references.exists { case FileSourceConstantMetadataAttribute(_) => true case _ => false }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) } + // This field may execute subquery expressions and should not be accessed during planning. @transient - protected lazy val pushedDownFilters: Seq[Filter] = translatePushedDownFilters(dataFilters) - - @transient - protected lazy val dynamicallyPushedDownFilters: Seq[Filter] = { - if (dataFilters.exists(_.exists(_.isInstanceOf[execution.ScalarSubquery]))) { - // Replace scalar subquery to literal so that `DataSourceStrategy.translateFilter` can - // support translate it. The subquery must has been materialized since SparkPlan always - // execute subquery first. - val normalized = dataFilters.map(_.transform { - case scalarSubquery: execution.ScalarSubquery => scalarSubquery.toLiteral - }) - translatePushedDownFilters(normalized) - } else { - pushedDownFilters - } - } + protected lazy val pushedDownFilters: Seq[Filter] = translateToV1Filters(dataFilters, _.toLiteral) override lazy val metadata: Map[String, String] = { def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") val location = relation.location val locationDesc = location.getClass.getSimpleName + Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength) + // `metadata` is accessed during planning and the scalar subquery is not executed yet. Here + // we get the pretty string of the scalar subquery, for display purpose only. + val pushedFiltersForDisplay = translateToV1Filters( + dataFilters, s => Literal("ScalarSubquery#" + s.exprId.id)) val metadata = Map( "Format" -> relation.fileFormat.toString, "ReadSchema" -> requiredSchema.catalogString, "Batched" -> supportsColumnar.toString, "PartitionFilters" -> seqToString(partitionFilters), - "PushedFilters" -> seqToString(pushedDownFilters), + "PushedFilters" -> seqToString(pushedFiltersForDisplay), Review Comment: thank you for the improvement! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org