[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-30162:
----------------------------------
    Attachment: partition_pruning.png

> Add PushedFilters to metadata in Parquet DSv2 implementation
> ------------------------------------------------------------
>
>                 Key: SPARK-30162
>                 URL: https://issues.apache.org/jira/browse/SPARK-30162
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>            Reporter: Nasir Ali
>            Assignee: Hyukjin Kwon
>            Priority: Minor
>         Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from 
> 2020-01-01 21-01-32.png, partition_pruning.png
>
>
> Filters are not pushed down in the Spark 3.0 preview. The output of the 
> "explain" method also differs from 2.4, so in 3.0 it is hard to tell whether 
> filters were pushed down or not. The code below reproduces the bug:
>  
> {code:java}
> # PySpark reproduction script (run in the pyspark shell, where `spark` exists)
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
>                             ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
>                             ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
>                             ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
>                             ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
>                             ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
>                             ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
>                             ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
>                             ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
>                             ("usr2",25.00, "2018-03-16T11:27:18+00:00")
>                             ],
>                            ["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")
> df2 = spark.read.load("/home/cnali/data/")
> df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet
>
> == Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet
>
> == Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet
>
> == Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct<id:double,ts:timestamp>
> {code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data
>
> == Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data
>
> == Optimized Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data
>
> == Physical Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>    +- *(1) ColumnarToRow
>       +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct<id:double,ts:timestamp>
> {code}
> I have tested it on a much larger dataset. Spark 3.0 tries to load the whole 
> dataset and then apply the filter, whereas Spark 2.4 pushes the filter down. 
> The output above shows that Spark 2.4 applied the partition filter but the 
> Spark 3.0 preview did not.
>  
> Minor: in Spark 3.0 the "explain()" output is truncated (possibly to a fixed 
> length?), which makes it hard to debug. Setting 
> spark.sql.orc.cache.stripe.details.size=10000 has no effect.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> Warning: Ignoring non-spark config property: 
> java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
> 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be 
> overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
> mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview
>       /_/
>
> Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
> SparkSession available as 'spark'.
> {code}
> {code:java}
> // pyspark 2.4.4 shell output
> pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> 2019-12-09 07:09:07 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
>       /_/
>
> Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
> SparkSession available as 'spark'.
> {code}
>  
>  
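
The report above compares the two physical plans by eye. As a rough sketch (not part of the original report), the same check could be scripted by looking for the PartitionFilters and PushedFilters entries in the scan line of the explain output. The helper names extract_scan_filters and _bracket_contents are hypothetical, and the two sample strings are the scan lines quoted in the issue, re-joined onto one line. The sketch only inspects plan text; it does not prove whether pruning actually happened at runtime.

```python
# Metadata keys that a scan node may print in explain() output.
FILTER_KEYS = ("PartitionFilters", "PushedFilters")


def _bracket_contents(text, start):
    """Return the text between the '[' at index `start` and its matching ']'."""
    depth = 0
    for i in range(start, len(text)):
        if text[i] == "[":
            depth += 1
        elif text[i] == "]":
            depth -= 1
            if depth == 0:
                return text[start + 1:i]
    return None  # unbalanced brackets, e.g. a truncated plan


def extract_scan_filters(plan_text):
    """Map each key in FILTER_KEYS to its bracketed contents.

    A key that does not appear in the plan text maps to None, which is
    different from an empty string (key present, but with an empty list).
    """
    # Collapse hard-wrapped lines and repeated spaces into single spaces.
    flat = " ".join(plan_text.split())
    result = {}
    for key in FILTER_KEYS:
        marker = key + ": ["
        pos = flat.find(marker)
        if pos < 0:
            result[key] = None
        else:
            result[key] = _bracket_contents(flat, pos + len(marker) - 1)
    return result


# Scan lines copied from the two explain() outputs in the issue.
SPARK_24_SCAN = (
    "*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, "
    "Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], "
    "PartitionCount: 1, PartitionFilters: [isnotnull(user#40), "
    "(user#40 = usr2)], PushedFilters: [], "
    "ReadSchema: struct<id:double,ts:timestamp>"
)
SPARK_30_PREVIEW_SCAN = (
    "+- BatchScan[id#0, ts#1, user#2] ParquetScan Location: "
    "InMemoryFileIndex[file:/home/cnali/data], "
    "ReadSchema: struct<id:double,ts:timestamp>"
)

for label, scan in (("2.4", SPARK_24_SCAN), ("3.0.0-preview", SPARK_30_PREVIEW_SCAN)):
    print(label, extract_scan_filters(scan))
```

On the 2.4 line both keys are found (PushedFilters with an empty list); on the 3.0.0-preview line neither key is present, which matches the missing metadata described in the issue title. In a live session the plan text could probably be captured by wrapping the explain(True) call in contextlib.redirect_stdout, assuming explain() prints from the Python side.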



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
