On the plus side, looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>
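For anyone who wants to double-check on their own build, here's a minimal sketch adapted from Everett's repro below (assumptions: a 2.1.0 spark-shell where `spark` is the SparkSession, and /tmp/test_table as the output path, matching the plan above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Small Parquet table with one null username.
val rowsRDD = spark.sparkContext.parallelize(Seq(
  Row(1, "fred"),
  Row(2, "amy"),
  Row(3, null)))

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("username", StringType, nullable = true)))

val path = "/tmp/test_table"
spark.createDataFrame(rowsRDD, schema).write.mode("overwrite").parquet(path)
spark.read.parquet(path).createOrReplaceTempView("test_table")

// On 2.1.0 this should print 1; on 2.0.2 the contradictory pushdown made it 0.
spark.sql("select count(*) from test_table where not(username is not null)").show()
spark.sql("select count(*) from test_table where not(username is not null)").explain()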
On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com> wrote:

> Bumping this thread.
>
> Translating "where not(username is not null)" into a filter of
> [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe
> bug.
>
> Spark 1.6.2:
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
>       +- Project
>          +- Filter NOT isnotnull(username#1590)
>             +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]
>
> Spark 2.0.2:
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>       +- *Project
>          +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>             +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>
>
> Example to generate the above:
>
> // Create some fake data
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
>   Row(1, "fred"),
>   Row(2, "amy"),
>   Row(3, null)))
>
> val schema = StructType(Seq(
>   StructField("id", IntegerType, nullable = true),
>   StructField("username", StringType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
> val path = "SOME PATH HERE"
>
> data.write.mode("overwrite").parquet(path)
>
> val testData = sqlContext.read.parquet(path)
>
> testData.registerTempTable("filter_test_table")
>
> %sql
> explain select count(*) from filter_test_table where not( username is not null)
>
> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <akosti...@nuna.com.invalid> wrote:
>
>> Hi,
>>
>> I have an application where I'm filtering data with SparkSQL with simple
>> WHERE clauses. I also want the ability to show the unmatched rows for any
>> filter, and so am wrapping the previous clause in `NOT()` to get the
>> inverse. Example:
>>
>> Filter: username is not null
>> Inverse filter: NOT(username is not null)
>>
>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>> inverse filter always returns zero results. It looks like this is a
>> problem with how the filter is getting pushed down to Parquet.
>> Specifically, the pushdown includes both the "is not null" filter, AND
>> "not(is not null)", which would obviously result in zero matches.
>> An example below:
>>
>> pyspark:
>>
>> > x = spark.sql('select my_id from my_table where username is not null')
>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>
>> > x.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter isnotnull(username#91)
>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91] Format: ParquetFormat, InputPaths: s3://my-path/my.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(username)], ReadSchema: struct<my_id:bigint,username:string>
>>
>> > y.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91] Format: ParquetFormat, InputPaths: s3://my-path/my.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<my_id:bigint,username:string>
>>
>> Presently I'm working around this by using the new functionality of NOT
>> EXISTS in Spark 2, but that seems like overkill.
>>
>> Any help appreciated.
>>
>> Alexi Kostibas
>> Engineering
>> Nuna
>> 650 Townsend Street, Suite 425
>> San Francisco, CA 94103
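A lighter-weight alternative to the NOT EXISTS workaround mentioned above, offered as an untested sketch: since not(x is not null) is logically just x is null, rewriting the inverse filter should sidestep the bad pushdown entirely and emit a single IsNull(username) Parquet filter rather than the contradictory pair. Table and column names below reuse the examples from the thread:

// Untested workaround sketch; check the pushed filters with explain().
val yFixed = spark.sql("select my_id from my_table where username is null")
yFixed.explain()

// Equivalent DataFrame form:
import org.apache.spark.sql.functions.col
spark.table("my_table").where(col("username").isNull).select("my_id").explain()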