[ https://issues.apache.org/jira/browse/SPARK-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329990#comment-15329990 ]
Yan Chen edited comment on SPARK-15716 at 6/14/16 5:52 PM:
-----------------------------------------------------------

!http://i.imgur.com/gcm4Y6p.png!

I have difficulties uploading big files from the company network. If you really want the heap dump file, I can try to find a way to upload it. This run is on the Spark 1.4.1 community version. The behavior is the same as before: memory usage keeps going up. After 30 minutes the log shows:
{code}
Exception in thread "JobGenerator" java.lang.OutOfMemoryError: GC overhead limit exceeded
{code}

Parameters:
* Driver memory: 500M
* Executor memory: 500M
* # of executors: 2
* Batch interval: 100 ms
* No file is put in the input dir

Code:
{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
import org.apache.log4j.{Level, Logger}

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    // Per-key update function: add the counts seen in this batch to the previous state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
    sparkConf.set("spark.streaming.minRememberDuration", "180s")
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    sparkConf.set("spark.streaming.unpersist", "true")
    sparkConf.set("spark.streaming.ui.retainedBatches", "10")
    sparkConf.set("spark.ui.retainedJobs", "10")
    sparkConf.set("spark.ui.retainedStages", "10")
    sparkConf.set("spark.worker.ui.retainedExecutors", "10")
    sparkConf.set("spark.worker.ui.retainedDrivers", "10")
    sparkConf.set("spark.sql.ui.retainedExecutions", "10")
    sparkConf.set("spark.cleaner.ttl", "240s")

    // Create the context with the batch interval passed in args(2), in milliseconds
    val ssc = new StreamingContext(sparkConf, Milliseconds(args(2).toLong))
    ssc.checkpoint(args(1))

    // Initial RDD input to updateStateByKey
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Monitor the input directory args(0) for new files and count the words in them
    // (the socketTextStream variant is kept commented out)
    val lines = ssc.textFileStream(args(0)) //ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a DStream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
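For watching the driver heap grow without attaching a profiler, a listener along the following lines can be registered on the StreamingContext to log heap usage after every completed batch. This is only a minimal sketch and not the instrumentation behind the chart above; the use of the memory MXBean and the MB formatting are my own choices:
{code:java}
import java.lang.management.ManagementFactory

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs driver heap usage after each completed batch, so the growth shows up
// directly in the driver log.
class HeapUsageListener extends StreamingListener {
  private val memoryBean = ManagementFactory.getMemoryMXBean

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val heap = memoryBean.getHeapMemoryUsage
    println(s"batch ${batchCompleted.batchInfo.batchTime.milliseconds}: " +
      s"heap used = ${heap.getUsed / (1024 * 1024)} MB " +
      s"of ${heap.getCommitted / (1024 * 1024)} MB committed")
  }
}

// Usage: register it right after the StreamingContext is created, e.g.
//   ssc.addStreamingListener(new HeapUsageListener)
{code}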
> Memory usage of driver keeps growing up in Spark Streaming
> ----------------------------------------------------------
>
>              Key: SPARK-15716
>              URL: https://issues.apache.org/jira/browse/SPARK-15716
>          Project: Spark
>       Issue Type: Bug
>       Components: Streaming
> Affects Versions: 1.4.1, 1.5.0, 1.6.0, 1.6.1, 2.0.0
>      Environment: Oracle Java 1.8.0_51, 1.8.0_85, 1.8.0_91 and 1.8.0_92
>                   SUSE Linux, CentOS 6 and CentOS 7
>         Reporter: Yan Chen
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Code:
> {code:java}
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.SparkContext;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.StreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
>
> public class App {
>   public static void main(String[] args) {
>     final String input = args[0];
>     final String check = args[1];
>     final long interval = Long.parseLong(args[2]);
>
>     final SparkConf conf = new SparkConf();
>     conf.set("spark.streaming.minRememberDuration", "180s");
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
>     conf.set("spark.streaming.unpersist", "true");
>     conf.set("spark.streaming.ui.retainedBatches", "10");
>     conf.set("spark.ui.retainedJobs", "10");
>     conf.set("spark.ui.retainedStages", "10");
>     conf.set("spark.worker.ui.retainedExecutors", "10");
>     conf.set("spark.worker.ui.retainedDrivers", "10");
>     conf.set("spark.sql.ui.retainedExecutions", "10");
>
>     JavaStreamingContextFactory jscf = () -> {
>       SparkContext sc = new SparkContext(conf);
>       sc.setCheckpointDir(check);
>       StreamingContext ssc = new StreamingContext(sc, Durations.milliseconds(interval));
>       JavaStreamingContext jssc = new JavaStreamingContext(ssc);
>       jssc.checkpoint(check);
>       // setup pipeline here
>       JavaPairDStream<LongWritable, Text> inputStream = jssc.fileStream(
>           input,
>           LongWritable.class,
>           Text.class,
>           TextInputFormat.class,
>           (filepath) -> Boolean.TRUE,
>           false
>       );
>       JavaPairDStream<LongWritable, Text> usbk = inputStream
>           .updateStateByKey((current, state) -> state);
>       usbk.checkpoint(Durations.seconds(10));
>       usbk.foreachRDD(rdd -> {
>         rdd.count();
>         System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
>         return null;
>       });
>       return jssc;
>     };
>
>     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);
>     jssc.start();
>     jssc.awaitTermination();
>   }
> }
> {code}
> Command used to run the code:
> {code:none}
> spark-submit --keytab [keytab] --principal [principal] --class [package].App \
>   --master yarn --driver-memory 1g --executor-memory 1G \
>   --conf "spark.driver.maxResultSize=0" --conf "spark.logConf=true" \
>   --conf "spark.executor.instances=2" \
>   --conf "spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
>   --conf "spark.driver.extraJavaOptions=-Xloggc:/[dir]/memory-gc.log -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
>   [jar-file-path] file:///[dir-on-nas-drive] [dir-on-hdfs] 200
> {code}
> It's a very simple piece of code; when I run it, the memory usage of the driver keeps going up. There is no file input in our runs.
> The batch interval is set to 200 milliseconds; processing time for each batch is below 150 milliseconds, with most batches below 70 milliseconds.
> !http://i.imgur.com/uSzUui6.png!
> The right-most four red triangles are full GCs, triggered manually with the "jcmd <pid> GC.run" command.
> I also did more experiments in the second and third comments I posted.