[ https://issues.apache.org/jira/browse/SPARK-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316050#comment-15316050 ]
Yan Chen commented on SPARK-15716:
----------------------------------

We think the behavior when using a local directory is due to this piece of code in the RDD class:

{code:java}
def getCheckpointFile: Option[String] = {
  checkpointData match {
    case Some(reliable: ReliableRDDCheckpointData[T]) => reliable.getCheckpointDir
    case _ => None
  }
}
{code}

Apparently a local directory is not a reliable one.
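For reference, a minimal sketch of what that match distinguishes (assuming a local[2] master and a placeholder /tmp checkpoint path, neither taken from the report): an RDD checkpointed through the reliable path reports its checkpoint file, while a locally checkpointed one falls through to None:

{code:java}
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CheckpointFileProbe {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("probe");
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.setCheckpointDir("/tmp/probe-checkpoint"); // placeholder path

    // Reliable checkpoint: backed by ReliableRDDCheckpointData, so the
    // match in RDD.getCheckpointFile returns the saved directory.
    JavaRDD<Integer> reliable = sc.parallelize(Arrays.asList(1, 2, 3));
    reliable.checkpoint();
    reliable.count(); // an action materializes the checkpoint
    System.out.println("reliable: " + reliable.getCheckpointFile()); // Optional with a path

    // Local checkpoint: backed by LocalRDDCheckpointData, which the match
    // above falls through, so no checkpoint file is ever reported.
    JavaRDD<Integer> local = sc.parallelize(Arrays.asList(4, 5, 6));
    local.rdd().localCheckpoint(); // localCheckpoint lives on the underlying Scala RDD
    local.count();
    System.out.println("local: " + local.getCheckpointFile()); // absent

    sc.stop();
  }
}
{code}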
> Memory usage of driver keeps growing up in Spark Streaming
> ----------------------------------------------------------
>
>                 Key: SPARK-15716
>                 URL: https://issues.apache.org/jira/browse/SPARK-15716
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.1, 1.5.0, 1.6.0, 1.6.1, 2.0.0
>         Environment: Oracle Java 1.8.0_51, 1.8.0_85, 1.8.0_91 and 1.8.0_92
> SUSE Linux, CentOS 6 and CentOS 7
>            Reporter: Yan Chen
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Code:
> {code:java}
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.SparkContext;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.StreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
>
> public class App {
>   public static void main(String[] args) {
>     final String input = args[0];
>     final String check = args[1];
>     final long interval = Long.parseLong(args[2]);
>
>     final SparkConf conf = new SparkConf();
>     conf.set("spark.streaming.minRememberDuration", "180s");
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
>     conf.set("spark.streaming.unpersist", "true");
>     conf.set("spark.streaming.ui.retainedBatches", "10");
>     conf.set("spark.ui.retainedJobs", "10");
>     conf.set("spark.ui.retainedStages", "10");
>     conf.set("spark.worker.ui.retainedExecutors", "10");
>     conf.set("spark.worker.ui.retainedDrivers", "10");
>     conf.set("spark.sql.ui.retainedExecutions", "10");
>
>     JavaStreamingContextFactory jscf = () -> {
>       SparkContext sc = new SparkContext(conf);
>       sc.setCheckpointDir(check);
>       StreamingContext ssc = new StreamingContext(sc, Durations.milliseconds(interval));
>       JavaStreamingContext jssc = new JavaStreamingContext(ssc);
>       jssc.checkpoint(check);
>       // setup pipeline here
>       JavaPairDStream<LongWritable, Text> inputStream = jssc.fileStream(
>           input,
>           LongWritable.class,
>           Text.class,
>           TextInputFormat.class,
>           (filepath) -> Boolean.TRUE,
>           false
>       );
>       JavaPairDStream<LongWritable, Text> usbk = inputStream
>           .updateStateByKey((current, state) -> state);
>       usbk.checkpoint(Durations.seconds(10));
>       usbk.foreachRDD(rdd -> {
>         rdd.count();
>         System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
>         return null;
>       });
>       return jssc;
>     };
>
>     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);
>     jssc.start();
>     jssc.awaitTermination();
>   }
> }
> {code}
> Command used to run the code:
> {code:none}
> spark-submit --keytab [keytab] --principal [principal] --class [package].App \
>   --master yarn --driver-memory 1g --executor-memory 1G \
>   --conf "spark.driver.maxResultSize=0" --conf "spark.logConf=true" \
>   --conf "spark.executor.instances=2" \
>   --conf "spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
>   --conf "spark.driver.extraJavaOptions=-Xloggc:/[dir]/memory-gc.log -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
>   [jar-file-path] file:///[dir-on-nas-drive] [dir-on-hdfs] 200
> {code}
> It's a very simple piece of code; when I ran it, the memory usage of the driver kept going up. There is no file input in our runs. The batch interval is set to 200 milliseconds; processing time for each batch is below 150 milliseconds, and for most batches below 70 milliseconds.
> !http://i.imgur.com/uSzUui6.png!
> The rightmost four red triangles are full GCs, which were triggered manually with the "jcmd <pid> GC.run" command.
> I also did more experiments in the second and third comments I posted.
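One way to probe the comment's hypothesis is to poll the checkpoint directory while the job runs: if old checkpoint data keeps accumulating under a file:/// directory but stays bounded under hdfs://, that points at the reliable-checkpoint cleanup path. A minimal sketch (the path argument and polling interval are placeholders, not from the report):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointDirWatcher {
  public static void main(String[] args) throws Exception {
    // args[0] is the same checkpoint dir that was passed to the streaming job
    Path dir = new Path(args[0]);
    FileSystem fs = dir.getFileSystem(new Configuration());
    while (true) {
      if (fs.exists(dir)) {
        FileStatus[] entries = fs.listStatus(dir);
        System.out.println(System.currentTimeMillis()
            + " entries under " + dir + ": " + entries.length);
      }
      Thread.sleep(10000); // poll every 10 seconds
    }
  }
}
{code}

Run it with the same checkpoint path given to the job; an entry count that grows without bound would mean checkpointed state is never cleaned up.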