[ 
https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yzheng616 updated SPARK-21721:
------------------------------
    Description: 
The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At 
line 118, it put a staging path to FileSystem delete cache, and then remove the 
path from disk at line 385. It does not remove the path from FileSystem cache. 
If a streaming application keep persisting data to a partitioned hive table, 
the memory will keep increasing until JVM terminated.

Below is a simple code to reproduce it.
{code:java}
package test

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SaveMode
import java.lang.reflect.Field



case class PathLeakTest(id: Int, gp: String)

object StagePathLeak {

  def main(args: Array[String]): Unit = {

    val spark = 
SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    //create a partitioned table
    spark.sql("drop table if exists path_leak");
    spark.sql("create table if not exists path_leak(id int)" +
        " partitioned by (gp String)"+
      " row format serde 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
      " stored as"+
        " inputformat 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
        " outputformat 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

    var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
    // 2 partitions
    for (x <- 1 to 2) {
      seq += (new PathLeakTest(x, "g" + x))
    }
    val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)

    //insert 50 records to Hive table
    for (j <- 1 to 50) {
      val df = spark.createDataFrame(rdd)

      //#1 InsertIntoHiveTable line 118:  add stage path to FileSystem 
deleteOnExit cache

      //#2 InsertIntoHiveTable line 385:  delete the path from disk but not 
from the FileSystem cache, and it caused the leak
      df.write.mode(SaveMode.Overwrite).insertInto("path_leak")  
    }
    
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val deleteOnExit = getDeleteOnExit(fs.getClass)
    deleteOnExit.setAccessible(true)
    val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
    //check FileSystem deleteOnExit cache size
    println(caches.size())
    val it = caches.iterator()
    //all starge pathes were still cached even they have already been deleted 
from the disk
    while(it.hasNext()){
      println(it.next());
    }
  }
  
  def getDeleteOnExit(cls: Class[_]) : Field = {
    try{
       return cls.getDeclaredField("deleteOnExit")
    }catch{
      case ex: NoSuchFieldException => return getDeleteOnExit(cls.getSuperclass)
    }
    return null
  }

}
{code}

 


 

  was:
The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At 
line 118, it put a staging path to FileSystem delete cache, and then remove the 
path from disk at line 385. It does not remove the path from FileSystem cache. 
If a streaming application keep persisting data to a partitioned hive table, 
the memory will keep increasing until JVM terminated.

Below is a simple code to reproduce it.

package test

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SaveMode
import java.lang.reflect.Field



case class PathLeakTest(id: Int, gp: String)

object StagePathLeak {

  def main(args: Array[String]): Unit = {

    val spark = 
SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    //create a partitioned table
    spark.sql("drop table if exists path_leak");
    spark.sql("create table if not exists path_leak(id int)" +
        " partitioned by (gp String)"+
      " row format serde 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
      " stored as"+
        " inputformat 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
        " outputformat 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

    var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
    // 2 partitions
    for (x <- 1 to 2) {
      seq += (new PathLeakTest(x, "g" + x))
    }
    val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)

    //insert 50 records to Hive table
    for (j <- 1 to 50) {
      val df = spark.createDataFrame(rdd)

      //#1 InsertIntoHiveTable line 118:  add stage path to FileSystem 
deleteOnExit cache

      //#2 InsertIntoHiveTable line 385:  delete the path from disk but not 
from the FileSystem cache, and it caused the leak
      df.write.mode(SaveMode.Overwrite).insertInto("path_leak")  
    }
    
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val deleteOnExit = getDeleteOnExit(fs.getClass)
    deleteOnExit.setAccessible(true)
    val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
    //check FileSystem deleteOnExit cache size
    println(caches.size())
    val it = caches.iterator()
    //all starge pathes were still cached even they have already been deleted 
from the disk
    while(it.hasNext()){
      println(it.next());
    }
  }
  
  def getDeleteOnExit(cls: Class[_]) : Field = {
    try{
       return cls.getDeclaredField("deleteOnExit")
    }catch{
      case ex: NoSuchFieldException => return getDeleteOnExit(cls.getSuperclass)
    }
    return null
  }

}


 


 


> Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21721
>                 URL: https://issues.apache.org/jira/browse/SPARK-21721
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: yzheng616
>
> The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. 
> At line 118, it put a staging path to FileSystem delete cache, and then 
> remove the path from disk at line 385. It does not remove the path from 
> FileSystem cache. If a streaming application keep persisting data to a 
> partitioned hive table, the memory will keep increasing until JVM terminated.
> Below is a simple code to reproduce it.
> {code:java}
> package test
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.FileSystem
> import org.apache.spark.sql.SaveMode
> import java.lang.reflect.Field
> case class PathLeakTest(id: Int, gp: String)
> object StagePathLeak {
>   def main(args: Array[String]): Unit = {
>     val spark = 
> SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
>     spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
>     //create a partitioned table
>     spark.sql("drop table if exists path_leak");
>     spark.sql("create table if not exists path_leak(id int)" +
>         " partitioned by (gp String)"+
>       " row format serde 
> 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
>       " stored as"+
>         " inputformat 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
>         " outputformat 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
>     var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
>     // 2 partitions
>     for (x <- 1 to 2) {
>       seq += (new PathLeakTest(x, "g" + x))
>     }
>     val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)
>     //insert 50 records to Hive table
>     for (j <- 1 to 50) {
>       val df = spark.createDataFrame(rdd)
>       //#1 InsertIntoHiveTable line 118:  add stage path to FileSystem 
> deleteOnExit cache
>       //#2 InsertIntoHiveTable line 385:  delete the path from disk but not 
> from the FileSystem cache, and it caused the leak
>       df.write.mode(SaveMode.Overwrite).insertInto("path_leak")  
>     }
>     
>     val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
>     val deleteOnExit = getDeleteOnExit(fs.getClass)
>     deleteOnExit.setAccessible(true)
>     val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
>     //check FileSystem deleteOnExit cache size
>     println(caches.size())
>     val it = caches.iterator()
>     //all starge pathes were still cached even they have already been deleted 
> from the disk
>     while(it.hasNext()){
>       println(it.next());
>     }
>   }
>   
>   def getDeleteOnExit(cls: Class[_]) : Field = {
>     try{
>        return cls.getDeclaredField("deleteOnExit")
>     }catch{
>       case ex: NoSuchFieldException => return 
> getDeleteOnExit(cls.getSuperclass)
>     }
>     return null
>   }
> }
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to