[jira] [Updated] (SPARK-27375) cache not working after discretizer.fit(df).transform operation

2019-04-03 Thread Zhenyi Lin (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenyi Lin updated SPARK-27375:
---
Description: 
The example below demonstrates the problem.

If cache() worked, col(r1) would equal col(r2) in the output of dfj.show().
However, after applying the discretizer's fit and transform to the DataFrame,
col(r1) and col(r2) differ.

 
{noformat}
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import random

spark.catalog.clearCache()
random.seed(123)

@udf(IntegerType())
def ri():
    return random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9])

df = spark.range(100).repartition("id")

# If the discretizer step below is removed, col(r1) equals col(r2).
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="id",
                                  outputCol="quantileNo")
df = discretizer.fit(df).transform(df)

# Adding the following line to copy df also makes col(r1) equal col(r2):
# df = df.rdd.toDF()

df = df.withColumn("r", ri()).cache()
df1 = df.withColumnRenamed("r", "r1")
df2 = df.withColumnRenamed("r", "r2")

df1.join(df2, "id").explain()
dfj = df1.join(df2, "id")
dfj.select("id", "r1", "r2").show(5)


 

The result is shown below: col(r1) and col(r2) differ. The physical plan shows
that the cache() is bypassed in the join.

The problem can be avoided either by adding df = df.rdd.toDF() before creating
df1 and df2, or by removing the discretizer fit and transform; in either case
col(r1) and col(r2) become identical.

 

== Physical Plan ==
*(4) Project [id#15612L, quantileNo#15622, r1#15645, quantileNo#15653, r2#15649]
+- *(4) BroadcastHashJoin [id#15612L], [id#15655L], Inner, BuildRight
   :- *(4) Project [id#15612L, UDF:bucketizer_0(cast(id#15612L as double)) AS quantileNo#15622, pythonUDF0#15661 AS r1#15645]
   :  +- BatchEvalPython [ri()], [id#15612L, pythonUDF0#15661]
   :     +- Exchange hashpartitioning(id#15612L, 24)
   :        +- *(1) Range (0, 100, step=1, splits=6)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *(3) Project [id#15655L, UDF:bucketizer_0(cast(id#15655L as double)) AS quantileNo#15653, pythonUDF0#15662 AS r2#15649]
         +- BatchEvalPython [ri()], [id#15655L, pythonUDF0#15662]
            +- ReusedExchange [id#15655L], Exchange hashpartitioning(id#15612L, 24)
+---+---+---+
| id| r1| r2|
+---+---+---+
| 28| 9| 3|
| 30| 3| 6|
| 88| 1| 9|
| 67| 3| 3|
| 66| 1| 5|
+---+---+---+
only showing top 5 rows

 

{noformat}
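The suspected mechanism can be sketched in plain Python, without Spark. This is an illustrative analogy only (the names ri, r1, r2 mirror the report): if a nondeterministic function is re-evaluated independently in each join branch instead of being read from the cached result, the two branches disagree; forcing one materialization (what df.rdd.toDF() effectively does) makes them agree.

```python
import random

random.seed(123)

def ri():
    # Stand-in for the nondeterministic ri() UDF from the report.
    return random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9])

ids = list(range(5))

# If the cache is bypassed, each join branch re-evaluates ri()
# independently, so r1 and r2 will typically disagree per id.
r1 = {i: ri() for i in ids}
r2 = {i: ri() for i in ids}

# Materializing the values once and sharing them (analogous to the
# df.rdd.toDF() workaround) gives both branches identical values.
materialized = {i: ri() for i in ids}
s1 = dict(materialized)
s2 = dict(materialized)

print(s1 == s2)  # True: both branches share the materialized values
```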
 


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org