[ https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yzheng616 updated SPARK-21721: ------------------------------ Description: The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118, it put a staging path to FileSystem delete cache, and then remove the path from disk at line 385. It does not remove the path from FileSystem cache. If a streaming application keep persisting data to a partitioned hive table, the memory will keep increasing until JVM terminated. Below is a simple code to reproduce it. package test import org.apache.spark.sql.SparkSession import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem import org.apache.spark.sql.SaveMode import java.lang.reflect.Field case class PathLeakTest(id: Int, gp: String) object StagePathLeak { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate() spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") //create a partitioned table spark.sql("drop table if exists path_leak"); spark.sql("create table if not exists path_leak(id int)" + " partitioned by (gp String)"+ " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+ " stored as"+ " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+ " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'") var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]() for (x <- 1 to 2) { seq += (new PathLeakTest(x, "g" + x)) } val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq) //insert 50 records to Hive table for (j <- 1 to 50) { val df = spark.createDataFrame(rdd) //#1 InsertIntoHiveTable line 118: add stage path to FileSystem deleteOnExit cache //#2 InsertIntoHiveTable line 385: delete the path from disk but not from the FileSystem cache, and it caused the leak df.write.mode(SaveMode.Overwrite).insertInto("path_leak") } val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) val 
deleteOnExit = getDeleteOnExit(fs.getClass) deleteOnExit.setAccessible(true) val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]] //check FileSystem deleteOnExit cache size println(caches.size()) val it = caches.iterator() //all starge pathes were still cached even they have already been deleted from the disk while(it.hasNext()){ println(it.next()); } } def getDeleteOnExit(cls: Class[_]) : Field = { try{ return cls.getDeclaredField("deleteOnExit") }catch{ case ex: NoSuchFieldException => return getDeleteOnExit(cls.getSuperclass) } return null } } was: The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118, it put a staging path to FileSystem delete cache, and then remove the path from disk at line 385. It does not remove the path from FileSystem cache. If a streaming application keep persisting data to a partitioned hive table, the memory will keep increasing until JVM terminated. Below is a simple code to reproduce it. bq. package test bq. bq. import org.apache.spark.sql.SparkSession bq. import org.apache.hadoop.fs.Path bq. import org.apache.hadoop.fs.FileSystem bq. import org.apache.spark.sql.SaveMode bq. import java.lang.reflect.Field bq. bq. bq. bq. case class PathLeakTest(id: Int, gp: String) bq. bq. object StagePathLeak { bq. bq. def main(args: Array[String]): Unit = { bq. bq. val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate() bq. spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") bq. //create a partitioned table bq. spark.sql("drop table if exists path_leak"); bq. spark.sql("create table if not exists path_leak(id int)" + bq. " partitioned by (gp String)"+ bq. " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+ bq. " stored as"+ bq. " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+ bq. " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'") bq. bq. 
var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]() bq. for (x <- 1 to 2) { bq. seq += (new PathLeakTest(x, "g" + x)) bq. } bq. val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq) bq. bq. //insert 50 records to Hive table bq. for (j <- 1 to 50) { bq. val df = spark.createDataFrame(rdd) bq. //#1 InsertIntoHiveTable line 118: add stage path to FileSystem deleteOnExit cache bq. //#2 InsertIntoHiveTable line 385: delete the path from disk but not from the FileSystem cache, and it caused the leak bq. df.write.mode(SaveMode.Overwrite).insertInto("path_leak") bq. } bq. bq. val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) bq. val deleteOnExit = getDeleteOnExit(fs.getClass) bq. deleteOnExit.setAccessible(true) bq. val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]] bq. //check FileSystem deleteOnExit cache size bq. println(caches.size()) bq. val it = caches.iterator() bq. //all starge pathes were still cached even they have already been deleted from the disk bq. while(it.hasNext()){ bq. println(it.next()); bq. } bq. } bq. bq. def getDeleteOnExit(cls: Class[_]) : Field = { bq. try{ bq. return cls.getDeclaredField("deleteOnExit") bq. }catch{ bq. case ex: NoSuchFieldException => return getDeleteOnExit(cls.getSuperclass) bq. } bq. return null bq. } bq. bq. } > Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable > ---------------------------------------------------------------------- > > Key: SPARK-21721 > URL: https://issues.apache.org/jira/browse/SPARK-21721 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.1.1 > Reporter: yzheng616 > > The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. > At line 118, it put a staging path to FileSystem delete cache, and then > remove the path from disk at line 385. It does not remove the path from > FileSystem cache. 
If a streaming application keeps persisting data to a > partitioned hive table, the memory will keep increasing until the JVM is terminated. > Below is a simple program to reproduce it. > package test > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import org.apache.hadoop.fs.FileSystem > import org.apache.spark.sql.SaveMode > import java.lang.reflect.Field > case class PathLeakTest(id: Int, gp: String) > object StagePathLeak { > def main(args: Array[String]): Unit = { > val spark = > SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate() > spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") > //create a partitioned table > spark.sql("drop table if exists path_leak"); > spark.sql("create table if not exists path_leak(id int)" + > " partitioned by (gp String)"+ > " row format serde > 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+ > " stored as"+ > " inputformat > 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+ > " outputformat > 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'") > var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]() > for (x <- 1 to 2) { > seq += (new PathLeakTest(x, "g" + x)) > } > val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq) > //insert 50 records to Hive table > for (j <- 1 to 50) { > val df = spark.createDataFrame(rdd) > //#1 InsertIntoHiveTable line 118: add stage path to FileSystem > deleteOnExit cache > //#2 InsertIntoHiveTable line 385: delete the path from disk but not > from the FileSystem cache, and it caused the leak > df.write.mode(SaveMode.Overwrite).insertInto("path_leak") > } > > val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) > val deleteOnExit = getDeleteOnExit(fs.getClass) > deleteOnExit.setAccessible(true) > val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]] > //check FileSystem deleteOnExit cache size > println(caches.size()) > val it = 
caches.iterator() > //all staging paths were still cached even though they have already been deleted > from the disk > while(it.hasNext()){ > println(it.next()); > } > } > > def getDeleteOnExit(cls: Class[_]) : Field = { > try{ > return cls.getDeclaredField("deleteOnExit") > }catch{ > case ex: NoSuchFieldException => return > getDeleteOnExit(cls.getSuperclass) > } > return null > } > } > > -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org