[ https://issues.apache.org/jira/browse/SPARK-27375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenyi Lin updated SPARK-27375:
-------------------------------
    Description: 
The example below reproduces the problem. If the cache works, col(r1) should equal col(r2) in the output of dfj.show(). However, after the DataFrame goes through discretizer.fit(df).transform(df), col(r1) and col(r2) differ.

{noformat}
import random

from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# `spark` is the active SparkSession (for example, the one provided by the pyspark shell)
spark.catalog.clearCache()
random.seed(123)

@udf(IntegerType())
def ri():
    return random.choice([1,2,3,4,5,6,7,8,9])

df = spark.range(100).repartition("id")

# if the discretizer part is removed, col(r1) will be equal to col(r2)
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="id", outputCol="quantileNo")
df = discretizer.fit(df).transform(df)

# if the following line is added to copy df, col(r1) will also become equal to col(r2)
# df = df.rdd.toDF()

df = df.withColumn("r", ri()).cache()
df1 = df.withColumnRenamed("r", "r1")
df2 = df.withColumnRenamed("r", "r2")

df1.join(df2, "id").explain()
dfj = df1.join(df2, "id")
dfj.select("id", "r1", "r2").show(5)
{noformat}

The result is shown below: col(r1) and col(r2) are different. The physical plan shows that the join does not use the cached data; each side of the join evaluates ri() again through its own BatchEvalPython node.
There are two ways to avoid this: add df = df.rdd.toDF() before creating df1 and df2, or remove the discretizer fit and transform step. In either case col(r1) and col(r2) become identical.

{noformat}
== Physical Plan ==
*(4) Project [id#15612L, quantileNo#15622, r1#15645, quantileNo#15653, r2#15649]
+- *(4) BroadcastHashJoin [id#15612L], [id#15655L], Inner, BuildRight
   :- *(4) Project [id#15612L, UDF:bucketizer_0(cast(id#15612L as double)) AS quantileNo#15622, pythonUDF0#15661 AS r1#15645]
   :  +- BatchEvalPython [ri()], [id#15612L, pythonUDF0#15661]
   :     +- Exchange hashpartitioning(id#15612L, 24)
   :        +- *(1) Range (0, 100, step=1, splits=6)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *(3) Project [id#15655L, UDF:bucketizer_0(cast(id#15655L as double)) AS quantileNo#15653, pythonUDF0#15662 AS r2#15649]
         +- BatchEvalPython [ri()], [id#15655L, pythonUDF0#15662]
            +- ReusedExchange [id#15655L], Exchange hashpartitioning(id#15612L, 24)

+---+---+---+
| id| r1| r2|
+---+---+---+
| 28|  9|  3|
| 30|  3|  6|
| 88|  1|  9|
| 67|  3|  3|
| 66|  1|  5|
+---+---+---+
only showing top 5 rows
{noformat}


> cache not working after discretizer.fit(df).transform operation
> ---------------------------------------------------------------
>
>                 Key: SPARK-27375
>                 URL: https://issues.apache.org/jira/browse/SPARK-27375
>             Project: Spark
>          Issue Type: Bug
>          Components: Examples
>    Affects Versions: 2.3.0
>            Reporter: Zhenyi Lin
>            Priority: Major
>
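
The symptom in the description (r1 differing from r2) is what one would expect whenever a nondeterministic function is evaluated once per join side instead of being read from a single materialized result. The sketch below is a plain-Python analogy only, not Spark code and not a diagnosis of the root cause: compute_rows, join_recomputed and join_materialized are hypothetical helper names introduced for illustration, and ri() mirrors the UDF from the report.

```python
import random


def ri():
    """Stand-in for the report's UDF: returns a random digit from 1 to 9."""
    return random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9])


def compute_rows(n):
    """Evaluate the 'plan' once: attach a fresh random value to each id."""
    return {i: ri() for i in range(n)}


def join_recomputed(n):
    """Each join side re-runs the plan, as when the cached data is not reused."""
    left = compute_rows(n)
    right = compute_rows(n)
    return [(i, left[i], right[i]) for i in range(n)]


def join_materialized(n):
    """Both join sides read one materialized result, as a working cache would."""
    rows = compute_rows(n)
    return [(i, rows[i], rows[i]) for i in range(n)]


random.seed(123)
recomputed = join_recomputed(100)
materialized = join_materialized(100)

# Count the ids whose two sides disagree when the plan is evaluated twice.
mismatches = sum(1 for _, r1, r2 in recomputed if r1 != r2)
print("rows where r1 != r2 when recomputed:", mismatches)
print("r1 == r2 for every row when materialized:",
      all(r1 == r2 for _, r1, r2 in materialized))
```

In this analogy, join_recomputed corresponds to the reported plan, where ri() appears in a BatchEvalPython node on both sides of the join, while join_materialized corresponds to the behaviour the reporter expected from cache().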


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org