[ https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15233231#comment-15233231 ]
Yan commented on SPARK-11368:
-----------------------------

The issue seems to be gone with the latest master code (for 2.0):

{code}
sqlCtx.sql('select count(*) from df where id >= 990 and multiply2(value) > 200000').explain(True)

WholeStageCodegen
:  +- TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count(1)#14L])
:     +- INPUT
+- Exchange SinglePartition, None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#17L])
      :     +- Project
      :        +- Project [value#0L,id#1]
      :           +- Filter (cast(pythonUDF0#18 as double) > 200000.0)
      :              +- INPUT
      +- !BatchPythonEvaluation [multiply2(value#0L)], [value#0L,id#1,pythonUDF0#18]
         +- WholeStageCodegen
            :  +- BatchedScan HadoopFiles[value#0L,id#1] Format: ParquetFormat, PushedFilters: [], ReadSchema: struct<value:bigint>
{code}

while 1.6 still had the issue as reported.

> Spark shouldn't scan all partitions when using Python UDF and filter over
> partitioned column is given
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11368
>                 URL: https://issues.apache.org/jira/browse/SPARK-11368
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>            Reporter: Maciej Bryński
>            Priority: Critical
>
> Hi,
> I think this is a huge performance bug.
> I created a parquet file partitioned by a column.
> Then I ran a query with a filter over the partition column and a filter using a UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
> rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')
> {code}
> Then queries:
> Without a UDF, Spark reads only 10 partitions:
> {code}
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
>    Project
>     Filter (value#5L > 100000)
>      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With a UDF, Spark reads all the partitions:
> {code}
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
>    TungstenProject
>     Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
>      !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
>       Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
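The difference between the two plans comes down to predicate splitting: the partition-column filter (id >= 990) is a plain deterministic comparison that can prune directories before the scan, while the Python UDF filter is opaque to the optimizer and must run after rows are read. The sketch below illustrates that splitting idea in plain Python; it is not Spark internals, and all names (split_conjuncts, the predicate tuples, the fake partitions dict) are hypothetical, for illustration only.

```python
# Minimal sketch of conjunct splitting for partition pruning.
# Each predicate is modeled as (columns_referenced, contains_udf, eval_fn).
# A predicate is prunable only if it involves no UDF and references
# nothing but partition columns -- mirroring why multiply2(value) > 200000
# cannot be pushed into the Parquet scan.

def split_conjuncts(predicates, partition_cols):
    """Separate a conjunctive filter into prunable and post-scan predicates."""
    prunable, post_scan = [], []
    for cols, has_udf, fn in predicates:
        if not has_udf and set(cols) <= set(partition_cols):
            prunable.append(fn)
        else:
            post_scan.append(fn)
    return prunable, post_scan

# Hypothetical dataset: 1000 partitions keyed by the partition column 'id'.
partitions = {pid: [{'id': pid, 'value': v} for v in range(3)]
              for pid in range(1000)}

predicates = [
    (['id'],    False, lambda row: row['id'] >= 990),             # prunable
    (['value'], True,  lambda row: row['value'] * 2 > 200000),    # UDF: not prunable
]

prunable, post_scan = split_conjuncts(predicates, partition_cols=['id'])

# Prune: evaluate prunable predicates against the partition key alone,
# so non-matching directories are never scanned.
scanned = [pid for pid in partitions
           if all(p({'id': pid}) for p in prunable)]

# Remaining predicates run row-by-row on the scanned partitions only.
rows = [row for pid in scanned for row in partitions[pid]
        if all(p(row) for p in post_scan)]

print(len(scanned))  # 10 partitions scanned instead of 1000
```

This is the behavior the 2.0 plan above shows: the scan carries the pruned file set, and only the cast(pythonUDF0#18 ...) comparison remains as a post-scan Filter.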