Bumping this thread.
Translating "where not(username is not null)" into a filter of
[IsNotNull(username), Not(IsNotNull(username))], a contradiction that no row
can satisfy, seems like a rather severe bug.
Spark 1.6.2:
explain select count(*) from parquet_table where not( username is not null)
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
      +- Project
         +- Filter NOT isnotnull(username#1590)
            +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]
Spark 2.0.2:
explain select count(*) from parquet_table where not( username is not null)
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
            +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>
Example to generate the above:
// Create some fake data
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val rowsRDD = sc.parallelize(Seq(
Row(1, "fred"),
Row(2, "amy"),
Row(3, null)))
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = true),
StructField("username", StringType, nullable = true)))
val data = sqlContext.createDataFrame(rowsRDD, schema)
val path = "SOME PATH HERE"
data.write.mode("overwrite").parquet(path)
val testData = sqlContext.read.parquet(path)
testData.registerTempTable("filter_test_table")
%sql
explain select count(*) from filter_test_table where not( username is not null)
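The counts this query ought to produce can be checked by hand. The Scala sketch below evaluates the Filter expressions from the two plans against the three username values written by the example, modelling a nullable column as `Option[String]` with `None` for NULL (an assumption of this sketch, not Spark's internal row format; `filter162` and `filter202` are hypothetical names). Because isnotnull never returns NULL, the 2.0.2 conjunction has the form p && NOT p and rejects every row: the sketch keeps 1 row under the 1.6.2 filter and 0 under the 2.0.2 filter.

```scala
// Minimal model of the two Filter expressions from the plans above.
// Assumption: a nullable string column is modelled as Option[String],
// with None standing in for SQL NULL. This is not Spark's own evaluator.

// The username values written by the example: "fred", "amy" and null.
val usernames: Seq[Option[String]] = Seq(Some("fred"), Some("amy"), None)

// isnotnull(username) always returns true or false, never NULL.
def isNotNull(u: Option[String]): Boolean = u.isDefined

// Spark 1.6.2 filter: NOT isnotnull(username)
def filter162(u: Option[String]): Boolean = !isNotNull(u)

// Spark 2.0.2 filter: isnotnull(username) && NOT isnotnull(username)
def filter202(u: Option[String]): Boolean = isNotNull(u) && !isNotNull(u)

val count162 = usernames.count(filter162) // only the null row passes
val count202 = usernames.count(filter202) // p && !p rejects every row

println(s"1.6.2 filter keeps $count162 row(s); 2.0.2 filter keeps $count202 row(s)")
```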
On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <[email protected]>
wrote:
> Hi,
>
> I have an application where I’m filtering data with SparkSQL with simple
> WHERE clauses. I also want the ability to show the unmatched rows for any
> filter, and so am wrapping the previous clause in `NOT()` to get the
> inverse. Example:
>
> Filter: username is not null
> Inverse filter: NOT(username is not null)
>
> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse
> filter always returns zero results. It looks like this is a problem with
> how the filter is getting pushed down to Parquet. Specifically, the
> pushdown includes both the “is not null” filter, AND “not(is not null)”,
> which would obviously result in zero matches. An example below:
>
> pyspark:
> > x = spark.sql('select my_id from my_table where username is not null')
> > y = spark.sql('select my_id from my_table where not(username is not null)')
>
> > x.explain()
> == Physical Plan ==
> *Project [my_id#6L]
> +- *Filter isnotnull(username#91)
>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91] Format: ParquetFormat, InputPaths: s3://my-path/my.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(username)], ReadSchema: struct<my_id:bigint,username:string>
> > y.explain()
> == Physical Plan ==
> *Project [my_id#6L]
> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91] Format: ParquetFormat, InputPaths: s3://my-path/my.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<my_id:bigint,username:string>
>
> Presently I’m working around this by using the new functionality of NOT
> EXISTS in Spark 2, but that seems like overkill.
>
> Any help appreciated.
>
>
> Alexi Kostibas, Engineering
> Nuna
> 650 Townsend Street, Suite 425
> San Francisco, CA 94103
>
>
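A note on the inverse-filter approach in the quoted message: wrapping a clause in NOT() returns exactly the unmatched rows only when the clause can never evaluate to NULL. `username is not null` is such a clause, so under plain SQL semantics its inverse is simply `username is null`. That form avoids the double negation and may also sidestep the extra IsNotNull(username) filter, though this is worth confirming with explain() on 2.0.2. For a nullable comparison such as `username = 'fred'`, NOT() also drops the rows where the comparison is NULL, whereas NOT(coalesce(<clause>, false)) keeps them. The sketch below models this with `Option[Boolean]` as the SQL truth value (None = NULL); `Pred`, `keeps`, `sqlNot`, `notCoalesce` and the other helpers are hypothetical names for this illustration, not Spark APIs.

```scala
// Sketch of SQL three-valued logic for building an "inverse" filter.
// Assumptions: Option[Boolean] models a SQL truth value (None = NULL/UNKNOWN)
// and Option[String] a nullable column. All names here are hypothetical helpers.

type Pred = Option[String] => Option[Boolean]

// A WHERE clause keeps a row only when the predicate is TRUE (not FALSE, not NULL).
def keeps(result: Option[Boolean]): Boolean = result.contains(true)

// SQL NOT: NOT TRUE = FALSE, NOT FALSE = TRUE, NOT NULL = NULL.
def sqlNot(result: Option[Boolean]): Option[Boolean] = result.map(b => !b)

// NOT(coalesce(p, false)): TRUE whenever p is FALSE or NULL, so never NULL itself.
def notCoalesce(result: Option[Boolean]): Option[Boolean] = Some(!result.contains(true))

// username is not null: two-valued, never NULL.
val usernameIsNotNull: Pred = u => Some(u.isDefined)
// username = 'fred': NULL when username is NULL.
val usernameIsFred: Pred = u => u.map(_ == "fred")

val usernames: Seq[Option[String]] = Seq(Some("fred"), Some("amy"), None)

def matched(p: Pred): Seq[Option[String]] = usernames.filter(u => keeps(p(u)))
def notMatched(p: Pred): Seq[Option[String]] = usernames.filter(u => keeps(sqlNot(p(u))))
def notTrue(p: Pred): Seq[Option[String]] = usernames.filter(u => keeps(notCoalesce(p(u))))

// For a never-NULL clause, NOT(p) is the exact complement: here, the null row.
println(notMatched(usernameIsNotNull))
// For a nullable clause, NOT(p) drops the null row; NOT(coalesce(p, false)) keeps it.
println(notMatched(usernameIsFred))
println(notTrue(usernameIsFred))
```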