[ 
https://issues.apache.org/jira/browse/SPARK-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329990#comment-15329990
 ] 

Yan Chen edited comment on SPARK-15716 at 6/14/16 5:45 PM:
-----------------------------------------------------------

!http://i.imgur.com/gcm4Y6p.png!

I have difficulty uploading large files from within my company. If you really
need the heap dump file, I can try to find a way to upload it.

This run uses the Spark 1.4.1 community version. The behavior is the same as
before: memory usage keeps going up.

The log shows:

{code}
Exception in thread "JobGenerator" java.lang.OutOfMemoryError: GC overhead limit exceeded
{code}

This error appears after 30 minutes.

Parameters:
* Driver memory: 500M
* Executor memory: 500M
* Number of executors: 2
* No files are placed in the input directory

Code:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

import org.apache.log4j.{Level, Logger}

/**
 * Stateful word count used to reproduce SPARK-15716 (driver memory keeps growing
 * in Spark Streaming even when no input files arrive).
 *
 * Command-line arguments:
 *  - `args(0)`: directory watched by `textFileStream` for new text files
 *  - `args(1)`: checkpoint directory
 *  - `args(2)`: batch interval, in milliseconds
 */
object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    require(args.length >= 3,
      "Usage: StatefulNetworkWordCount <inputDir> <checkpointDir> <batchIntervalMs>")

    // Lower the root log level to WARN unless log4j already has appenders configured.
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    // New count for a word = occurrences in this batch + its previous count.
    // Always returns Some, so existing keys stay in the state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // Iterator-based form of updateFunc, as taken by the updateStateByKey
    // overload used below (partitioner + initial RDD).
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap { case (word, values, state) =>
        updateFunc(values, state).map(count => (word, count))
      }
    }

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .set("spark.streaming.minRememberDuration", "180s")
      // NOTE(review): this setting targets receiver-based inputs; textFileStream
      // below does not appear to use a receiver — confirm it has any effect here.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.unpersist", "true")
      // Keep very little UI/job history so that retained history is unlikely to
      // explain the driver's memory growth.
      .set("spark.streaming.ui.retainedBatches", "10")
      .set("spark.ui.retainedJobs", "10")
      .set("spark.ui.retainedStages", "10")
      .set("spark.worker.ui.retainedExecutors", "10")
      .set("spark.worker.ui.retainedDrivers", "10")
      .set("spark.sql.ui.retainedExecutions", "10")
      .set("spark.cleaner.ttl", "240s")

    // Batch interval, in milliseconds, taken from args(2).
    val ssc = new StreamingContext(sparkConf, Milliseconds(args(2).toLong))
    // Checkpoint directory; updateStateByKey needs checkpointing enabled.
    ssc.checkpoint(args(1))

    // Seed the state with a count of 1 for "hello" and "world".
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Watch args(0) for new text files and split each line on single spaces.
    // (The commented-out alternative source was
    // ssc.socketTextStream(args(0), args(1).toInt); note that args(1) is used
    // as the checkpoint directory above.)
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(word => (word, 1))

    // Update the cumulative count of each word across batches; the resulting
    // DStream carries that running count per word.
    val stateDstream = wordDstream.updateStateByKey[Int](
      newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      true, // rememberPartitioner
      initialRDD)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}


was (Author: yani.chen):
!http://i.imgur.com/gcm4Y6p.png!

I have difficulty uploading files from within my company. If you really need
the heap dump file, I can try to find a way to upload it.

This run uses the Spark 1.4.1 community version. The behavior is the same as
before: memory usage keeps going up.

The log shows:

{code}
Exception in thread "JobGenerator" java.lang.OutOfMemoryError: GC overhead limit exceeded
{code}

This error appears after 30 minutes.

Parameters:
* Driver memory: 500M
* Executor memory: 500M
* Number of executors: 2
* No files are placed in the input directory

Code:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

import org.apache.log4j.{Level, Logger}

/**
 * Stateful word count used to reproduce SPARK-15716 (driver memory keeps growing
 * in Spark Streaming even when no input files arrive).
 *
 * Command-line arguments:
 *  - `args(0)`: directory watched by `textFileStream` for new text files
 *  - `args(1)`: checkpoint directory
 *  - `args(2)`: batch interval, in milliseconds
 */
object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    require(args.length >= 3,
      "Usage: StatefulNetworkWordCount <inputDir> <checkpointDir> <batchIntervalMs>")

    // Lower the root log level to WARN unless log4j already has appenders configured.
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    // New count for a word = occurrences in this batch + its previous count.
    // Always returns Some, so existing keys stay in the state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // Iterator-based form of updateFunc, as taken by the updateStateByKey
    // overload used below (partitioner + initial RDD).
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap { case (word, values, state) =>
        updateFunc(values, state).map(count => (word, count))
      }
    }

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .set("spark.streaming.minRememberDuration", "180s")
      // NOTE(review): this setting targets receiver-based inputs; textFileStream
      // below does not appear to use a receiver — confirm it has any effect here.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.unpersist", "true")
      // Keep very little UI/job history so that retained history is unlikely to
      // explain the driver's memory growth.
      .set("spark.streaming.ui.retainedBatches", "10")
      .set("spark.ui.retainedJobs", "10")
      .set("spark.ui.retainedStages", "10")
      .set("spark.worker.ui.retainedExecutors", "10")
      .set("spark.worker.ui.retainedDrivers", "10")
      .set("spark.sql.ui.retainedExecutions", "10")
      .set("spark.cleaner.ttl", "240s")

    // Batch interval, in milliseconds, taken from args(2).
    val ssc = new StreamingContext(sparkConf, Milliseconds(args(2).toLong))
    // Checkpoint directory; updateStateByKey needs checkpointing enabled.
    ssc.checkpoint(args(1))

    // Seed the state with a count of 1 for "hello" and "world".
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Watch args(0) for new text files and split each line on single spaces.
    // (The commented-out alternative source was
    // ssc.socketTextStream(args(0), args(1).toInt); note that args(1) is used
    // as the checkpoint directory above.)
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(word => (word, 1))

    // Update the cumulative count of each word across batches; the resulting
    // DStream carries that running count per word.
    val stateDstream = wordDstream.updateStateByKey[Int](
      newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      true, // rememberPartitioner
      initialRDD)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

> Memory usage of driver keeps growing up in Spark Streaming
> ----------------------------------------------------------
>
>                 Key: SPARK-15716
>                 URL: https://issues.apache.org/jira/browse/SPARK-15716
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.1, 1.5.0, 1.6.0, 1.6.1, 2.0.0
>         Environment: Oracle Java 1.8.0_51, 1.8.0_85, 1.8.0_91 and 1.8.0_92
> SUSE Linux, CentOS 6 and CentOS 7
>            Reporter: Yan Chen
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Code:
> {code:java}
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.SparkContext;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.StreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
> /**
>  * Minimal reproduction for SPARK-15716: driver memory keeps growing while a
>  * checkpointed updateStateByKey stream runs with no input files.
>  *
>  * Arguments: args[0] = directory monitored by fileStream, args[1] = checkpoint
>  * directory, args[2] = batch interval in milliseconds.
>  */
> public class App {
>   public static void main(String[] args) {
>     final String input = args[0];
>     final String check = args[1];
>     final long interval = Long.parseLong(args[2]);
>     final SparkConf conf = new SparkConf();
>     conf.set("spark.streaming.minRememberDuration", "180s");
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
>     conf.set("spark.streaming.unpersist", "true");
>     // Retain only 10 entries of each kind of UI history (presumably to rule out
>     // UI bookkeeping as the source of the driver's memory growth).
>     conf.set("spark.streaming.ui.retainedBatches", "10");
>     conf.set("spark.ui.retainedJobs", "10");
>     conf.set("spark.ui.retainedStages", "10");
>     conf.set("spark.worker.ui.retainedExecutors", "10");
>     conf.set("spark.worker.ui.retainedDrivers", "10");
>     conf.set("spark.sql.ui.retainedExecutions", "10");
>     // Builds a fresh streaming context with the pipeline below. It is handed to
>     // getOrCreate, which is expected to call it only when no checkpoint exists
>     // in `check` (per Spark's getOrCreate contract — confirm).
>     JavaStreamingContextFactory jscf = () -> {
>       SparkContext sc = new SparkContext(conf);
>       sc.setCheckpointDir(check);
>       StreamingContext ssc = new StreamingContext(sc, 
> Durations.milliseconds(interval));
>       JavaStreamingContext jssc = new JavaStreamingContext(ssc);
>       jssc.checkpoint(check);
>       // setup pipeline here
>       // Hadoop text input from `input`; the path filter accepts every file.
>       // NOTE(review): the trailing `false` is presumably newFilesOnly — confirm
>       // against the fileStream signature.
>       JavaPairDStream<LongWritable, Text> inputStream =
>           jssc.fileStream(
>               input,
>               LongWritable.class,
>               Text.class,
>               TextInputFormat.class,
>               (filepath) -> Boolean.TRUE,
>               false
>           );
>       // The update function ignores new values and returns the previous state.
>       JavaPairDStream<LongWritable, Text> usbk = inputStream
>           .updateStateByKey((current, state) -> state);
>       // Checkpoint the state stream every 10 seconds.
>       usbk.checkpoint(Durations.seconds(10));
>       // Force evaluation of each batch, then print the number of lines in the
>       // RDD's debug string as a rough measure of lineage depth.
>       usbk.foreachRDD(rdd -> {
>         rdd.count();
>         System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
>         // NOTE(review): returning null implies the Function<..., Void> overload
>         // of foreachRDD; confirm it exists in the Spark version being tested.
>         return null;
>       });
>       return jssc;
>     };
>     // Restore from the checkpoint in `check` if present, otherwise build via jscf.
>     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);
>     jssc.start();
>     jssc.awaitTermination();
>   }
> }
> {code}
> Command used to run the code
> {code:none}
> spark-submit --keytab [keytab] --principal [principal] --class [package].App \
>   --master yarn --driver-memory 1g --executor-memory 1G \
>   --conf "spark.driver.maxResultSize=0" \
>   --conf "spark.logConf=true" \
>   --conf "spark.executor.instances=2" \
>   --conf "spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
>   --conf "spark.driver.extraJavaOptions=-Xloggc:/[dir]/memory-gc.log -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
>   [jar-file-path] file:///[dir-on-nas-drive] [dir-on-hdfs] 200
> {code}
> It's a very simple piece of code; when I run it, the driver's memory usage
> keeps going up. There is no file input in our runs. The batch interval is set
> to 200 milliseconds; the processing time for each batch is below 150
> milliseconds, and most batches finish in under 70 milliseconds.
> !http://i.imgur.com/uSzUui6.png!
> The rightmost four red triangles are full GCs, which were triggered manually
> with the "jcmd pid GC.run" command.
> I ran more experiments, which are described in the second and third comments I posted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to