[ https://issues.apache.org/jira/browse/SPARK-17888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
weilin.chen updated SPARK-17888:
--------------------------------
    Summary: Memory leak in streaming driver when use SparkSQL in Streaming
             (was: Mseory leak in streaming driver when use SparkSQL in Streaming)

> Memory leak in streaming driver when use SparkSQL in Streaming
> --------------------------------------------------------------
>
>                 Key: SPARK-17888
>                 URL: https://issues.apache.org/jira/browse/SPARK-17888
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.2
>         Environment: scala 2.10.4
>                      java 1.7.0_71
>            Reporter: weilin.chen
>              Labels: leak, memory
>
> Hi,
> I have a small Spark 1.5 program that receives data from a publisher via
> Spark Streaming and processes the received data with Spark SQL. Over time
> I noticed a memory leak in the driver, so I upgraded to Spark 1.6.2, but
> the situation did not change.
> Here is the code:
> {quote}
> val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
> val jsonf = lines.map(JSON.parseFull(_))
>   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
> val logs = jsonf.map(data => LogStashV1(data("message").toString,
>   data("path").toString, data("host").toString,
>   data("lineno").toString.toDouble, data("timestamp").toString))
> logs.foreachRDD( rdd => {
>   import sqc.implicits._
>   rdd.toDF.registerTempTable("logstash")
>   val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
>   sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
> })
> {quote}
> jmap information:
> {quote}
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:         34819       72711952  [B
>    2:       2297557       66010656  [C
>    3:       2296294       55111056  java.lang.String
>    4:       1063491       42539640  org.apache.spark.scheduler.AccumulableInfo
>    5:       1251001       40032032  scala.collection.immutable.HashMap$HashMap1
>    6:       1394364       33464736  java.lang.Long
>    7:       1102516       26460384  scala.collection.immutable.$colon$colon
>    8:       1058202       25396848  org.apache.spark.sql.execution.metric.LongSQLMetricValue
>    9:       1266499       20263984  scala.Some
>   10:        124052       15889104  <methodKlass>
>   11:        124052       15269568  <constMethodKlass>
>   12:         11350       12082432  <constantPoolKlass>
>   13:         11350       11692880  <instanceKlassKlass>
>   14:         96682       10828384  org.apache.spark.executor.TaskMetrics
>   15:        233481        9505896  [Lscala.collection.immutable.HashMap;
>   16:         96682        6961104  org.apache.spark.scheduler.TaskInfo
>   17:          9589        6433312  <constantPoolCacheKlass>
>   18:        233000        5592000  scala.collection.immutable.HashMap$HashTrieMap
>   19:         96200        5387200  org.apache.spark.executor.ShuffleReadMetrics
>   20:        113381        3628192  scala.collection.mutable.ListBuffer
>   21:          7252        2891792  <methodDataKlass>
>   22:        117073        2809752  scala.collection.mutable.DefaultEntry
> {quote}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
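The heap histogram above is dominated by driver-side bookkeeping classes (org.apache.spark.scheduler.AccumulableInfo, org.apache.spark.executor.TaskMetrics, org.apache.spark.sql.execution.metric.LongSQLMetricValue), which the driver's UI and SQL listeners retain for completed jobs, stages, and SQL executions. Assuming that retention is the source of the growth — which the histogram suggests but does not prove — one possible mitigation is to lower the retention limits. The keys below are standard Spark 1.6 configuration properties (set in spark-defaults.conf or via SparkConf); the values are illustrative only, and this sketch is untested against the reported workload:

```properties
# Keep fewer completed jobs/stages in the driver's UI listener (defaults: 1000)
spark.ui.retainedJobs            100
spark.ui.retainedStages          100
# Keep fewer completed SQL executions and their metrics (default: 1000)
spark.sql.ui.retainedExecutions  100
```

With a long-running streaming job that launches many short SQL jobs, the defaults of 1000 retained entries each can add up; lowering them bounds the listener state, at the cost of less history in the web UI.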