I used to hit this issue when my processing time exceeded the batch
duration. Here are a few workarounds:

- Use storage level MEMORY_AND_DISK
- Enable the WAL (write-ahead log) and checkpointing

Both of these will slow things down a little.
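
For reference, here is a rough sketch of how those two settings might be wired into a receiver-based Kafka job. I have not run it against your setup: the checkpoint path, ZooKeeper address, group and topic names are placeholders I made up, and the WAL property name is the one documented for Spark 1.3. It needs spark-streaming and spark-streaming-kafka on the classpath.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DurableReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("netflow-topn")
      // Receiver write-ahead log, as named in the Spark 1.3 configuration docs.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))

    // The WAL is stored under the checkpoint directory.
    // Hypothetical path; a fault-tolerant file system is the usual choice.
    ssc.checkpoint("/tmp/netflow-checkpoint")

    // Placeholder connection settings, not taken from the original post.
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "netflow-group")
    val topicMap = Map("netflow" -> 1)

    // MEMORY_AND_DISK lets received blocks spill to disk instead of being dropped.
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)

    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The master is left out on purpose so it can be passed to spark-submit. The extra disk writes are where the slowdown comes from.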

If you want low latency, what you can try is:

- Use storage level MEMORY_ONLY_2 (at least it replicates the blocks)
- Use Tachyon-based off-heap storage (I haven't tried this, but will let
you know)
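
If it helps to see what the trade-off between the storage levels is, their flags can be printed directly. This is only a small sketch and needs spark-core on the classpath; the object and method names are mine.

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelSketch {
  // One-line summary of the flags carried by a storage level.
  def describe(name: String, level: StorageLevel): String =
    s"$name: memory=${level.useMemory}, disk=${level.useDisk}, replicas=${level.replication}"

  def main(args: Array[String]): Unit = {
    println(describe("MEMORY_AND_DISK", StorageLevel.MEMORY_AND_DISK))
    println(describe("MEMORY_ONLY_2", StorageLevel.MEMORY_ONLY_2))
  }
}
```

MEMORY_ONLY_2 never touches disk but asks for a second in-memory copy on another node, so as far as I know it only buys you something when there is more than one executor. MEMORY_AND_DISK keeps a single copy and spills it to local disk when memory runs short.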

Also, as of Spark 1.3.1 the old WAL has been purged and it performs
better. You could try that too.
On 16 Apr 2015 14:10, "Miquel" <miquel.gonza...@tecsidel.es> wrote:

> Hello,
> I'm having problems running a Spark Streaming job for more than a few hours
> (3 or 4). It starts out working OK, but it degrades until it fails. Some of
> the symptoms:
>
> - Memory and CPU consumption keep getting higher and higher, and finally an
> error is thrown (java.lang.Exception: Could not compute split, block
> input-0-1429168311800 not found) and data stops being calculated.
>
> - The delay shown in the web UI also keeps increasing.
>
> - After some hours, disk space starts being consumed. There are a lot of
> directories with names like
> "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"
>
> The job basically reads information from a Kafka topic and calculates
> several top-N tables for some key and value fields related to netflow data.
> Some of the parameters are:
> - batch interval: 10 seconds
> - window calculation: 1 minute
> - spark.cleaner.ttl: 5 minutes
>
> It runs standalone on one machine (16 GB RAM, 12 cores), and the
> command used to launch it is as follows:
> /opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
> --jars $JARS --class $APPCLASS --master local[2] $APPJAR
>
> Does anyone have any clues about the problem? I don't know if it is a
> configuration problem or some error in the code that is causing memory
> leaks.
>
> Thank you in advance!
> Miquel
>
> PS: the code is basically this:
>
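> import java.io.{File, FileInputStream, PrintWriter}
> import java.util.Properties
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext._ // pair-RDD implicits (needed before Spark 1.3)
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits (needed before Spark 1.3)
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object NetflowTopn {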
>
>   var appPath = "."
>   var zkQuorum = ""
>   var group = ""
>   var topics = ""
>   var numThreads = 1
>
>   var batch_interval = 10
>   var n_window = 1
>   var n_slide = 1
>   var topnsize = 10
>
>   var hm = Map[String,Int]()
>   hm += ( "unix_secs" ->  0 )
>   hm += ( "unix_nsecs" -> 1 )
>   hm += ( "sysuptime" ->  2 )
>   hm += ( "exaddr" ->     3 )
>   hm += ( "dpkts" ->      4 )
>   hm += ( "doctets" ->    5 )
>   hm += ( "first" ->      6 )
>   hm += ( "last" ->       7 )
>   hm += ( "engine_type" -> 8 )
>   hm += ( "engine_id" ->   9 )
>   hm += ( "srcaddr" ->    10 )
>   hm += ( "dstaddr" ->    11 )
>   hm += ( "nexthop" ->    12 )
>   hm += ( "input" ->      13 )
>   hm += ( "output" ->     14 )
>   hm += ( "srcport" ->    15 )
>   hm += ( "dstport" ->    16 )
>   hm += ( "prot" ->       17 )
>   hm += ( "tos" ->        18 )
>   hm += ( "tcp_flags" ->  19 )
>   hm += ( "src_mask" ->   20 )
>   hm += ( "dst_mask" ->   21 )
>   hm += ( "src_as" ->     22 )
>   hm += ( "dst_as" ->     23 )
>
>   def getKey (lcamps: Array[String], camp: String): String  = {
>     if (camp == "total") return "total"
>     else return lcamps(hm(camp))
>   }
>
>   def getVal (lcamps: Array[String], camp: String): Long  = {
>     if (camp == "flows") return 1L
>     else return lcamps(hm(camp)).toLong
>   }
>
>   def getKeyVal (line: String, keycamps: List[String], valcamp: String ) = {
>     val arr = line.split(",")
>     (keycamps.map(getKey(arr, _)).mkString(",")   ,   getVal(arr,valcamp) )
>   }
>
>   def writeOutput (data: Array[(Long, String)], keycamps_str: String,
>                    csvheader: String, valcamp: String, prefix: String) = {
>
>            val ts = System.currentTimeMillis
>            val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" +
>                     valcamp + ".csv"
>            val f1f = new File(f1);
>            val ftmpf = new File(f1 + ts);
>            val pw = new PrintWriter(ftmpf)
>            pw.println(csvheader)
>            data.foreach{
>                 t =>  pw.println (t._2 + "," + t._1)
>            }
>            pw.close
>            ftmpf.renameTo(f1f);
>
>   }
>
>
>   def main(args: Array[String]) {
>
>     if (args.length < 1) {
>       System.err.println("Usage: NetflowTopn <apppath>")
>       System.exit(1)
>     }
>
>     appPath = args(0)
>
>     try {
>        val prop = new Properties()
>        prop.load(new FileInputStream(appPath + "/conf/app.properties"))
>
>        zkQuorum =        prop.getProperty("KAFKA_HOST")
>        group =           prop.getProperty("KAFKA_GROUP")
>        topics =          prop.getProperty("KAFKA_TOPIC")
>        numThreads =      prop.getProperty("THREADS").toInt
>
>     } catch { case e: Exception =>
>        e.printStackTrace()
>        sys.exit(1)
>     }
>
>     val sparkConf = new SparkConf().setAppName("netflow-topn")
>                                    .set("spark.default.parallelism", "2")
>                                    .set("spark.rdd.compress", "true")
>                                    .set("spark.streaming.unpersist", "true")
>                                    .set("spark.cleaner.ttl", "300")
>
>     val ssc =  new StreamingContext(sparkConf, Seconds(batch_interval))
>
>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>
>     val kafpar = Map[String, String](
>       "zookeeper.connect" -> zkQuorum,
>       "group.id" -> group,
>       "zookeeper.connection.timeout.ms" -> "5000",
>       "auto.commit.interval.ms" -> "60000",
>       "auto.offset.reset" -> "largest"
>     )
>
>     val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()
>
>    val ll_keycamps = List ( List("srcaddr", "dstaddr")
>                             ,List("dstaddr")
>                             ,List("srcaddr")
>                             ,List("srcport")
>                             ,List("dstport")
>                             ,List("total")
>                           )
>
>    val l_valcamps = List ("doctets"
>                          ,"dpkts"
>                          ,"flows"
>                           )
>
>    for (keycamps <- ll_keycamps) {
>
>          val keycamps_str = keycamps.mkString("-")
>          val csvheader = keycamps.mkString(",") + ",amount"
>
>          for (valcamp <- l_valcamps) {
>
>               val lines2 = lines.map( getKeyVal (_, keycamps, valcamp ) ).cache()
>
>               lines2.reduceByKeyAndWindow((a:Long,b:Long)=>a+b, Seconds(60), Seconds(10))
>                       .map(_.swap)
>                       .transform(_.sortByKey(false))
>                       .foreachRDD(rdd => {
>                              val data = rdd.take(20)
>                              writeOutput (data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
>                      })
>
>
>               lines2.reduceByKey((a:Long,b:Long)=>a+b)
>                       .map(_.swap)
>                       .transform(_.sortByKey(false))
>                       .foreachRDD(rdd => {
>                              val data = rdd.take(20)
>                              writeOutput (data, keycamps_str, csvheader, valcamp, "DATA")
>                      })
>
>          }
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>
>   }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.