Hi Akhil,

Thank you for your response,
I think it is not because of the processing time, in fact the delay is under 1 
second, while the batch interval is 10 seconds… The data volume is low (10 
lines / second)

By the way, I have seen some results changing to this call of Kafkautils:
KafkaUtils.createDirectStream

CPU usage is low and stable, but memory is slowly increasing… But at least the 
process last longer..

Best regards,
Miquel


De: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Enviado el: jueves, 16 de abril de 2015 12:07
Para: González Salgado, Miquel
CC: user@spark.apache.org
Asunto: Re: Streaming problems running 24x7


I used to hit this issue when my processing time exceeds the batch duration. 
Here's a few workarounds:

- Use storage level MEMORY_AND_DISK
- Enable WAL and check pointing

Above two will slow down things a little bit.

If you want low latency, what you can try is:

- Use storage level as MEMORY_ONLY_2 ( Atleast replicates it)

- Tachyon based off heap for storage (havent tried this, but will let you know)

And from spark 1.3.1 version, they have purged the old WAL and it has better 
performance. You could try that also.
On 16 Apr 2015 14:10, "Miquel" 
<miquel.gonza...@tecsidel.es<mailto:miquel.gonza...@tecsidel.es>> wrote:
Hello,
I'm finding problems to run a spark streaming job for more than a few hours
(3 or 4). It begins working OK, but it degrades until failure. Some of the
symptoms:

- Consumed memory and CPU keeps getting higher ang higher, and finally some
error is being thrown (java.lang.Exception: Could not compute split, block
input-0-1429168311800 not found) and data stops being calculated.

- The delay showed in web UI keeps also increasing.

- After some hours disk space is being consumed. There are a lot of
directories with name like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"

The job is basically reading information from kafka topic, and calculate
several topN tables for some key and value camps related with netflow data,
some of the parameters are this:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16GB RAM , 12 cores), and the
options to run it is as follows:
/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
--jars $JARS --class $APPCLASS --master local[2] $APPJAR

someone has some clues about the problem? I don't know if it is a
configuration problem or some error in the code that is causing memory
leaks..

Thank you in advance!
Miquel

PD: the code is basically this:--------------------------------------

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String,Int]()
  hm += ( "unix_secs" ->  0 )
  hm += ( "unix_nsecs" -> 1 )
  hm += ( "sysuptime" ->  2 )
  hm += ( "exaddr" ->     3 )
  hm += ( "dpkts" ->      4 )
  hm += ( "doctets" ->    5 )
  hm += ( "first" ->      6 )
  hm += ( "last" ->       7 )
  hm += ( "engine_type" -> 8 )
  hm += ( "engine_id" ->   9 )
  hm += ( "srcaddr" ->    10 )
  hm += ( "dstaddr" ->    11 )
  hm += ( "nexthop" ->    12 )
  hm += ( "input" ->      13 )
  hm += ( "output" ->     14 )
  hm += ( "srcport" ->    15 )
  hm += ( "dstport" ->    16 )
  hm += ( "prot" ->       17 )
  hm += ( "tos" ->        18 )
  hm += ( "tcp_flags" ->  19 )
  hm += ( "src_mask" ->   20 )
  hm += ( "dst_mask" ->   21 )
  hm += ( "src_as" ->     22 )
  hm += ( "dst_as" ->     23 )

  def getKey (lcamps: Array[String], camp: String): String  = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal (lcamps: Array[String], camp: String): Long  = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal (line: String, keycamps: List[String], valcamp: String ) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(",")   ,   getVal(arr,valcamp) )
  }

  def writeOutput (data: Array[(Long, String)], keycamps_str: String,
csvheader: String, valcamp: String, prefix: String) = {

           val ts = System.currentTimeMillis
           val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" +
valcamp + ".csv"
           val f1f = new File(f1);
           val ftmpf = new File(f1 + ts);
           val pw = new PrintWriter(ftmpf)
           pw.println(csvheader)
           data.foreach{
                t =>  pw.println (t._2 + "," + t._1)
           }
           pw.close
           ftmpf.renameTo(f1f);

  }


  def main(args: Array[String]) {

    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }

    appPath = args(0)

    try {
       val prop = new Properties()
       prop.load(new FileInputStream(appPath + "/conf/app.properties"))

       zkQuorum =        prop.getProperty("KAFKA_HOST")
       group =           prop.getProperty("KAFKA_GROUP")
       topics =          prop.getProperty("KAFKA_TOPIC")
       numThreads =      prop.getProperty("THREADS").toInt

    } catch { case e: Exception =>
       e.printStackTrace()
       sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("netflow-topn")
                                   .set("spark.default.parallelism", "2")
                                   .set("spark.rdd.compress", "true")
                                   .set("spark.streaming.unpersist", "true")
                                   .set("spark.cleaner.ttl", "300")

    val ssc =  new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val kafpar = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id<http://group.id>" -> group,
      "zookeeper.connection.timeout.ms<http://zookeeper.connection.timeout.ms>" 
-> "5000",
      "auto.commit.interval.ms<http://auto.commit.interval.ms>" -> "60000",
      "auto.offset.reset" -> "largest"
    )

    val lines = KafkaUtils.createStream[String, String, StringDecoder,
StringDecoder] (ssc, kafpar, topicMap,
StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

   val ll_keycamps = List ( List("srcaddr", "dstaddr")
                            ,List("dstaddr")
                            ,List("srcaddr")
                            ,List("srcport")
                            ,List("dstport")
                            ,List("total")
                          )

   val l_valcamps = List ("doctets"
                         ,"dpkts"
                         ,"flows"
                          )

   for (keycamps <- ll_keycamps) {

         val keycamps_str = keycamps.mkString("-")
         val csvheader = keycamps.mkString(",") + ",amount"

         for (valcamp <- l_valcamps) {

              val lines2 = lines.map( getKeyVal (_, keycamps, valcamp )
).cache()

              lines2.reduceByKeyAndWindow((a:Long,b:Long)=>a+b, Seconds(60),
Seconds(10))
                      .map(_.swap)
                      .transform(_.sortByKey(false))
                      .foreachRDD(rdd => {
                             val data = rdd.take(20)
                             writeOutput (data, keycamps_str, csvheader,
valcamp, "DATAWINDOW")
                     })


              lines2.reduceByKey((a:Long,b:Long)=>a+b)
                      .map(_.swap)
                      .transform(_.sortByKey(false))
                      .foreachRDD(rdd => {
                             val data = rdd.take(20)
                             writeOutput (data, keycamps_str, csvheader,
valcamp, "DATA")
                     })

         }
    }

    ssc.start()
    ssc.awaitTermination()

  }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>

Reply via email to