On the plus side, looks like this may be fixed in 2.1.0:
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>
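
For anyone skimming the plans, here is a toy model in plain Python of why the 2.0.2 filter list can never match a row. This is only a sketch and not Spark code: the helper names (is_not_null, negate, scan) are made up for illustration, and it assumes nothing more than that the listed filters are ANDed together. The rows mirror the fake data in the repro quoted below.

```python
# Toy model (not Spark code): each filter is a predicate on a row, and a
# scan keeps a row only if every filter in the list accepts it.

rows = [
    {"id": 1, "username": "fred"},
    {"id": 2, "username": "amy"},
    {"id": 3, "username": None},
]

def is_not_null(column):
    """Hypothetical stand-in for IsNotNull(column)."""
    return lambda row: row[column] is not None

def negate(predicate):
    """Hypothetical stand-in for Not(predicate)."""
    return lambda row: not predicate(row)

def scan(rows, filters):
    """Keep the rows that satisfy all filters (the filters are ANDed)."""
    return [row for row in rows if all(f(row) for f in filters)]

# Filter list from the 2.0.2 plan: [IsNotNull(username), Not(IsNotNull(username))]
filters_2_0_2 = [is_not_null("username"), negate(is_not_null("username"))]

# Filter list from the 1.6.2 and 2.1.0 plans: [Not(IsNotNull(username))]
filters_2_1_0 = [negate(is_not_null("username"))]

count_2_0_2 = len(scan(rows, filters_2_0_2))
count_2_1_0 = len(scan(rows, filters_2_1_0))
print(count_2_0_2, count_2_1_0)  # prints: 0 1
```

The first list asks for rows where username is both null and not null, so it is empty for any input. The second list keeps only the row with the null username.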
On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <[email protected]> wrote:
> Bumping this thread.
>
> Translating "where not(username is not null)" into a filter of
> [IsNotNull(username), Not(IsNotNull(username))] seems like a rather
> severe bug.
>
> Spark 1.6.2:
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
> output=[_c0#1822L])
> +- TungstenExchange SinglePartition, None
> +- TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[count#1825L])
> +- Project
> +- Filter NOT isnotnull(username#1590)
> +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>,
> PushedFilters: [Not(IsNotNull(username))]
>
> Spark 2.0.2
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
> +- *HashAggregate(keys=[], functions=[partial_count(1)])
> +- *Project
> +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
> +- *BatchedScan parquet default.<hive table name>[username#35] Format:
> ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [],
> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
> ReadSchema: struct<username:string>
>
> Example to generate the above:
>
> // Create some fake data
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
> Row(1, "fred"),
> Row(2, "amy"),
> Row(3, null)))
>
> val schema = StructType(Seq(
> StructField("id", IntegerType, nullable = true),
> StructField("username", StringType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
> val path = "SOME PATH HERE"
>
> data.write.mode("overwrite").parquet(path)
>
> val testData = sqlContext.read.parquet(path)
>
> testData.registerTempTable("filter_test_table")
>
>
> %sql
> explain select count(*) from filter_test_table where not( username is not null)
>
>
> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <[email protected]
> > wrote:
>
>> Hi,
>>
>> I have an application where I’m filtering data with SparkSQL with simple
>> WHERE clauses. I also want the ability to show the unmatched rows for any
>> filter, and so am wrapping the previous clause in `NOT()` to get the
>> inverse. Example:
>>
>> Filter: username is not null
>> Inverse filter: NOT(username is not null)
>>
>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>> inverse filter always returns zero results. It looks like this is a problem
>> with how the filter is getting pushed down to Parquet. Specifically, the
>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>> which would obviously result in zero matches. An example below:
>>
>> pyspark:
>> > x = spark.sql('select my_id from my_table where username is not null')
>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>
>> > x.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter isnotnull(username#91)
>> +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>> Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>> PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>> ReadSchema: struct<my_id:bigint,username:string>
>> > y.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>> +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>> Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>> PartitionFilters: [],
>> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>> ReadSchema: struct<my_id:bigint,username:string>
>>
>> Presently I’m working around this by using the new functionality of NOT
>> EXISTS in Spark 2, but that seems like overkill.
>>
>> Any help appreciated.
>>
>>
>> *Alexi Kostibas*Engineering
>> *Nuna*
>> 650 Townsend Street, Suite 425
>> San Francisco, CA 94103
>>
>>
>