[ https://issues.apache.org/jira/browse/SPARK-40200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Calvin Pietersen updated SPARK-40200:
-------------------------------------

Description:

Unpersisting a parent dataset that has a column produced by `monotonically_increasing_id` cascades to the child dataset when:

 * it is joined with another dataset
 * Kryo serialization is enabled (see the setup sketch further below)
 * the storage level is MEMORY_AND_DISK_SER
 * not all rows join (it is unclear whether this is actually required)

```
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.storage.StorageLevel

case class a(value: String, id: Long)

val storageLevel = StorageLevel.MEMORY_AND_DISK_SER // cascades
// val storageLevel = StorageLevel.MEMORY_ONLY      // doesn't cascade

val acc = sc.longAccumulator("acc")

val parent1DS = spark
  .createDataset(Seq("a", "b", "c"))
  .withColumn("id", monotonically_increasing_id())
  .as[a]
  .persist(storageLevel)

val parent2DS = spark
  .createDataset(Seq(1, 2, 3)) // Seq(0, 1, 2) doesn't cascade
  .persist(storageLevel)

val childDS = parent1DS
  .joinWith(parent2DS, parent1DS("id") === parent2DS("value"))
  .map { i =>
    acc.add(1) // counts how many times the map is executed
    i
  }
  .persist(storageLevel)

childDS.count()        // materializes the child cache; with ids 0,1,2 the join
                       // matches 2 rows, so acc.value becomes 2
parent1DS.unpersist()
childDS.count()        // should be served from the child cache
acc.value should be(2) // ScalaTest matcher; fails when the unpersist cascades,
                       // because the recount re-runs the map and acc reaches 4
```

> unpersist cascades with Kryo, MEMORY_AND_DISK_SER and monotonically_increasing_id
> ---------------------------------------------------------------------------------
>
> Key: SPARK-40200
> URL: https://issues.apache.org/jira/browse/SPARK-40200
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.0
> Environment: spark-3.3.0
> Reporter: Calvin Pietersen
> Priority: Major
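The repro assumes Kryo serialization is already enabled but does not show how. A minimal sketch of a session setup matching the reported conditions, assuming a local spark-shell-style environment (the app name and master are illustrative, not part of the report; `spark.serializer` is the standard setting for enabling Kryo):

```
import org.apache.spark.sql.SparkSession

// Sketch: session setup matching the reported conditions. Kryo is enabled
// via the standard spark.serializer setting; the app name and master are
// arbitrary choices, not from the original report.
val spark = SparkSession.builder()
  .appName("SPARK-40200-repro")
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val sc = spark.sparkContext // the repro registers its accumulator through `sc`
import spark.implicits._    // provides the encoders createDataset(Seq(...)) needs
```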
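The cascade can also be probed without the accumulator: `Dataset.storageLevel` returns `StorageLevel.NONE` once a dataset's cache entry is dropped. A diagnostic sketch, assuming the repro above has been run through the first `childDS.count()`; whether the level flips to NONE or the cached blocks are dropped silently, the recount comparison pins down the recomputation:

```
// Diagnostic sketch building on the repro above (assumes the first
// childDS.count() has already run).
println(s"child level before: ${childDS.storageLevel}") // expect MEMORY_AND_DISK_SER

parent1DS.unpersist()

// If the unpersist cascaded, the child's entry may be gone (StorageLevel.NONE)
// or its cached blocks dropped; either way a recount re-runs the map.
println(s"child level after: ${childDS.storageLevel}")

val before: Long = acc.value
childDS.count()
println(s"map re-executions on recount: ${acc.value - before}") // 0 if the cache survived
```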