Hello,
I'm having problems running a Spark Streaming job for more than a few hours
(3 or 4). It starts out working fine, but it degrades until it fails. Some of
the symptoms:

- Memory and CPU consumption keep getting higher and higher, and finally an
error is thrown (java.lang.Exception: Could not compute split, block
input-0-1429168311800 not found) and data stops being calculated.

- The delay shown in the web UI also keeps increasing.

- After some hours, disk space starts being consumed. There are a lot of
directories with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"

The job basically reads information from a Kafka topic and calculates
several top-N tables for some key and value fields ("camps") of netflow data.
Some of the parameters are:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The job runs standalone on one machine (16 GB RAM, 12 cores), and the
options used to run it are as follows:
/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
--jars $JARS --class $APPCLASS --master local[2] $APPJAR 

Does anyone have any clues about the problem? I don't know whether it is a
configuration problem or some error in the code that is causing memory
leaks.

Thank you in advance!
Miquel

PS: the code is basically this:--------------------------------------

/**
 * Spark Streaming job that reads comma-separated netflow records from Kafka and,
 * for every combination of key fields and value field, writes CSV tables with the
 * largest per-key totals: one table per batch ("DATA") and one per sliding window
 * ("DATAWINDOW").
 */
object NetflowTopn {

  // Runtime settings. appPath and the Kafka settings are overwritten by main().
  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  // NOTE(review): n_window, n_slide and topnsize are not read anywhere in this
  // object; the window (60s / 10s) and the row count (20) are hard-coded in main().
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  // Column index of each netflow field within a comma-separated input record.
  // Still a `var` so that any external code reassigning it keeps compiling.
  var hm = Map[String, Int](
    "unix_secs"   -> 0,
    "unix_nsecs"  -> 1,
    "sysuptime"   -> 2,
    "exaddr"      -> 3,
    "dpkts"       -> 4,
    "doctets"     -> 5,
    "first"       -> 6,
    "last"        -> 7,
    "engine_type" -> 8,
    "engine_id"   -> 9,
    "srcaddr"     -> 10,
    "dstaddr"     -> 11,
    "nexthop"     -> 12,
    "input"       -> 13,
    "output"      -> 14,
    "srcport"     -> 15,
    "dstport"     -> 16,
    "prot"        -> 17,
    "tos"         -> 18,
    "tcp_flags"   -> 19,
    "src_mask"    -> 20,
    "dst_mask"    -> 21,
    "src_as"      -> 22,
    "dst_as"      -> 23
  )

  /**
   * Returns the value of field `camp` in the split record `lcamps`.
   * The pseudo-field "total" always yields the constant key "total".
   * Throws NoSuchElementException for a field name that is not in `hm`.
   */
  def getKey(lcamps: Array[String], camp: String): String =
    if (camp == "total") "total" else lcamps(hm(camp))

  /**
   * Returns the numeric value of field `camp` in the split record `lcamps`.
   * The pseudo-field "flows" counts every record as 1.
   * Throws NumberFormatException when the field is not a valid Long.
   */
  def getVal(lcamps: Array[String], camp: String): Long =
    if (camp == "flows") 1L else lcamps(hm(camp)).toLong

  /**
   * Splits a raw record on "," and builds the (key, value) pair to aggregate:
   * the key is the comma-joined values of `keycamps`, the value is `valcamp`.
   */
  def getKeyVal(line: String, keycamps: List[String], valcamp: String): (String, Long) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  /**
   * Writes `data` as "<key>,<amount>" rows under `csvheader` to
   * <appPath>/data/<prefix>_<keycamps_str>_<valcamp>.csv.
   *
   * The rows go to a timestamp-suffixed temporary file first, which is then
   * renamed over the target so readers never see a half-written table.
   *
   * @return true when the table was written completely and renamed into place
   */
  def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                  csvheader: String, valcamp: String, prefix: String): Boolean = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" +
      valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)

    val pw = new PrintWriter(ftmpf)
    val written =
      try {
        pw.println(csvheader)
        data.foreach { case (amount, key) => pw.println(key + "," + amount) }
        // PrintWriter swallows IOExceptions (e.g. disk full); checkError()
        // flushes and reports whether any write failed.
        !pw.checkError()
      } finally {
        pw.close()
      }

    // Never publish a truncated table, and never leave the temp file behind.
    val published = written && ftmpf.renameTo(f1f)
    if (!published) {
      System.err.println("Could not publish " + f1 + "; discarding " + ftmpf.getPath)
      ftmpf.delete()
    }
    published
  }

  /**
   * Entry point. Expects the application directory as the only argument; it must
   * contain conf/app.properties (KAFKA_HOST, KAFKA_GROUP, KAFKA_TOPIC, THREADS)
   * and a data/ directory for the generated CSV files.
   */
  def main(args: Array[String]): Unit = {

    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }

    appPath = args(0)

    try {
      val propFile = appPath + "/conf/app.properties"
      val prop = new Properties()
      val in = new FileInputStream(propFile)
      try prop.load(in) finally in.close()

      // Fail here, with the property name, instead of with a later NPE.
      def required(name: String): String =
        Option(prop.getProperty(name)).getOrElse(
          throw new IllegalArgumentException("Missing property " + name + " in " + propFile))

      zkQuorum = required("KAFKA_HOST")
      group = required("KAFKA_GROUP")
      topics = required("KAFKA_TOPIC")
      numThreads = required("THREADS").trim.toInt
    } catch { case e: Exception =>
      e.printStackTrace()
      sys.exit(1)
    }

    // NOTE(review): spark.cleaner.ttl may drop blocks that are still pending once
    // the scheduling delay grows past 300s - confirm it is needed alongside
    // spark.streaming.unpersist.
    val sparkConf = new SparkConf().setAppName("netflow-topn")
                                   .set("spark.default.parallelism", "2")
                                   .set("spark.rdd.compress", "true")
                                   .set("spark.streaming.unpersist", "true")
                                   .set("spark.cleaner.ttl", "300")

    val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_, numThreads)).toMap

    val kafpar = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "5000",
      "auto.commit.interval.ms" -> "60000",
      "auto.offset.reset" -> "largest"
    )

    val lines = KafkaUtils.createStream[String, String, StringDecoder,
      StringDecoder](ssc, kafpar, topicMap,
      StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

    val ll_keycamps = List(
      List("srcaddr", "dstaddr"),
      List("dstaddr"),
      List("srcaddr"),
      List("srcport"),
      List("dstport"),
      List("total")
    )

    val l_valcamps = List("doctets", "dpkts", "flows")

    // 6 key lists x 3 value fields x 2 outputs = 36 output operations per batch.
    // NOTE(review): presumably a small local[n] master cannot keep up with that
    // next to the Kafka receiver - confirm against the scheduling delay in the UI.
    for (keycamps <- ll_keycamps) {

      val keycamps_str = keycamps.mkString("-")
      val csvheader = keycamps.mkString(",") + ",amount"

      for (valcamp <- l_valcamps) {

        val lines2 = lines.map(getKeyVal(_, keycamps, valcamp)).cache()

        val perWindow = lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b,
          Seconds(60), Seconds(10))
        val perBatch = lines2.reduceByKey((a: Long, b: Long) => a + b)

        // Same registration order as before: windowed table first, then per-batch.
        for ((prefix, totals) <- List(("DATAWINDOW", perWindow), ("DATA", perBatch))) {
          totals.map(_.swap).foreachRDD(rdd => {
            // top(20) picks the 20 largest (amount, key) rows, already in descending
            // order, without the global sort (shuffle) that sortByKey + take needed.
            val data = rdd.top(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, prefix)
          })
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to