On the plus side, it looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet,
               Location: InMemoryFileIndex[file:/tmp/test_table],
               PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
               ReadSchema: struct<username:string>
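
For reference, a minimal spark-shell sketch of how a plan like the above can be
produced on 2.1.0 (assuming the test data from the repro further down this thread
was written to /tmp/test_table, per the InMemoryFileIndex path; the view name
test_table is just for illustration):

// Read the Parquet test data back and register a temp view for the SQL below.
val testData = spark.read.parquet("/tmp/test_table")
testData.createOrReplaceTempView("test_table")

// Only Not(IsNotNull(username)) should show up in PushedFilters, as in the plan above.
spark.sql("select count(*) from test_table where not(username is not null)").explain()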



On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com> wrote:

> Bumping this thread.
>
> Translating "where not(username is not null)" into a filter of
> [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug.
>
> Spark 1.6.2:
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
>       +- Project
>          +- Filter NOT isnotnull(username#1590)
>             +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]
>
> Spark 2.0.2
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>       +- *Project
>          +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>             +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat,
>                InputPaths: <path to parquet>, PartitionFilters: [],
>                PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>                ReadSchema: struct<username:string>
>
> Example to generate the above:
>
> // Create some fake data
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
>     Row(1, "fred"),
>     Row(2, "amy"),
>     Row(3, null)))
>
> val schema = StructType(Seq(
>     StructField("id", IntegerType, nullable = true),
>     StructField("username", StringType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
> val path = "SOME PATH HERE"
>
> data.write.mode("overwrite").parquet(path)
>
> val testData = sqlContext.read.parquet(path)
>
> testData.registerTempTable("filter_test_table")
>
>
> %sql
> explain select count(*) from filter_test_table where not( username is not null)
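>
> For completeness, a sketch of checking the actual results on the same temp table
> (with the three fake rows above the expected count is 1, the null-username row;
> under the 2.0.2 pushdown bug reported below it comes back as 0):
>
> // Expected: 1 (the row with username = null); on 2.0.2 the contradictory
> // pushed filters [IsNotNull(username), Not(IsNotNull(username))] yield 0.
> sqlContext.sql("select count(*) from filter_test_table where not(username is not null)").show()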
>
>
> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <akosti...@nuna.com.invalid
> > wrote:
>
>> Hi,
>>
>> I have an application where I’m filtering data with Spark SQL using simple
>> WHERE clauses. I also want the ability to show the unmatched rows for any
>> filter, and so am wrapping the previous clause in `NOT()` to get the
>> inverse. Example:
>>
>> Filter:  username is not null
>> Inverse filter:  NOT(username is not null)
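>>
>> (The same pair expressed with the DataFrame API, as a spark-shell sketch where
>> df is a hypothetical DataFrame over the table being filtered:)
>>
>> import org.apache.spark.sql.functions.{col, not}
>> val matched   = df.filter(col("username").isNotNull)      // username is not null
>> val unmatched = df.filter(not(col("username").isNotNull)) // NOT(username is not null)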
>>
>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>> inverse filter always returns zero results. It looks like this is a problem
>> with how the filter is getting pushed down to Parquet. Specifically, the
>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>> which would obviously result in zero matches. An example below:
>>
>> pyspark:
>> > x = spark.sql('select my_id from my_table where username is not null')
>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>
>> > x.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter isnotnull(username#91)
>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>        Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>        PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>>        ReadSchema: struct<my_id:bigint,username:string>
>> > y.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>        Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>        PartitionFilters: [],
>>        PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>        ReadSchema: struct<my_id:bigint,username:string>
>>
>> Presently I’m working around this by using the new functionality of NOT
>> EXISTS in Spark 2, but that seems like overkill.
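>>
>> A spark-shell sketch of what that NOT EXISTS workaround can look like
>> (illustrative only; it assumes my_id is unique in my_table):
>>
>> // Keep the rows that have no non-null username, i.e. the rows where username is null.
>> spark.sql("""
>>   select t.my_id
>>   from my_table t
>>   where not exists (
>>     select 1 from my_table u
>>     where u.my_id = t.my_id and u.username is not null
>>   )
>> """).show()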
>>
>> Any help appreciated.
>>
>>
>> Alexi Kostibas
>> Engineering
>> Nuna
>> 650 Townsend Street, Suite 425
>> San Francisco, CA 94103
>>
>>
>
