[ https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maciej Bryński updated SPARK-11368:
-----------------------------------
    Description: 
Hi,
I think this is a huge performance bug.
I created a Parquet file partitioned by a column, then ran a query with a filter on the partition column combined with a filter that uses a UDF. The result is that all partitions are scanned.

Sample data:
{code}
rdd = sc.parallelize(range(0,10000000)).map(lambda x: Row(id=x%1000, value=x)).repartition(1)
df = sqlCtx.createDataFrame(rdd)
df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
df.registerTempTable('df')
{code}
Then the queries. Without the UDF, Spark reads only 10 partitions:
{code}
start = time.time()
sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
print(time.time() - start)
0.9993703365325928

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
   Project
    Filter (value#5L > 100000)
     Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
{code}
With the UDF, Spark reads all the partitions:
{code}
sqlCtx.registerFunction('multiply2', lambda x: x * 2)
start = time.time()
sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
print(time.time() - start)
13.0826096534729

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
   TungstenProject
    Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
     !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
{code}
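A possible interim workaround, sketched here as an untested assumption rather than a verified fix: materialize the partition-pruned rows first (e.g. with {{cache()}}), so the UDF predicate is planned on top of the pruned data instead of being combined into the Parquet scan. The name {{df_pruned}} is illustrative; {{multiply2}} is the UDF registered above.
{code}
# Prune partitions with a plain predicate; this query contains no UDF,
# so partition pruning on 'id' can apply.
pruned = sqlCtx.sql('select * from df where id >= 990')

# Caching registers the pruned plan as an in-memory relation; the UDF
# filter below should then run against it rather than being merged into
# the Parquet scan.
pruned.cache()
pruned.registerTempTable('df_pruned')

# Apply the UDF filter to the already-pruned data.
sqlCtx.sql('select * from df_pruned where multiply2(value) > 200000').count()
{code}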
> Spark scans all partitions when a Python UDF and a filter over the partition
> column are given
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11368
>                 URL: https://issues.apache.org/jira/browse/SPARK-11368
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>            Reporter: Maciej Bryński
>            Priority: Critical
>
> Hi,
> I think this is a huge performance bug.
> I created a Parquet file partitioned by a column, then ran a query with a
> filter on the partition column combined with a filter that uses a UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
> rdd = sc.parallelize(range(0,10000000)).map(lambda x: Row(id=x%1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')
> {code}
> Then the queries. Without the UDF, Spark reads only 10 partitions:
> {code}
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
>    Project
>     Filter (value#5L > 100000)
>      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With the UDF, Spark reads all the partitions:
> {code}
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
>    TungstenProject
>     Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
>      !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
>       Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)