I'm trying to do some operations with windows and batch intervals. I receive data every 15 seconds and want a window of 60 seconds with a batch interval of 15 seconds. I'm injecting data with ncat. If I inject 3 logs in the same interval, I reach the "do something" branch every 15 seconds for one minute. I understand reaching "do something" in the first interval, but I would expect the logs not to appear in the following intervals. Why do I reach "do something" in every interval for a minute? What am I doing wrong?

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp")
val ssc = new StreamingContext(sparkConf, Seconds(15))
val lines = ssc.socketTextStream("localhost", sparkPort.toInt)
ssc.checkpoint(sparkCheckPoint)
ruleSqlInjection(lines)
ssc.start()
ssc.awaitTermination()

def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
  val filterSql = lines.filter(line => line.contains("SQL"))
  val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Model]))
  val groupSql = jsonSql.map { json =>
    val srcIp = json.getMessage().getCliIP()
    val srcURL = json.getMessage().getReqHost()
    (srcIp + "_" + srcURL, json)
  }
  val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
  errorLinesValueReduce.foreachRDD { rdd =>
    val elem1 = rdd.take(1)
    // Check for an empty result before indexing into it
    if (elem1.size > 0) {
      println("take1 ->" + elem1(0)._1)
      // It always gets the logs from the first 15 seconds, for one minute.
      println("take2 ->" + elem1(0)._2)
      val alerts = elem1(0)._2
      if (alerts.size > 2) {
        println("do something") // I don't understand why it gets here in 4 intervals
      }
    }
  }
}
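
For reference, the window settings above (a 60-second window that slides every 15 seconds) mean that each 15-second batch falls inside 60 / 15 = 4 consecutive windows. Below is a minimal sketch of that arithmetic in plain Scala, with no Spark involved. The object `WindowSketch` and the helper `windowsContaining` are hypothetical names made up for illustration and are not part of any Spark API; the sketch also assumes that a window ending at time t covers the interval (t - window, t].

```scala
object WindowSketch {
  // Hypothetical helper, not part of Spark: returns the end times (in seconds)
  // of every sliding window that contains the batch ending at `batchEnd`.
  def windowsContaining(batchEnd: Int, windowSec: Int, slideSec: Int): Seq[Int] =
    // Assumption: a window ending at time t covers (t - windowSec, t], so the
    // batch stays visible from its own end time until batchEnd + windowSec (exclusive).
    batchEnd until (batchEnd + windowSec) by slideSec

  def main(args: Array[String]): Unit = {
    val ends = windowsContaining(batchEnd = 15, windowSec = 60, slideSec = 15)
    println(ends.mkString(", ")) // prints "15, 30, 45, 60"
    println(ends.size)           // prints "4"
  }
}
```

Under these assumptions, logs injected in a single interval are seen by four successive windows, which would match reaching "do something" four times. If each log should be seen only once, one option may be a slide duration equal to the window duration, for example `groupByKeyAndWindow(Seconds(60), Seconds(60))`, so that the windows do not overlap.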