Hi Akhil,

Thank you for your response. I don't think it is the processing time; in fact, the delay is under 1 second while the batch interval is 10 seconds. The data volume is low (10 lines / second).
By the way, I have seen some improvement with this call to KafkaUtils: KafkaUtils.createDirectStream. CPU usage is low and stable, but memory is slowly increasing... but at least the process lasts longer.

Best regards,
Miquel

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Thursday, 16 April 2015 12:07
To: González Salgado, Miquel
Cc: user@spark.apache.org
Subject: Re: Streaming problems running 24x7

I used to hit this issue when my processing time exceeded the batch duration. Here are a few workarounds:

- Use storage level MEMORY_AND_DISK
- Enable WAL and checkpointing

The above two will slow things down a little. If you want low latency, you can try:

- Use storage level MEMORY_ONLY_2 (at least replicates the blocks)
- Tachyon-based off-heap storage (haven't tried this, but will let you know)

Also, as of Spark 1.3.1 they have purged the old WAL and it has better performance. You could try that as well.

On 16 Apr 2015 14:10, "Miquel" <miquel.gonza...@tecsidel.es> wrote:

Hello,
I'm having problems running a Spark Streaming job for more than a few hours (3 or 4). It begins working OK, but it degrades until failure. Some of the symptoms:

- Consumed memory and CPU keep getting higher and higher, until eventually an error is thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being calculated.
- The delay shown in the web UI also keeps increasing.
- After some hours, disk space is being consumed.
There are a lot of directories with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c".

The job basically reads information from a Kafka topic and calculates several top-N tables for some key and value fields related to netflow data. Some of the parameters are:

- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16 GB RAM, 12 cores), and it is launched as follows:

/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops" --jars $JARS --class $APPCLASS --master local[2] $APPJAR

Does someone have any clues about the problem? I don't know if it is a configuration problem or some error in the code that is causing memory leaks.

Thank you in advance!
Miquel

PS: the code is basically this:
--------------------------------------
import java.io.{File, FileInputStream, PrintWriter}
import java.util.Properties

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1
  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String, Int]()
  hm += ("unix_secs"   -> 0)
  hm += ("unix_nsecs"  -> 1)
  hm += ("sysuptime"   -> 2)
  hm += ("exaddr"      -> 3)
  hm += ("dpkts"       -> 4)
  hm += ("doctets"     -> 5)
  hm += ("first"       -> 6)
  hm += ("last"        -> 7)
  hm += ("engine_type" -> 8)
  hm += ("engine_id"   -> 9)
  hm += ("srcaddr"     -> 10)
  hm += ("dstaddr"     -> 11)
  hm += ("nexthop"     -> 12)
  hm += ("input"       -> 13)
  hm += ("output"      -> 14)
  hm += ("srcport"     -> 15)
  hm += ("dstport"     -> 16)
  hm += ("prot"        -> 17)
  hm += ("tos"         -> 18)
  hm += ("tcp_flags"   -> 19)
  hm += ("src_mask"    -> 20)
  hm += ("dst_mask"    -> 21)
  hm += ("src_as"      -> 22)
  hm += ("dst_as"      -> 23)

  def getKey(lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal(lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                  csvheader: String, valcamp: String, prefix: String) = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t =>
      pw.println(t._2 + "," + t._1)
    }
    pw.close
    ftmpf.renameTo(f1f)
  }

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }
    appPath = args(0)

    try {
      val prop = new Properties()
      prop.load(new FileInputStream(appPath + "/conf/app.properties"))
      zkQuorum   = prop.getProperty("KAFKA_HOST")
      group      = prop.getProperty("KAFKA_GROUP")
      topics     = prop.getProperty("KAFKA_TOPIC")
      numThreads = prop.getProperty("THREADS").toInt
    } catch {
      case e: Exception => e.printStackTrace(); sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("netflow-topn")
      .set("spark.default.parallelism", "2")
      .set("spark.rdd.compress", "true")
      .set("spark.streaming.unpersist", "true")
      .set("spark.cleaner.ttl", "300")

    val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafpar = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "5000",
      "auto.commit.interval.ms" -> "60000",
      "auto.offset.reset" -> "largest"
    )

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

    val ll_keycamps = List(
      List("srcaddr", "dstaddr"),
      List("dstaddr"),
      List("srcaddr"),
      List("srcport"),
      List("dstport"),
      List("total")
    )

    val l_valcamps = List("doctets", "dpkts", "flows")

    for (keycamps <- ll_keycamps) {
      val keycamps_str = keycamps.mkString("-")
      val csvheader = keycamps.mkString(",") + ",amount"

      for (valcamp <- l_valcamps) {

        val lines2 = lines.map(getKeyVal(_, keycamps, valcamp)).cache()

        lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(60), Seconds(10))
          .map(_.swap)
          .transform(_.sortByKey(false))
          .foreachRDD(rdd => {
            val data = rdd.take(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
          })

        lines2.reduceByKey((a: Long, b: Long) => a + b)
          .map(_.swap)
          .transform(_.sortByKey(false))
          .foreachRDD(rdd => {
            val data = rdd.take(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, "DATA")
          })
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
--------------------------------------

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
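For readers following the thread: what reduceByKeyAndWindow plus map(_.swap).transform(_.sortByKey(false)).take(20) computes over a window is just a grouped sum followed by a descending top-N. That can be simulated with plain Scala collections, without Spark (a sketch only; the sample netflow lines and the reduced field map below are fabricated for illustration, following the field order of the job's hm map):

```scala
// Field positions, a subset of the hm map in the original job.
val hm = Map("srcaddr" -> 10, "dstaddr" -> 11, "doctets" -> 5)

// Same key/value extraction as getKeyVal in the job.
def getKeyVal(line: String, keycamps: List[String], valcamp: String): (String, Long) = {
  val arr = line.split(",")
  (keycamps.map(c => arr(hm(c))).mkString(","), arr(hm(valcamp)).toLong)
}

// Fabricated CSV lines: only fields 5 (doctets), 10 (srcaddr), 11 (dstaddr) matter here.
def sampleLine(src: String, dst: String, octets: Long): String =
  Seq("0", "0", "0", "0", "0", octets.toString, "0", "0", "0", "0", src, dst).mkString(",")

// One "window" of two batches, each batch a list of raw lines.
val window: List[List[String]] = List(
  List(sampleLine("10.0.0.1", "10.0.0.2", 100L),
       sampleLine("10.0.0.3", "10.0.0.2", 50L)),
  List(sampleLine("10.0.0.1", "10.0.0.2", 200L))
)

// reduceByKeyAndWindow(_ + _, ...) over the window == grouped sum;
// map(_.swap) + sortByKey(false) + take(n) == descending top-n.
def topN(batches: List[List[String]], keycamps: List[String],
         valcamp: String, n: Int): List[(Long, String)] =
  batches.flatten
    .map(getKeyVal(_, keycamps, valcamp))
    .groupBy(_._1)
    .toList
    .map { case (k, kvs) => (kvs.map(_._2).sum, k) }
    .sortBy(-_._1)
    .take(n)

val top = topN(window, List("srcaddr", "dstaddr"), "doctets", 2)
// top == List((300, "10.0.0.1,10.0.0.2"), (50, "10.0.0.3,10.0.0.2"))
```

This also makes clear that the window output is derived entirely from the last 60 seconds of batches, which is why losing an input block (the "Could not compute split" error) breaks the computation.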
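Miquel reports that switching to KafkaUtils.createDirectStream behaved better (stable CPU, longer uptime). For completeness, a minimal sketch of that variant against the Spark 1.3-era API is below. This is not from the original post: the broker address and topic name are placeholders, and the top-N logic is elided. Note that the direct stream takes a broker list rather than the ZooKeeper quorum used by the receiver-based createStream call.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object NetflowTopnDirect {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("netflow-topn-direct")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Direct stream: connects to the Kafka brokers, no receiver, no
    // receiver-side block storage (so no "block input-0-... not found").
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092",  // placeholder
      "auto.offset.reset"    -> "largest"
    )
    val topics = Set("netflow")                  // placeholder

    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    // ... same top-N logic as in the original job ...

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Building this requires the spark-streaming-kafka artifact matching the Spark version on the classpath.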