Hi, I'm using Spark Streaming 1.0.
Say I have a source of website clickstream events, like the following:

    ('2014-09-19 00:00:00', '192.168.1.1', 'home_page')
    ('2014-09-19 00:00:01', '192.168.1.2', 'list_page')
    ...

I want to calculate the page views (PV, number of log lines) and unique users (UV, identified by IP) every minute, with results like:

    ('2014-09-19 00:00:00', 'pv', 100)
    ('2014-09-19 00:00:00', 'uv', 50)
    ('2014-09-19 00:01:00', 'pv', 120)
    ('2014-09-19 00:01:00', 'uv', 60)

I also want the running total pv/uv by minute, like:

    ('2014-09-19 00:00:00', 'total pv', 100)
    ('2014-09-19 00:00:00', 'total uv', 50)
    ('2014-09-19 00:01:00', 'total pv', 220) // 100 + 120
    ('2014-09-19 00:01:00', 'total uv', 80)  // 50 unique users + 60 unique users, minus the duplicates

There are also some prerequisites:

* The stream may lag, so at 12:00 you may still be receiving 11:55's messages. Put another way, the program should support both stream and batch processing: fed a whole day's data at once, it should output the same result as the streaming run.
* The stream is partitioned, i.e. it may not be ordered. E.g. you may receive 12:00:00, 12:00:05, 12:00:04, 12:00:06, but the time differences shouldn't be too big.
* The final result will be written to MySQL, with schema (created datetime, category varchar(255), data bigint), matching the results above.

For the 'every minute' part, I can use updateStateByKey. Here's an example of calculating pv (batch duration is 2 secs):

    val logsByMin = logs map { log =>
      val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:00").format(log.serverTime)
      date -> 1L
    }

    val numLogsByMin = logsByMin.updateStateByKey((values: Seq[Long], state: Option[Long]) => {
      Some(state.getOrElse(0L) + values.sum)
    })

    numLogsByMin foreach { rdd => savePv(rdd.collect) }

This meets the prerequisites, but with one major problem: outdated keys are never evicted from the state. So I came up with the idea of expirable data: retain each calculated key for 2 minutes, and within those 2 minutes flush it to MySQL after every batch.
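To make the eviction idea concrete, here's a rough sketch of the update function I have in mind. The names and the 2-minute window are my own, and I pass the clock in as a parameter just so the function is testable on its own; updateStateByKey drops a key whenever the function returns None:

    // Sketch only: state per minute-key is (pv count, millis when last updated).
    // In the real job, `now` would be System.currentTimeMillis().
    val expireMs = 2 * 60 * 1000L // retain keys for 2 minutes

    def updatePv(values: Seq[Long],
                 state: Option[(Long, Long)],
                 now: Long): Option[(Long, Long)] = {
      state match {
        case Some((_, lastSeen)) if values.isEmpty && now - lastSeen > expireMs =>
          None // quiet for over 2 minutes: returning None evicts the key
        case Some((count, lastSeen)) if values.isEmpty =>
          Some((count, lastSeen)) // quiet key, but still within the window
        case Some((count, _)) =>
          Some((count + values.sum, now)) // new logs arrived: bump count and timestamp
        case None =>
          Some((values.sum, now)) // first time we see this minute
      }
    }

It would be wired in as logsByMin.updateStateByKey((v: Seq[Long], s: Option[(Long, Long)]) => updatePv(v, s, System.currentTimeMillis)), with the per-batch flush to MySQL staying in the foreach as before.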
The code is here:

https://github.com/jizhang/spark-hello/blob/eb138e24b1e72e89bf3fa7e66c6ae7106853e5e8/src/main/scala/com/anjuke/dw/spark_hello/ActionLogProcessor.scala#L80

Maybe there's a better way to achieve this?

As for the total pv/uv, I can set the key to the date (e.g. '2014-09-19'), but how do I save it to MySQL every minute? Especially the uv: it cannot simply be summed, so I need to save it every minute, but how?

Any ideas will be appreciated. Thanks.

Jerry
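P.S. To show what I mean about the uv not being summable, the only approach I can think of is to keep the day's distinct IPs in state and emit the set's size after every batch. This is just a sketch with made-up names, and it assumes a whole day's unique IPs fit in state; for larger cardinalities an approximate sketch like Algebird's HyperLogLog would be the usual substitute:

    // Sketch: key = day string, state = the set of distinct IPs seen so far.
    // The real stream would feed ('2014-09-19', ip) pairs into updateStateByKey.
    def updateUv(ips: Seq[String], state: Option[Set[String]]): Option[Set[String]] =
      Some(state.getOrElse(Set.empty[String]) ++ ips)

    // Every batch, the current total uv is just the set size, e.g.:
    //   uvState.foreachRDD { rdd =>
    //     save(rdd.map { case (day, ips) => (day, "total uv", ips.size.toLong) }.collect)
    //   }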