I'm trying to understand why it's not working, so I added some println
calls to check which parts of the code were being executed:

  def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
    println("1"); //********************Just one time, when I start the program
    val filterSql = lines.filter(line => line.contains("SQL"))
    val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Akamai]))
    val groupSql = jsonSql.map {
      json =>
        val srcIp = json.getMessage().getCliIP()
        val srcURL = json.getMessage().getReqHost()
        (srcIp + "_" + srcURL, json)
    }
    println("2"); //********************Just one time, when I start the program

    val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))

    println("3"); //********************Just one time, when I start the program
    errorLinesValueReduce.foreachRDD {
      rdd =>
        rdd.foreach { elem1 =>

          println("4 " + elem1); //********************All time
          if (elem1._2.size > 0) {
            println("do something")
          }
        }
    }
    println("fin foreachRdd");  ///********************Just one time, when I start the program
  }


Why is it only executing the println("4 ...")? Shouldn't it execute all
of the code every 15 seconds, since that is the interval defined on the
context (val ssc = new StreamingContext(sparkConf, Seconds(15));)?
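
One possible reading of this, sketched in plain Scala without Spark: the body
of ruleSqlInjection may run only once, to describe the pipeline, while only
the function handed to foreachRDD is invoked for each batch. The names
SetupVsBatch, buildPipeline and run are made up for this illustration and are
not Spark API; the log entries stand in for the println calls above.

```scala
import scala.collection.mutable.ArrayBuffer

object SetupVsBatch {
  // Runs once and returns the function a scheduler would call for every batch.
  // (Hypothetical stand-in for defining a streaming pipeline; not Spark code.)
  def buildPipeline(log: ArrayBuffer[String]): Seq[String] => Unit = {
    log += "setup start" // comparable to println("1"): executed once
    val perBatch: Seq[String] => Unit = batch =>
      // comparable to println("4 ..."): executed each time a batch arrives
      batch.filter(_.contains("SQL")).foreach(line => log += ("batch: " + line))
    log += "setup end" // comparable to println("fin foreachRdd"): executed once
    perBatch
  }

  // Simulates a scheduler: build the pipeline once, then feed it each batch in turn.
  def run(batches: Seq[Seq[String]]): List[String] = {
    val log = ArrayBuffer.empty[String]
    val perBatch = buildPipeline(log)
    batches.foreach(perBatch)
    log.toList
  }

  def main(args: Array[String]): Unit =
    run(Seq(Seq("SQL a", "other"), Seq("SQL b"))).foreach(println)
}
```

In this toy version the two setup entries are logged once, before any batch
entry, however many batches are fed in afterwards.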

2014-12-26 10:56 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:
> I'm trying to do some operations with windows and intervals.
>
> I get data every 15 seconds, and I want a window of 60 seconds
> with batch intervals of 15 seconds.
> I'm injecting data with ncat. If I inject 3 logs in the same interval,
> I get into the "do something" branch every 15 seconds for one minute.
> I understand getting into "do something" in the first interval, but the
> logs shouldn't appear in the next interval. Why do I get into "do
> something" in every interval for a minute? What am I doing wrong?
>
>
>     val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp");
>     val ssc = new StreamingContext(sparkConf, Seconds(15));
>     val lines = ssc.socketTextStream("localhost", sparkPort.toInt);
>     ssc.checkpoint(sparkCheckPoint)
>
>     ruleSqlInjection(lines)
>
>     ssc.start()
>     ssc.awaitTermination()
>
>
>
>   def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
>     val filterSql = lines.filter(line => line.contains("SQL"))
>     val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Model]))
>     val groupSql = jsonSql.map {
>       json =>
>         val srcIp = json.getMessage().getCliIP()
>         val srcURL = json.getMessage().getReqHost()
>         (srcIp + "_" + srcURL, json)
>     }
>     val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
>     errorLinesValueReduce.foreachRDD {
>       rdd =>
>         val elem1 = rdd.take(1)
>         // Guard first: take(1) returns an empty array when the RDD has no data
>         if (elem1.size > 0) {
>           println("take1 ->" + elem1(0)._1)
>           println("take2 ->" + elem1(0)._2)  // it's always getting the logs for the first 15 seconds during one minute..
>
>           val alerts = elem1(0)._2
>           if (alerts.size > 2) {
>             println("do something")  // I don't understand why it's getting into here 4 intervals
>           }
>         }
>     }
>   }
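
The four intervals mentioned in the quoted message match what a 60-second
window sliding every 15 seconds would give, assuming each window ending at
time e covers the batches in (e - 60, e]. A small sketch of that arithmetic in
plain Scala (WindowMembership and windowsContaining are hypothetical names,
not Spark API):

```scala
object WindowMembership {
  // End times (in seconds) of the sliding windows that contain the batch
  // ending at batchEnd, assuming windows are aligned with batch boundaries
  // and a window ending at e covers the interval (e - windowSec, e].
  def windowsContaining(batchEnd: Int, windowSec: Int, slideSec: Int): List[Int] = {
    require(windowSec > 0 && slideSec > 0, "durations must be positive")
    (batchEnd until batchEnd + windowSec by slideSec).toList
  }

  def main(args: Array[String]): Unit = {
    // Values from the message: 60 s window, 15 s slide, batch ending at t = 15 s.
    val ends = windowsContaining(batchEnd = 15, windowSec = 60, slideSec = 15)
    println("windows ending at: " + ends.mkString(", ") + " (" + ends.size + " windows)")
  }
}
```

Under those assumptions, the batch ending at t = 15 belongs to the windows
ending at 15, 30, 45 and 60, so the same three logs would be seen in four
consecutive 15-second intervals.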
