[ 
https://issues.apache.org/jira/browse/SPARK-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316045#comment-15316045
 ] 

Yan Chen edited comment on SPARK-15716 at 6/5/16 10:11 PM:
-----------------------------------------------------------

The same thing happens on 1.5.0, 1.6.0, 1.6.1, and the 2.0.0 preview. However, 
if I set the checkpoint directory to a local one while using a single node as 
both master and worker, the issue goes away. The behavior is the same for all 
versions, regardless of the environment it runs in. We have already tried 
standalone and yarn-client modes. 


was (Author: yani.chen):
The same thing happens on 1.6.0, 1.6.1, and the 2.0.0 preview. However, if I 
set the checkpoint directory to a local one while using a single node as both 
master and worker, the issue goes away. The behavior is the same for all 
versions, regardless of the environment it runs in. We have already tried 
standalone and yarn-client modes. 

> Memory usage of driver keeps growing up in Spark Streaming
> ----------------------------------------------------------
>
>                 Key: SPARK-15716
>                 URL: https://issues.apache.org/jira/browse/SPARK-15716
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.1, 1.5.0, 1.6.0, 1.6.1, 2.0.0
>         Environment: Oracle Java 1.8.0_51, 1.8.0_85, 1.8.0_91 and 1.8.0_92
> SUSE Linux, CentOS 6 and CentOS 7
>            Reporter: Yan Chen
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Code:
> {code:java}
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.SparkContext;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.StreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
> public class App {
>   public static void main(String[] args) {
>     final String input = args[0];
>     final String check = args[1];
>     final long interval = Long.parseLong(args[2]);
>     final SparkConf conf = new SparkConf();
>     conf.set("spark.streaming.minRememberDuration", "180s");
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
>     conf.set("spark.streaming.unpersist", "true");
>     conf.set("spark.streaming.ui.retainedBatches", "10");
>     conf.set("spark.ui.retainedJobs", "10");
>     conf.set("spark.ui.retainedStages", "10");
>     conf.set("spark.worker.ui.retainedExecutors", "10");
>     conf.set("spark.worker.ui.retainedDrivers", "10");
>     conf.set("spark.sql.ui.retainedExecutions", "10");
>     JavaStreamingContextFactory jscf = () -> {
>       SparkContext sc = new SparkContext(conf);
>       sc.setCheckpointDir(check);
>       StreamingContext ssc =
>           new StreamingContext(sc, Durations.milliseconds(interval));
>       JavaStreamingContext jssc = new JavaStreamingContext(ssc);
>       jssc.checkpoint(check);
>       // setup pipeline here
>       JavaPairDStream<LongWritable, Text> inputStream =
>           jssc.fileStream(
>               input,
>               LongWritable.class,
>               Text.class,
>               TextInputFormat.class,
>               (filepath) -> Boolean.TRUE,
>               false
>           );
>       JavaPairDStream<LongWritable, Text> usbk = inputStream
>           .updateStateByKey((current, state) -> state);
>       usbk.checkpoint(Durations.seconds(10));
>       usbk.foreachRDD(rdd -> {
>         rdd.count();
>         System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
>         return null;
>       });
>       return jssc;
>     };
>     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);
>     jssc.start();
>     jssc.awaitTermination();
>   }
> }
> {code}
> Command used to run the code
> {code:none}
> spark-submit --keytab [keytab] --principal [principal] --class [package].App 
> --master yarn --driver-memory 1g --executor-memory 1G --conf 
> "spark.driver.maxResultSize=0" --conf "spark.logConf=true" --conf 
> "spark.executor.instances=2" --conf 
> "spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC 
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
> -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" --conf 
> "spark.driver.extraJavaOptions=-Xloggc:/[dir]/memory-gc.log 
> -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy 
> -XX:+UnlockDiagnosticVMOptions" [jar-file-path] file:///[dir-on-nas-drive] 
> [dir-on-hdfs] 200
> {code}
> It's a very simple piece of code. When I run it, the memory usage of the 
> driver keeps going up. There is no file input in our runs. The batch interval 
> is set to 200 milliseconds; the processing time for each batch is below 150 
> milliseconds, and for most batches it is below 70 milliseconds.
> !http://i.imgur.com/uSzUui6.png!
> The rightmost four red triangles are full GCs that were triggered manually 
> with the "jcmd pid GC.run" command.
> I also ran more experiments, described in the second and third comments I 
> posted.
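
One way to put a number on the growth described above is to sample the used heap right after a requested full GC and fit a trend to the samples. The following is only a sketch and is not part of the original report: the class name HeapProbe and both method names are hypothetical, and it assumes the probe runs inside the driver JVM. MemoryMXBean.gc() is documented as effectively equivalent to System.gc(), which makes it similar in spirit to the manual "jcmd pid GC.run" calls mentioned in the description.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

// Hypothetical helper, not taken from the report: measures post-GC heap
// usage and estimates how fast it grows across samples.
class HeapProbe {

    // Requests a GC, then returns the heap bytes still in use.
    // The JVM treats the request as a hint, so the value is approximate.
    static long usedHeapAfterGc() {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
        bean.gc();
        return bean.getHeapMemoryUsage().getUsed();
    }

    // Least-squares slope of the samples against their index (0, 1, 2, ...),
    // i.e. the average growth in bytes per sample.
    static double growthPerSample(long[] samples) {
        int n = samples.length;
        if (n < 2) {
            throw new IllegalArgumentException("need at least two samples");
        }
        double meanX = (n - 1) / 2.0;
        double meanY = 0.0;
        for (long s : samples) {
            meanY += s;
        }
        meanY /= n;
        double num = 0.0;
        double den = 0.0;
        for (int i = 0; i < n; i++) {
            num += (i - meanX) * (samples[i] - meanY);
            den += (i - meanX) * (i - meanX);
        }
        return num / den;
    }
}
```

Calling usedHeapAfterGc() at a fixed interval, for example from a background thread in the driver, and passing the collected values to growthPerSample() gives a rough bytes-per-sample figure. A slope that stays clearly positive over a long run would be consistent with the leak reported here, while a slope near zero would suggest the growth is only uncollected garbage.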



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
