I'm trying to understand why it isn't working, so I added some println calls to check what the code was executing.
def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
  println("1") // ******************** printed only once, when I start the program

  val filterSql = lines.filter(line => line.contains("SQL"))
  val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Akamai]))
  val groupSql = jsonSql.map { json =>
    val srcIp = json.getMessage().getCliIP()
    val srcURL = json.getMessage().getReqHost()
    (srcIp + "_" + srcURL, json)
  }

  println("2") // ******************** printed only once, when I start the program

  val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))

  println("3") // ******************** printed only once, when I start the program

  errorLinesValueReduce.foreachRDD { rdd =>
    rdd.foreach { elem1 =>
      println("4 " + elem1) // ******************** printed every time
      if (elem1._2.size > 0) {
        println("do something")
      }
    }
  }

  println("fin foreachRdd") // ******************** printed only once, when I start the program
}

Why is only println("4 ...") executed repeatedly? Shouldn't all of the code execute every 15 seconds, since that's the batch interval defined on the context (val ssc = new StreamingContext(sparkConf, Seconds(15)))?

2014-12-26 10:56 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:
> I'm trying to do some operations with windows and intervals.
>
> I get data every 15 seconds, and I want a window of 60 seconds
> with a batch interval of 15 seconds.
> I'm injecting data with ncat. If I inject 3 logs in the same interval,
> I get into "do something" every 15 seconds for one minute.
> I understand getting into "do something" in the first interval, but the
> logs shouldn't appear in the next interval. Why do I get into "do
> something" in all the intervals for a minute? What am I doing wrong?
>
>
> val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp");
> val ssc = new StreamingContext(sparkConf, Seconds(15));
> val lines = ssc.socketTextStream("localhost", sparkPort.toInt);
> ssc.checkpoint(sparkCheckPoint)
>
> ruleSqlInjection(lines)
>
> ssc.start()
> ssc.awaitTermination()
>
>
> def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
>   val filterSql = lines.filter(line => line.contains("SQL"))
>   val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Model]))
>   val groupSql = jsonSql.map { json =>
>     val srcIp = json.getMessage().getCliIP()
>     val srcURL = json.getMessage().getReqHost()
>     (srcIp + "_" + srcURL, json)
>   }
>   val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
>   errorLinesValueReduce.foreachRDD { rdd =>
>     val elem1 = rdd.take(1)
>     println("take1 ->" + elem1(0)._1)
>     println("take2 ->" + elem1(0)._2) // it's always getting the logs from the first 15 seconds during one minute..
>
>     if (elem1.size > 0) {
>       val alerts = elem1(0)._2
>       if (alerts.size > 2) {
>         println("do something") // I don't understand why it gets in here for 4 intervals
>       }
>     }
>   }
> }
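In case it helps to see the behavior in isolation, here is a minimal, self-contained sketch. It is my own reduction, not Guillermo's code: it swaps the ncat/JSON pipeline for a queueStream with made-up keys, and the "local[2]" master and checkpoint path are assumptions. It illustrates both effects being asked about: statements at the top level of the function run once, when the DStream graph is built, while the foreachRDD closure runs on every slide; and with a 60-second window sliding every 15 seconds, a record injected in one batch stays inside 60/15 = 4 consecutive window computations.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits on Spark < 1.3
import scala.collection.mutable

object WindowDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowDemo")
    val ssc = new StreamingContext(conf, Seconds(15)) // batch interval: 15 seconds
    ssc.checkpoint("/tmp/window-demo-checkpoint")     // checkpoint dir, as in the original code

    // queueStream stands in for socketTextStream: the single RDD below is
    // consumed in the first batch, and every later batch is empty.
    val queue = mutable.Queue(ssc.sparkContext.makeRDD(Seq("ip1_host1", "ip1_host1", "ip1_host1")))
    val lines = ssc.queueStream(queue)

    // Everything up to here runs exactly once, when the DStream graph is built.
    // That is why println("1"), println("2"), println("3"), and
    // println("fin foreachRdd") appear only at startup.
    val windowed = lines.map(key => (key, 1))
      .groupByKeyAndWindow(Seconds(60), Seconds(15)) // window: 60s, slide: 15s

    // Only this closure runs per slide (every 15 seconds). The 3 records from
    // the first batch remain inside the 60-second window for 4 consecutive
    // slides, so the same key prints ~4 times before it ages out of the window.
    windowed.foreachRDD { rdd =>
      rdd.collect().foreach { case (key, values) =>
        println(s"window contains: $key -> ${values.size} record(s)")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

If that reading is right, "do something" firing in four consecutive intervals is the documented window semantics rather than a bug: each element belongs to every window whose 60-second span covers the batch it arrived in.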