Hello, I'm having problems running a Spark Streaming job for more than a few hours (3 or 4). It starts out working fine but degrades until it fails. Some of the symptoms:
- Consumed memory and CPU keep getting higher and higher, and eventually an error is thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being calculated.
- The delay shown in the web UI also keeps increasing.
- After some hours disk space starts to fill up: there are a lot of directories with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c".

The job basically reads from a Kafka topic and computes several topN tables for some key and value fields of netflow data. Some of the parameters are:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16 GB RAM, 12 cores), and the job is launched as follows:

/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops" --jars $JARS --class $APPCLASS --master local[2] $APPJAR

Does anyone have any clues about the problem? I don't know whether it is a configuration problem or some error in the code that is causing memory leaks.

Thank you in advance!
Miquel

PS: the code is basically this:
--------------------------------------
import java.io.{File, FileInputStream, PrintWriter}
import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object NetflowTopn {

  var appPath        = "."
  var zkQuorum       = ""
  var group          = ""
  var topics         = ""
  var numThreads     = 1
  var batch_interval = 10
  var n_window       = 1
  var n_slide        = 1
  var topnsize       = 10

  // Field name -> position in the comma-separated netflow record.
  val hm = Map(
    "unix_secs"   -> 0,  "unix_nsecs" -> 1,  "sysuptime" -> 2,  "exaddr"    -> 3,
    "dpkts"       -> 4,  "doctets"    -> 5,  "first"     -> 6,  "last"      -> 7,
    "engine_type" -> 8,  "engine_id"  -> 9,  "srcaddr"   -> 10, "dstaddr"   -> 11,
    "nexthop"     -> 12, "input"      -> 13, "output"    -> 14, "srcport"   -> 15,
    "dstport"     -> 16, "prot"       -> 17, "tos"       -> 18, "tcp_flags" -> 19,
    "src_mask"    -> 20, "dst_mask"   -> 21, "src_as"    -> 22, "dst_as"    -> 23
  )

  def getKey(lcamps: Array[String], camp: String): String =
    if (camp == "total") "total" else lcamps(hm(camp))

  def getVal(lcamps: Array[String], camp: String): Long =
    if (camp == "flows") 1L else lcamps(hm(camp)).toLong

  // Turns a raw CSV line into (composite key, numeric value).
  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  // Writes the topN rows to a temp file and renames it over the target,
  // so readers never see a half-written CSV.
  def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                  csvheader: String, valcamp: String, prefix: String) = {
    val ts    = System.currentTimeMillis
    val f1    = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f   = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw    = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t => pw.println(t._2 + "," + t._1) }
    pw.close()
    ftmpf.renameTo(f1f)
  }

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }
    appPath = args(0)

    try {
      val prop = new Properties()
      prop.load(new FileInputStream(appPath + "/conf/app.properties"))
      zkQuorum   = prop.getProperty("KAFKA_HOST")
      group      = prop.getProperty("KAFKA_GROUP")
      topics     = prop.getProperty("KAFKA_TOPIC")
      numThreads = prop.getProperty("THREADS").toInt
    } catch {
      case e: Exception => e.printStackTrace(); sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("netflow-topn")
      .set("spark.default.parallelism", "2")
      .set("spark.rdd.compress", "true")
      .set("spark.streaming.unpersist", "true")
      .set("spark.cleaner.ttl", "300")

    val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafpar = Map[String, String](
      "zookeeper.connect"               -> zkQuorum,
      "group.id"                        -> group,
      "zookeeper.connection.timeout.ms" -> "5000",
      "auto.commit.interval.ms"         -> "60000",
      "auto.offset.reset"               -> "largest"
    )

    val lines = KafkaUtils
      .createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_SER)
      .map(_._2)
      .cache()

    val ll_keycamps = List(
      List("srcaddr", "dstaddr"),
      List("dstaddr"),
      List("srcaddr"),
      List("srcport"),
      List("dstport"),
      List("total")
    )
    val l_valcamps = List("doctets", "dpkts", "flows")

    // 6 key combinations x 3 value fields = 18 derived streams,
    // each with a windowed and a per-batch topN output.
    for (keycamps <- ll_keycamps) {
      val keycamps_str = keycamps.mkString("-")
      val csvheader    = keycamps.mkString(",") + ",amount"

      for (valcamp <- l_valcamps) {
        val lines2 = lines.map(getKeyVal(_, keycamps, valcamp)).cache()

        lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(60), Seconds(10))
          .map(_.swap)
          .transform(_.sortByKey(false))
          .foreachRDD { rdd =>
            val data = rdd.take(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
          }

        lines2.reduceByKey((a: Long, b: Long) => a + b)
          .map(_.swap)
          .transform(_.sortByKey(false))
          .foreachRDD { rdd =>
            val data = rdd.take(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, "DATA")
          }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
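In case it helps, the conf/app.properties that the job loads has just the four keys read above. It looks more or less like this (the host names, group and topic here are placeholders, not the real values):

KAFKA_HOST=zkhost1:2181,zkhost2:2181
KAFKA_GROUP=netflow-topn
KAFKA_TOPIC=netflow
THREADS=1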
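And to be clear about the input format: each Kafka message is one netflow record with the 24 comma-separated fields in the order of the hm map. A made-up example of what getKeyVal extracts from such a line:

// Hypothetical record; all 24 values are invented for illustration.
val line = "1429168311,0,123456,10.0.0.1,10,840,100,200,0,0," +
           "192.168.1.10,192.168.1.20,0.0.0.0,1,2,5000,80,6,0,27,24,24,65001,65002"

// With keycamps = List("srcaddr", "dstaddr") and valcamp = "doctets":
//   getKeyVal(line, List("srcaddr", "dstaddr"), "doctets")
//   == ("192.168.1.10,192.168.1.20", 840L)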