Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22206#discussion_r212496291 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy { config) val filterCondition = postScanFilters.reduceLeftOption(And) - val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + + val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) { + // add a projection before FilterExec to ensure that the rows are converted to unsafe + val filterExpr = filterCondition.get + FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan)) + } else { + filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + } // always add the projection, which will produce unsafe rows required by some operators - ProjectExec(project, withFilter) :: Nil + if (project.exists(hasScalarPythonUDF)) { + val references = project.map(_.references).reduce(_ ++ _).toSeq + ProjectExec(project, ProjectExec(references, withFilter)) :: Nil --- End diff -- OK. Let's leave it as it is for now.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org