[ https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yzheng616 updated SPARK-21721:
------------------------------
    Description: 

The leak comes from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118 it adds a staging path to the FileSystem deleteOnExit cache, and at line 385 it deletes that path from disk without removing it from the FileSystem cache. If a streaming application keeps persisting data to a partitioned Hive table, memory keeps growing until the JVM terminates. Below is a small program that reproduces it.

{{package test

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SaveMode
import java.lang.reflect.Field

case class PathLeakTest(id: Int, gp: String)

object StagePathLeak {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

    // create a partitioned table
    spark.sql("drop table if exists path_leak")
    spark.sql("create table if not exists path_leak(id int)" +
      " partitioned by (gp String)" +
      " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" +
      " stored as" +
      " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'" +
      " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

    val seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
    for (x <- 1 to 2) {
      seq += PathLeakTest(x, "g" + x)
    }
    val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)

    // perform 50 overwrite inserts into the Hive table
    for (j <- 1 to 50) {
      val df = spark.createDataFrame(rdd)
      // #1 InsertIntoHiveTable line 118: adds the staging path to the FileSystem deleteOnExit cache
      // #2 InsertIntoHiveTable line 385: deletes the path from disk but not from the FileSystem cache, which causes the leak
      df.write.mode(SaveMode.Overwrite).insertInto("path_leak")
    }

    // inspect the private deleteOnExit set of the cached FileSystem via reflection
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val deleteOnExit = getDeleteOnExit(fs.getClass)
    deleteOnExit.setAccessible(true)
    val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
    // check the FileSystem deleteOnExit cache size
    println(caches.size())
    val it = caches.iterator()
    // all staging paths are still cached even though they have already been deleted from disk
    while (it.hasNext()) {
      println(it.next())
    }
  }

  def getDeleteOnExit(cls: Class[_]): Field = {
    try {
      cls.getDeclaredField("deleteOnExit")
    } catch {
      case _: NoSuchFieldException => getDeleteOnExit(cls.getSuperclass)
    }
  }
}}}
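A possible mitigation is sketched below. This is only an illustration, not the actual Spark patch; {{StagingPathCleanup}} and {{deleteStagingDir}} are hypothetical names. The idea is that once the staging directory has been removed from disk, the same path can also be dropped from the FileSystem deleteOnExit set via FileSystem.cancelDeleteOnExit, so the set does not grow with every insert.

{{import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object StagingPathCleanup {
  // Hypothetical helper: delete a staging directory and drop it from the
  // deleteOnExit set of the cached FileSystem instance. Illustration only,
  // not the change that was applied to InsertIntoHiveTable.
  def deleteStagingDir(hadoopConf: Configuration, stagingDir: Path): Unit = {
    val fs = stagingDir.getFileSystem(hadoopConf)
    if (fs.delete(stagingDir, true)) {
      // Without this call, the Path stays in FileSystem's private deleteOnExit
      // TreeSet for the lifetime of the FileSystem object, which is what the
      // reproduction above observes via reflection.
      fs.cancelDeleteOnExit(stagingDir)
    }
  }
}}}

With a cleanup like this, the {{caches.size()}} printed by the reproduction above would be expected to stay bounded instead of growing with every insert.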
> Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21721
>                 URL: https://issues.apache.org/jira/browse/SPARK-21721
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: yzheng616
>