I'm trying to make some operation with windows and intervals.

I get data every15 seconds, and want to have a windows of 60 seconds
with  batch intervals of 15 seconds.
I''m injecting data with ncat. if I inject 3 logs in the same interval
I get into the "do something" each 15 secods during one minute,
I understand that I get into "do something" the first interval but the
logs shouldn't appear in the next interval, Why do I get into "do
something" in all the intervals for a minute? What am I doing wrong?


    val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp");
    val ssc = new StreamingContext(sparkConf, Seconds(15));
    val lines = ssc.socketTextStream("localhost", sparkPort.toInt);
    ssc.checkpoint(sparkCheckPoint)

    ruleSqlInjection(lines)

    ssc.start()
    ssc.awaitTermination()



  def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
    val filterSql = lines.filter(line => line.contains("SQL"))
    val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(),
classOf[Model]))
    val groupSql = jsonSql.map {
      json =>
        val srcIp = json.getMessage().getCliIP()
        val srcURL = json.getMessage().getReqHost()
        (srcIp + "_" + srcURL, json)
    }
    val errorLinesValueReduce =
groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
    errorLinesValueReduce.foreachRDD {
      rdd =>
        val elem1 = rdd.take(1)
        println("take1 ->" + elem1(0)._1)
        println("take2 ->" + elem1(0)._2)  //it's always getting the
logs for the first 15 seconds during one minute..

        if (elem1.size > 0) {
          val alerts = elem1(0)._2
          if (alerts.size > 2) {
            println("do something")  // I don't undestand why it's
getting into here 4 intervals
          }
        }
    }
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to