[jira] [Updated] (SPARK-21721) Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable

2017-08-14 Thread Xiao Li (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li updated SPARK-21721:

Affects Version/s: 2.0.2, 2.2.0

> Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
> -----------------------------------------------------------------------
>
>                 Key: SPARK-21721
>                 URL: https://issues.apache.org/jira/browse/SPARK-21721
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.1, 2.2.0
>            Reporter: yzheng616
>            Priority: Critical
>
> The leak comes from org.apache.spark.sql.hive.execution.InsertIntoHiveTable.
> At line 118 it adds a staging path to the FileSystem deleteOnExit cache, and
> at line 385 it removes that path from disk, but it never removes the path from
> the FileSystem cache. If a streaming application keeps persisting data to a
> partitioned Hive table, memory keeps growing until the JVM terminates.
> Below is a simple program that reproduces the issue.
> {code:java}
> package test
> 
> import java.lang.reflect.Field
> 
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.sql.{SaveMode, SparkSession}
> 
> case class PathLeakTest(id: Int, gp: String)
> 
> object StagePathLeak {
> 
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .master("local[4]")
>       .appName("StagePathLeak")
>       .enableHiveSupport()
>       .getOrCreate()
>     spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> 
>     // create a partitioned table
>     spark.sql("drop table if exists path_leak")
>     spark.sql("create table if not exists path_leak(id int)" +
>       " partitioned by (gp String)" +
>       " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" +
>       " stored as" +
>       " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'" +
>       " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
> 
>     // 2 partitions
>     val seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
>     for (x <- 1 to 2) {
>       seq += PathLeakTest(x, "g" + x)
>     }
>     val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)
> 
>     // insert the 2 records into the Hive table 50 times
>     for (j <- 1 to 50) {
>       val df = spark.createDataFrame(rdd)
>       // #1 InsertIntoHiveTable line 118: adds the staging path to the FileSystem deleteOnExit cache
>       // #2 InsertIntoHiveTable line 385: deletes the path from disk but not from the FileSystem cache, which causes the leak
>       df.write.mode(SaveMode.Overwrite).insertInto("path_leak")
>     }
> 
>     val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
>     val deleteOnExit = getDeleteOnExit(fs.getClass)
>     deleteOnExit.setAccessible(true)
>     val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
>     // check the FileSystem deleteOnExit cache size
>     println(caches.size())
>     val it = caches.iterator()
>     // all staging paths are still cached even though they have already been deleted from disk
>     while (it.hasNext()) {
>       println(it.next())
>     }
>   }
> 
>   // walk up the class hierarchy to find the private deleteOnExit field
>   def getDeleteOnExit(cls: Class[_]): Field = {
>     try {
>       cls.getDeclaredField("deleteOnExit")
>     } catch {
>       case _: NoSuchFieldException => getDeleteOnExit(cls.getSuperclass)
>     }
>   }
> }
> {code}
>  
>  
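
For reference, this kind of leak can be closed by pairing the delete with FileSystem.cancelDeleteOnExit, which evicts the path from the deleteOnExit cache. Below is a minimal, illustrative sketch of that pattern against the plain Hadoop FileSystem API; the staging path name is made up for the example and this is not the actual Spark patch.

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingPathCleanup {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical staging directory, used only for illustration.
    val stagingPath = new Path("/tmp/.hive-staging_example")

    // #1: register the path for deletion on JVM exit (what InsertIntoHiveTable does).
    fs.deleteOnExit(stagingPath)

    // ... the job writes its output under stagingPath ...

    // #2: remove the path from disk once the job is done.
    if (fs.exists(stagingPath)) {
      fs.delete(stagingPath, true)
    }

    // The missing step: also drop the entry from the deleteOnExit cache,
    // so the cached TreeSet does not grow with every insert.
    fs.cancelDeleteOnExit(stagingPath)
  }
}
{code}

With this pattern every path that is deleted is also evicted from the cache, so the deleteOnExit set stays bounded instead of growing by one staging path per insert.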



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21721) Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable

2017-08-14 Thread Xiao Li (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li updated SPARK-21721:

Priority: Critical  (was: Major)

> Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
> -----------------------------------------------------------------------
>
>                 Key: SPARK-21721
>                 URL: https://issues.apache.org/jira/browse/SPARK-21721
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.1, 2.2.0
>            Reporter: yzheng616
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


