[ https://issues.apache.org/jira/browse/SPARK-32635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinod KC updated SPARK-32635: ----------------------------- Description: When the pyspark.sql.functions.lit() function is used with dataframe cache, it returns a wrong result, e.g.: lit() function with cache() function. ----------------------------------- {code:java} from pyspark.sql import Row from pyspark.sql import functions as F df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2))) df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1))) df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2))) df_23 = df_2.union(df_3) df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2))) sel_col3 = df_23.select('col3', 'col2') df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner") df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner").cache() finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9) finaldf.show() finaldf.select('col2').show() #Wrong result {code} Output ----------- {code:java} >>> finaldf.show() +----+----+----+ |col2|col3|col1| +----+----+----+ | 2| 9| b| +----+----+----+ >>> finaldf.select('col2').show() #Wrong result, instead of 2, got 1 +----+ |col2| +----+ | 1| +----+ {code} lit() function without cache() function. 
{code:java} from pyspark.sql import Row from pyspark.sql import functions as F df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2))) df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1))) df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2))) df_23 = df_2.union(df_3) df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2))) sel_col3 = df_23.select('col3', 'col2') df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner") df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner") finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9) finaldf.show() finaldf.select('col2').show() #Correct result {code} Output {code:java} ---------- >>> finaldf.show() +----+----+----+ |col2|col3|col1| +----+----+----+ | 2| 9| b| +----+----+----+ >>> finaldf.select('col2').show() #Correct result, when df_23_a is not cached +----+ |col2| +----+ | 2| +----+ {code} was: When pyspark.sql.functions.lit() function is used with dataframe cache, it returns wrong result eg: ### lit() function with cache() function. 
----------------------------------- {code:java} from pyspark.sql import Row from pyspark.sql import functions as F df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2))) df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1))) df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2))) df_23 = df_2.union(df_3) df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2))) sel_col3 = df_23.select('col3', 'col2') df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner") df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner").cache() finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9) finaldf.show( finaldf.select('col2').show() #Wrong result {code} Output ----------- {code:java} >>> finaldf.show() +----+----+----+ |col2|col3|col1| +----+----+----+ | 2| 9| b| +----+----+----+ >>> finaldf.select('col2').show() #Wrong result, instead of 2, got 1 +----+ |col2| +----+ | 1| +----+ +----+{code} ### lit() function without cache() function. 
{code:java} from pyspark.sql import Row from pyspark.sql import functions as F df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2))) df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1))) df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2))) df_23 = df_2.union(df_3) df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2))) sel_col3 = df_23.select('col3', 'col2') df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner") df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner") finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9) finaldf.show() finaldf.select('col2').show() #Correct result {code} Output {code:java} ---------- >>> finaldf.show() +----+----+----+ |col2|col3|col1| +----+----+----+ | 2| 9| b| +----+----+----+ >>> finaldf.select('col2').show() #Correct result, when df_23_a is not cached +----+ |col2| +----+ | 2| +----+ {code} > When pyspark.sql.functions.lit() function is used with dataframe cache, it > returns wrong result > ----------------------------------------------------------------------------------------------- > > Key: SPARK-32635 > URL: https://issues.apache.org/jira/browse/SPARK-32635 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL > Affects Versions: 3.0.0 > Reporter: Vinod KC > Priority: Major > > When pyspark.sql.functions.lit() function is used with dataframe cache, it > returns wrong result > eg:lit() function with cache() function. 
> ----------------------------------- > {code:java} > from pyspark.sql import Row > from pyspark.sql import functions as F > df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': > 'b'}]).withColumn("col2", F.lit(str(2))) > df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': > 8}]).withColumn("col2", F.lit(str(1))) > df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': > 9}]).withColumn("col2", F.lit(str(2))) > df_23 = df_2.union(df_3) > df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': > 9}]).withColumn("col2", F.lit(str(2))) > sel_col3 = df_23.select('col3', 'col2') > df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner") > df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner").cache() > finaldf = df_23_a.join(df_4, on=['col2', 'col3'], > how='left').filter(F.col('col3') == 9) > finaldf.show() > finaldf.select('col2').show() #Wrong result > {code} > > Output > ----------- > {code:java} > >>> finaldf.show() > +----+----+----+ > |col2|col3|col1| > +----+----+----+ > | 2| 9| b| > +----+----+----+ > >>> finaldf.select('col2').show() #Wrong result, instead of 2, got 1 > +----+ > |col2| > +----+ > | 1| > +----+ > {code} > lit() function without cache() function. 
> {code:java} > from pyspark.sql import Row > from pyspark.sql import functions as F > df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': > 'b'}]).withColumn("col2", F.lit(str(2))) > df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': > 8}]).withColumn("col2", F.lit(str(1))) > df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': > 9}]).withColumn("col2", F.lit(str(2))) > df_23 = df_2.union(df_3) > df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': > 9}]).withColumn("col2", F.lit(str(2))) > sel_col3 = df_23.select('col3', 'col2') > df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner") > df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner") > finaldf = df_23_a.join(df_4, on=['col2', 'col3'], > how='left').filter(F.col('col3') == 9) > finaldf.show() > finaldf.select('col2').show() #Correct result > {code} > > Output > {code:java} > ---------- > >>> finaldf.show() > +----+----+----+ > |col2|col3|col1| > +----+----+----+ > | 2| 9| b| > +----+----+----+ > >>> finaldf.select('col2').show() #Correct result, when df_23_a is not cached > +----+ > |col2| > +----+ > | 2| > +----+ > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org