[ 
https://issues.apache.org/jira/browse/SPARK-40200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Pietersen updated SPARK-40200:
-------------------------------------
    Description: 
Unpersist of a parent dataset which has a column from 
_*monotonically_increasing_id*_ cascades to a child dataset when
 * joined on another dataset
 * kryo serialization is enabled
 * storage level is MEMORY_AND_DISK_SER
 * not all rows join????

 
{code:java}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.storage.StorageLevel

case class a(value: String, id: Long)

val storageLevel = StorageLevel.MEMORY_AND_DISK_SER // cascades
//val storageLevel = StorageLevel.MEMORY_ONLY // doesn't cascade

val acc = sc.longAccumulator("acc")
val parent1DS = spark.createDataset(Seq("a", "b", "c"))
                     .withColumn("id", monotonically_increasing_id)
                     .as[a]
                     .persist(storageLevel)

val parent2DS = spark.createDataset(Seq(1, 2, 3)) // 0,1,2 doesn't cascade
                     .persist(storageLevel)

val childDS = parent1DS.joinWith(parent2DS, parent1DS("id") === 
parent2DS("value"))
                       .map(i =>{ 
                                  acc.add(1) 
                                  i
                                }).persist(storageLevel)

childDS.count
parent1DS.unpersist
childDS.count

acc.value should be(2) {code}

  was:
Unpersist of a parent dataset which has a column from 
`monotonically_increasing_id` cascades to a child dataset when
 * joined on another dataset
 * kryo serialization is enabled
 * storage level is MEMORY_AND_DISK_SER
 * not all rows join????

 
{code:java}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.storage.StorageLevel

case class a(value: String, id: Long)

val storageLevel = StorageLevel.MEMORY_AND_DISK_SER // cascades
//val storageLevel = StorageLevel.MEMORY_ONLY // doesn't cascade

val acc = sc.longAccumulator("acc")
val parent1DS = spark.createDataset(Seq("a", "b", "c"))
                     .withColumn("id", monotonically_increasing_id)
                     .as[a]
                     .persist(storageLevel)

val parent2DS = spark.createDataset(Seq(1, 2, 3)) // 0,1,2 doesn't cascade
                     .persist(storageLevel)

val childDS = parent1DS.joinWith(parent2DS, parent1DS("id") === 
parent2DS("value"))
                       .map(i =>{ 
                                  acc.add(1) 
                                  i
                                }).persist(storageLevel)

childDS.count
parent1DS.unpersist
childDS.count

acc.value should be(2) {code}


> unpersist cascades with Kryo, MEMORY_AND_DISK_SER and 
> monotonically_increasing_id
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-40200
>                 URL: https://issues.apache.org/jira/browse/SPARK-40200
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0
>         Environment: spark-3.3.0
>            Reporter: Calvin Pietersen
>            Priority: Major
>
> Unpersist of a parent dataset which has a column from 
> _*monotonically_increasing_id*_ cascades to a child dataset when
>  * joined on another dataset
>  * kryo serialization is enabled
>  * storage level is MEMORY_AND_DISK_SER
>  * not all rows join????
>  
> {code:java}
> import org.apache.spark.sql.functions.monotonically_increasing_id
> import org.apache.spark.storage.StorageLevel
> case class a(value: String, id: Long)
> val storageLevel = StorageLevel.MEMORY_AND_DISK_SER // cascades
> //val storageLevel = StorageLevel.MEMORY_ONLY // doesn't cascade
> val acc = sc.longAccumulator("acc")
> val parent1DS = spark.createDataset(Seq("a", "b", "c"))
>                      .withColumn("id", monotonically_increasing_id)
>                      .as[a]
>                      .persist(storageLevel)
> val parent2DS = spark.createDataset(Seq(1, 2, 3)) // 0,1,2 doesn't cascade
>                      .persist(storageLevel)
> val childDS = parent1DS.joinWith(parent2DS, parent1DS("id") === 
> parent2DS("value"))
>                        .map(i =>{ 
>                                   acc.add(1) 
>                                   i
>                                 }).persist(storageLevel)
> childDS.count
> parent1DS.unpersist
> childDS.count
> acc.value should be(2) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to