wengh commented on code in PR #49961:
URL: https://github.com/apache/spark/pull/49961#discussion_r1985529476
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScanBuilder.scala:
##########
@@ -25,6 +27,40 @@ class PythonScanBuilder(
     ds: PythonDataSourceV2,
     shortName: String,
     outputSchema: StructType,
-    options: CaseInsensitiveStringMap) extends ScanBuilder {
-  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options)
+    options: CaseInsensitiveStringMap)
+  extends ScanBuilder
+  with SupportsPushDownFilters {
+  private var supportedFilters: Array[Filter] = Array.empty
+
+  private def metadata: Map[String, String] = {
+    Map(
+      "PushedFilters" -> supportedFilters.mkString("[", ", ", "]"),
+      "ReadSchema" -> outputSchema.simpleString
+    )
+  }
+
+  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options, metadata)
+
+  // Optionally called by DSv2 once to push down filters before the scan is built.
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    if (!SQLConf.get.pythonFilterPushDown) {
Review Comment:
We'd like to avoid running the new code path (serializing filters, launching a
new Python worker, ...) for existing Python data sources that don't implement
pushdown. That way, if the new code path causes a crash or a performance
regression, its impact is limited.
However, we currently don't have a good way to detect whether the user has
implemented `pushFilters()` in the Python `DataSourceReader` before
`ScanBuilder.pushFilters()` is called. At that point we don't know whether this
is a streaming read or a batch read (the optimizer knows, but the data source
doesn't receive that information), so it's not safe to call the Python
`DataSource.reader()` to get the batch reader instance.
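
The constraint above can be illustrated with a small Scala model. It is a hypothetical sketch, not the PySpark API: `SourceStub`, `batchReader()`, `streamReader()` and `ReaderInfo` are illustrative stand-ins for the Python `DataSource` and its readers, and `NotImplementedError` is assumed to stand in for whatever a source raises when it doesn't support batch reads. It shows that guessing "batch" and instantiating that reader only to look for `pushFilters()` can fail for a streaming-only source, even though the streaming query itself is valid.

```scala
// Hypothetical model of why probing the batch reader early is unsafe.
// None of these names are Spark or PySpark APIs.
final case class ReaderInfo(kind: String, implementsPushFilters: Boolean)

trait SourceStub {
  // Default batch factory: a streaming-only source never overrides it.
  def batchReader(): ReaderInfo =
    throw new NotImplementedError("batch reads are not supported")
  def streamReader(): ReaderInfo
}

// A valid streaming-only source whose stream reader implements pushFilters.
object StreamingOnlySource extends SourceStub {
  def streamReader(): ReaderInfo = ReaderInfo("stream", implementsPushFilters = true)
}

object ProbeDemo {
  // A naive early check: assume a batch read and instantiate the batch reader.
  def naiveProbe(source: SourceStub): Either[String, Boolean] =
    try Right(source.batchReader().implementsPushFilters)
    catch { case e: NotImplementedError => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // The probe fails, yet a streaming query on this source would work fine.
    println(naiveProbe(StreamingOnlySource))
  }
}
```

The stream reader in this model does implement `pushFilters`. A correct answer therefore depends on knowing which reader will actually be used, and the scan builder doesn't have that information.
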
So instead we add a conf to turn off the new code path. In addition, if the
user implements `pushFilters()` while this conf is disabled, we throw an error
telling them that they must enable the conf to get filter pushdown.
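
A minimal sketch of the gating described above, written as a simplified Scala model. All names (`Filter`, `ReaderStub`, `PushdownModel`, `pushDownEnabled`, `askPython`) are hypothetical stand-ins. `pushDownEnabled` plays the role of `SQLConf.get.pythonFilterPushDown`, and `askPython` stands in for serializing filters and running a Python worker. The sketch doesn't model where or when the real error is raised. It only shows the two stages. With the conf off, nothing is pushed and Spark keeps every filter. Once a reader instance is available, a `pushFilters()` implementation under a disabled conf is rejected instead of being silently ignored.

```scala
// Hypothetical model of conf-gated filter pushdown; not Spark's actual code.
final case class Filter(column: String, value: Int)

// Stand-in for a Python DataSourceReader: `pushFilters` is None when the user
// did not implement it; otherwise it returns the filters it could NOT handle.
final case class ReaderStub(pushFilters: Option[Seq[Filter] => Seq[Filter]])

object PushdownModel {
  // Stage 1, like ScanBuilder.pushFilters: the reader is not available yet, so
  // only the conf decides whether the new code path runs.
  // Returns (pushed filters, filters Spark must still evaluate after the scan).
  def pushFilters(
      filters: Seq[Filter],
      pushDownEnabled: Boolean,
      askPython: Seq[Filter] => Seq[Filter]): (Seq[Filter], Seq[Filter]) =
    if (!pushDownEnabled) {
      // Old code path: nothing is sent to Python and nothing is pushed.
      (Seq.empty, filters)
    } else {
      // New code path: `askPython` stands in for the Python worker round trip.
      val remaining = askPython(filters)
      (filters.filterNot(remaining.contains), remaining)
    }

  // What the Python side does on the new path: a reader without pushFilters
  // rejects every filter, so the outcome matches the old path.
  def pythonSide(reader: ReaderStub, filters: Seq[Filter]): Seq[Filter] =
    reader.pushFilters.fold(filters)(push => push(filters))

  // Stage 2: once a reader instance exists, fail instead of silently
  // ignoring a pushFilters implementation while the conf is off.
  def checkReader(reader: ReaderStub, pushDownEnabled: Boolean): Unit =
    if (!pushDownEnabled && reader.pushFilters.isDefined) {
      throw new IllegalStateException(
        "pushFilters() is implemented but filter pushdown is disabled; enable the conf to use it.")
    }

  def main(args: Array[String]): Unit = {
    val filters = Seq(Filter("a", 1), Filter("b", 2))
    // A reader that can only evaluate filters on column "a".
    val reader = ReaderStub(Some((fs: Seq[Filter]) => fs.filterNot(_.column == "a")))

    println(pushFilters(filters, pushDownEnabled = true, fs => pythonSide(reader, fs)))
    println(pushFilters(filters, pushDownEnabled = false, fs => pythonSide(reader, fs)))
  }
}
```

With pushdown enabled, the filter on `a` is reported as pushed and the filter on `b` remains. With pushdown disabled, both filters remain for Spark to evaluate. A reader without `pushFilters` (`ReaderStub(None)`) produces the same result on either path. The conf therefore only decides whether the new machinery runs at all.
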
In the future, if we find a way to check whether the Python reader implements
`pushFilters()`, we can enable this conf by default and deprecate it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.