[jira] [Commented] (SPARK-37954) old columns should not be available after select or drop

2022-02-20 Thread Varun Shah (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495177#comment-17495177
 ] 

Varun Shah commented on SPARK-37954:


Hi [~andrewfmurphy], there are 2 reasons you don't see any error in the drop 
function:
 # As you rightly pointed out, the function does not throw an error if the 
column does not exist. I think this is debatable, considering that functions 
like select/filter do throw an error when a column is missing (see the 
drop-vs-select sketch after the plans below).
 # The other factor is the Spark Catalyst optimizer, which optimizes your 
query/DAG with techniques like predicate pushdown; in the example you 
mentioned, it pushes the filter down and, while doing so, recognizes the 
column rename and substitutes oldcol from the original DataFrame/RDD.
 # Run the following two examples to see how the plans are created in the 
different scenarios:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col as col
spark = SparkSession.builder.appName('available_columns').getOrCreate()
df = spark.createDataFrame([{"oldcol": 1}, {"oldcol": 2}])
df = df.withColumnRenamed('oldcol', 'newcol')
df = df.filter(col("oldcol")!=2)  # filters on the pre-rename name, yet succeeds
df.count()
df.explain("extended") {code}
 

 
{noformat}
== Parsed Logical Plan ==
'Filter NOT ('oldcol = 2)
+- Project [oldcol#168L AS newcol#170L]
   +- LogicalRDD [oldcol#168L], false

== Analyzed Logical Plan ==
newcol: bigint
Project [newcol#170L]
+- Filter NOT (oldcol#168L = cast(2 as bigint))
   +- Project [oldcol#168L AS newcol#170L, oldcol#168L]
      +- LogicalRDD [oldcol#168L], false

== Optimized Logical Plan ==
Project [oldcol#168L AS newcol#170L]
+- Filter (isnotnull(oldcol#168L) AND NOT (oldcol#168L = 2))
   +- LogicalRDD [oldcol#168L], false

== Physical Plan ==
*(1) Project [oldcol#168L AS newcol#170L]
+- *(1) Filter (isnotnull(oldcol#168L) AND NOT (oldcol#168L = 2))
   +- *(1) Scan ExistingRDD[oldcol#168L]{noformat}
 



 
The second example shows that the old column stays reachable even after an 
explicit select of only the new column:

{code:java}
# Example 2: even after selecting only newcol, a filter on oldcol resolves
df2 = df.select(col("newcol"))
df2 = df2.filter(col("oldcol")!=2)
df2.count()
df2.explain("extended"){code}
 
{noformat}
== Parsed Logical Plan ==
'Filter NOT ('oldcol = 2)
+- Project [newcol#170L]
   +- Project [newcol#170L]
      +- Filter NOT (oldcol#168L = cast(2 as bigint))
         +- Project [oldcol#168L AS newcol#170L, oldcol#168L]
            +- LogicalRDD [oldcol#168L], false

== Analyzed Logical Plan ==
newcol: bigint
Project [newcol#170L]
+- Filter NOT (oldcol#168L = cast(2 as bigint))
   +- Project [newcol#170L, oldcol#168L]
      +- Project [newcol#170L, oldcol#168L]
         +- Filter NOT (oldcol#168L = cast(2 as bigint))
            +- Project [oldcol#168L AS newcol#170L, oldcol#168L]
               +- LogicalRDD [oldcol#168L], false

== Optimized Logical Plan ==
Project [oldcol#168L AS newcol#170L]
+- Filter (isnotnull(oldcol#168L) AND NOT (oldcol#168L = 2))
   +- LogicalRDD [oldcol#168L], false

== Physical Plan ==
*(1) Project [oldcol#168L AS newcol#170L]
+- *(1) Filter (isnotnull(oldcol#168L) AND NOT (oldcol#168L = 2))
   +- *(1) Scan ExistingRDD[oldcol#168L]
{noformat}
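On point 1 above, here is a minimal sketch of the drop-vs-select asymmetry 
(reusing the spark session from the example; the column name 
"some_missing_col" is made up):

{code:java}
# drop() is documented as a no-op for missing columns, so this silently
# returns an unchanged DataFrame even though the column does not exist:
df.drop("some_missing_col")

# select() on the same missing column fails analysis:
from pyspark.sql.utils import AnalysisException
try:
    df.select("some_missing_col")
except AnalysisException as e:
    print("select raised:", e)
{code}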
 

 

> old columns should not be available after select or drop
> 
>
> Key: SPARK-37954
> URL: https://issues.apache.org/jira/browse/SPARK-37954
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 3.0.1
> Reporter: Jean Bon
> Priority: Major
>
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col as col
> spark = SparkSession.builder.appName('available_columns').getOrCreate()
> df = spark.range(5).select((col("id")+10).alias("id2"))
> assert df.columns==["id2"] #OK
> try:
>     df.select("id")
>     error_raise = False
> except:
>     error_raise = True
> assert error_raise #OK
> df = df.drop("id") #should raise an error
> df.filter(col("id")!=2).count() #returns 4, should raise an error
> {code}
>  



[jira] [Commented] (SPARK-37954) old columns should not be available after select or drop

2022-02-10 Thread L. C. Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490639#comment-17490639
 ] 

L. C. Hsieh commented on SPARK-37954:

I quickly scanned what we did, and it looks intentional: we resolve the 
missing references for Filter. Treating the two cases

{code:java}
df.select(df("name")).filter(df("id") === 0).show()
{code}

and

{code:java}
df = df.select(df("name"))
df.filter(df("id") === 0).show()
{code}

as two different things seems a bit odd. I don't think (or remember) that we 
ever expected them to be different.
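For reference, a quick PySpark sketch of the same two spellings (data mirrors 
the snippets above; behavior as reported in this ticket) shows that both 
currently resolve:

{code:java}
# Both spellings pass analysis today: the analyzer re-adds the missing
# reference under Filter in either case (reuses the spark session from
# the other comments in this thread).
df = spark.createDataFrame([("test1", 0), ("test2", 1)], ["name", "id"])
df.select(df["name"]).filter(df["id"] == 0).show()  # one chained expression

df2 = df.select(df["name"])
df2.filter(df["id"] == 0).show()  # split into two steps; also succeeds
{code}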





[jira] [Commented] (SPARK-37954) old columns should not be available after select or drop

2022-02-10 Thread Andrew Murphy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490630#comment-17490630
 ] 

Andrew Murphy commented on SPARK-37954:

Hi all,

Super excited to report on this one; it's my first time attempting to 
contribute to Spark, so bear with me!

It looks like df.drop("id") should not raise an error, per the 
[documentation|https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.drop.html]:
 "This is a no-op if schema doesn’t contain the given column name(s)." So that 
part is intended behavior.

I was sure the filter was a bug, though. Normally, filtering on a nonexistent 
column _does_ throw an AnalysisException, but in one very specific case 
Catalyst actually propagates the reference that was supposed to be removed.
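For contrast, here is a minimal sketch of the usual failure (fresh DataFrame, 
no rename involved; the column name "nosuchcol" is made up):

{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('available_columns').getOrCreate()
df = spark.createDataFrame([{"oldcol": 1}, {"oldcol": 2}])
# The unresolved column fails analysis immediately:
df.filter(col("nosuchcol") != 2)  # raises AnalysisException
{code}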

*Code*

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col as col
spark = SparkSession.builder.appName('available_columns').getOrCreate()
df = spark.createDataFrame([{"oldcol": 1}, {"oldcol": 2}])
df = df.withColumnRenamed('oldcol', 'newcol')
df = df.filter(col("oldcol")!=2)
df.explain("extended") {code}
*Output*

{code:java}
+------+
|newcol|
+------+
|     1|
+------+

== Parsed Logical Plan ==
'Filter NOT ('oldcol = 2)
+- Project [oldcol#6L AS newcol#8L]
   +- LogicalRDD [oldcol#6L], false

== Analyzed Logical Plan ==
newcol: bigint
Project [newcol#8L]
+- Filter NOT (oldcol#6L = cast(2 as bigint))
   +- Project [oldcol#6L AS newcol#8L, oldcol#6L]
      +- LogicalRDD [oldcol#6L], false

== Optimized Logical Plan ==
Project [oldcol#6L AS newcol#8L]
+- Filter (isnotnull(oldcol#6L) AND NOT (oldcol#6L = 2))
   +- LogicalRDD [oldcol#6L], false

== Physical Plan ==
*(1) Project [oldcol#6L AS newcol#8L]
+- *(1) Filter (isnotnull(oldcol#6L) AND NOT (oldcol#6L = 2))
   +- *(1) Scan ExistingRDD[oldcol#6L] {code}
As you can see, in the analysis step Catalyst propagates oldcol#6L into the 
Project operator for seemingly no reason. Well, it turns out the reason comes 
from SPARK-24781 ([PR|https://github.com/apache/spark/pull/21745]), where 
analysis was failing on this construction:
{code:scala}
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
{code}
{noformat}
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from 
name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
   +- AnalysisBarrier
  +- Project [name#5]
 +- Project [_1#2 AS name#5, _2#3 AS id#6]
+- LocalRelation [_1#2, _2#3]{noformat}
So it looks like this PR was created to propagate references in the very 
specific case of a Filter directly after a Project, to handle filtering and 
selecting in the same step, but unfortunately it does not differentiate between
{code:java}
df.select(df("name")).filter(df("id") === 0).show()  {code}
and 
{code:java}
df = df.select(df("name"))
df.filter(df("id") === 0).show()  {code}
[~viirya], I see you were the one who fixed this problem in the first place. 
Any thoughts on how it could be solved? Unfortunately, creating a fix for this 
one is beyond my capabilities right now, but I'm trying to learn!
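In the meantime, a purely illustrative user-side guard (my own sketch, not an 
engine fix; strict_filter is a made-up helper) can surface the problem by 
checking the analyzed schema via df.columns before filtering:

{code:java}
from pyspark.sql.functions import col

# Hypothetical helper: refuse to filter on a column the DataFrame no
# longer exposes, instead of letting the analyzer resolve it silently.
def strict_filter(df, column_name, predicate):
    if column_name not in df.columns:
        raise ValueError(f"column '{column_name}' not in {df.columns}")
    return df.filter(predicate)

# With the renamed DataFrame from the repro above:
# strict_filter(df, "newcol", col("newcol") != 2)   # OK
# strict_filter(df, "oldcol", col("oldcol") != 2)   # raises ValueError
{code}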

 
