[ https://issues.apache.org/jira/browse/SPARK-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dylanzhou reopened SPARK-13183:
-------------------------------

There really is a memory leak: the heap is eventually exhausted and the job fails with java.lang.OutOfMemoryError: Java heap space. When I increase the driver memory, the streaming program just runs normally for a little longer; as far as I can tell, the byte[] objects are never reclaimed by GC. Here is my program. Please advise whether the problem is in my own code. Thanks!
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object LogAnalyzerStreamingSQL {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 30-second batch interval (StreamingContext takes a Duration, not a bare Int).
    val ssc = new StreamingContext(sc, Seconds(30))

    val topics = Set("applogs")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" ->
        "192.168.100.1:9092,192.168.100.2:9092,192.168.100.3:9092",
      "group.id" -> "app_group",
      "serializer.class" -> "kafka.serializer.StringEncoder")
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // The message value is the second element of each (key, value) pair.
        val jsonRdd = rdd.map(_._2)
        val df = sqlContext.read.json(jsonRdd)
        df.registerTempTable("applogs")
        sqlContext.cacheTable("applogs")

        // Calculate statistics based on the content size.
        sqlContext
          .sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM applogs")
          .show()

        // Compute a count per response code. (map on a DataFrame returns an
        // RDD, which has no show(), so the results are collected instead.)
        val responseCodeToCount = sqlContext
          .sql("SELECT responseCode, COUNT(*) FROM applogs GROUP BY responseCode")
          .map(row => (row.getInt(0), row.getLong(1)))
          .collect()

        // Any IP address that has accessed the server more than 10 times.
        val ipAddresses = sqlContext
          .sql("SELECT ipAddress, COUNT(*) AS total FROM applogs GROUP BY ipAddress HAVING total > 10")
          .map(row => row.getString(0))
          .take(100)

        // Top ten endpoints by hit count.
        val topEndpoints = sqlContext
          .sql("SELECT endpoint, COUNT(*) AS total FROM applogs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
          .map(row => (row.getString(0), row.getLong(1)))
          .collect()

        // ...many more SQL queries like these...

        sqlContext.uncacheTable("applogs")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
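
One variant that may be worth trying (a sketch only, not the reporter's code, using the standard DataFrame persist/unpersist API available since Spark 1.3): cache each batch's DataFrame directly and unpersist it with blocking = true in a finally block, so that batch's column buffers are released before the next batch starts:

    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val df = sqlContext.read.json(rdd.map(_._2))
        df.registerTempTable("applogs")
        // Cache the DataFrame itself instead of the named table.
        df.persist()
        try {
          sqlContext.sql("SELECT COUNT(*) FROM applogs").show()
          // ...the rest of the queries...
        } finally {
          // Blocking unpersist: wait until the cached column buffers are
          // actually removed before processing the next batch.
          df.unpersist(blocking = true)
        }
      }
    })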

> Bytebuffers occupy a large amount of heap memory
> ------------------------------------------------
>
>                 Key: SPARK-13183
>                 URL: https://issues.apache.org/jira/browse/SPARK-13183
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>            Reporter: dylanzhou
>
> When I use Spark Streaming together with Spark SQL and cache the table, the
> old generation grows very fast and full GCs become very frequent; after
> running for a while the job runs out of memory. Analyzing the heap, I found a
> large number of org.apache.spark.sql.columnar.ColumnBuilder[38] @ 0xd022a0b8
> objects taking up 90% of the space; looking at the source, the memory is held
> by HeapByteBuffer instances. I don't know why these objects are never
> released; I kept waiting for GC to reclaim them. If I do not cache the table
> the problem does not occur, but I need to query this table repeatedly.
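
A minimal standalone sketch of the pattern the description above refers to (hypothetical driver code, not the original job, assuming an existing sc and sqlContext as in the program above; it just repeats the cacheTable/uncacheTable cycle so heap growth can be watched with a profiler):

    // Hypothetical repro loop: repeats the per-batch cache/uncache cycle
    // from the report. If the leak is real, ColumnBuilder/HeapByteBuffer
    // allocations should pile up in the old generation across iterations.
    for (batch <- 1 to 1000) {
      val df = sqlContext.read.json(
        sc.parallelize(Seq("""{"contentSize": 100, "responseCode": 200}""")))
      df.registerTempTable("applogs")
      sqlContext.cacheTable("applogs")
      sqlContext.sql("SELECT COUNT(*) FROM applogs").collect()
      sqlContext.uncacheTable("applogs")
    }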


