Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/199935

Change subject: [WIP] Add Apps session metrics job
......................................................................

[WIP] Add Apps session metrics job

Bug: T86535
Change-Id: Ibd9bc45179ace3f74fe1bf1ac2f42de7867db90e
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/AppSessionMetrics.scala
1 file changed, 101 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/35/199935/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/AppSessionMetrics.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/AppSessionMetrics.scala
new file mode 100644
index 0000000..82474ae
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/AppSessionMetrics.scala
@@ -0,0 +1,101 @@
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.wikimedia.analytics.refinery.core.PageviewDefinition
+import org.wikimedia.analytics.refinery.core.Webrequest
+import scala.math.pow
+import org.apache.spark.rdd.RDD
+
+
+// helper methods
+// webrequest dt timestamps look like 2015-03-20T00:00:00
+val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
+// parse a webrequest dt string into epoch milliseconds
+def ms (s:String) : Long = { dateFormat.parse(s).getTime() }
+// cast an untyped row field to String
+def s (a:Any) : String = { a.asInstanceOf[String] }
+
+
+// statistical methods
+// geometric mean: n-th root of the product of all values
+// (note: the intermediate product may overflow Long for very large inputs)
+def geoMean (nums:RDD[Long]) : Double = {
+    pow(nums.fold(1)(_ * _), 1.0 / nums.count)
+}
+// print (count, geometric mean, arithmetic mean, min, max) of an RDD
+def printStats (nums:RDD[Long]) = println((
+    nums.count,
+    geoMean(nums),
+    nums.mean,
+    nums.min,
+    nums.max
+))
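+// illustrative example (not part of the job logic):
+//   for RDD(2, 8): geoMean = pow(2 * 8, 1.0 / 2) = 4.0
+//   and printStats would print (2, 4.0, 5.0, 2, 8)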
+
+
+/**
+ * Empty list of sessions
+ * To be used as zero value for the sessionize function.
+ */
+val emptySessions = List[List[Long]]()
+
+
+/**
+ * Session logic: adds a pageview timestamp to a list of sessions.
+ *
+ * @param sessions  List of sessions. Each session is represented
+ *                  as an ordered list of pageview timestamps.
+ * @param timestamp The pageview timestamp to be merged into the
+ *                  session list. It is assumed to be greater than
+ *                  all the previous timestamps in the session list.
+ *
+ * @return The list of sessions including the new pageview timestamp.
+ *         Depending on the time passed since the last pageview,
+ *         the timestamp is appended to the last session
+ *         or starts a new session.
+ */
+def sessionize (sessions:List[List[Long]], timestamp:Long) : List[List[Long]] = {
+    if (sessions.length == 0) List(List(timestamp))
+    else {
+        if (timestamp <= sessions.last.last + 1800000) { // 30 minutes in ms
+            sessions.init :+ (sessions.last :+ timestamp)
+        } else sessions :+ List(timestamp)
+    }
+}
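+// Illustrative example of sessionize (not part of the job logic),
+// with timestamps in milliseconds and the 30 minute (1800000 ms) gap threshold:
+//   sessionize(List(), 1000)                    -> List(List(1000))
+//   sessionize(List(List(1000)), 5000)          -> List(List(1000, 5000))
+//   sessionize(List(List(1000, 5000)), 2000000) -> List(List(1000, 5000), List(2000000))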
+
+
+// setup sql engine (assumes an existing SparkContext sc, e.g. when run in spark-shell)
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+sqlContext.parquetFile(
+    "/wmf/data/wmf/webrequest/webrequest_source=mobile/year=2015/month=3/day=20/hour=0/000000_0"
+).registerTempTable("webrequest")
+
+
+// compute sessions by user
+val userSessions = sqlContext.
+    // get webrequest data
+    sql("""SELECT uri_path, uri_query, content_type, user_agent, x_analytics, 
dt
+        FROM webrequest WHERE is_pageview = TRUE""").
+    // filter app pageviews
+    filter(r => PageviewDefinition.getInstance.isAppPageview(s(r(0)), s(r(1)), s(r(2)), s(r(3)))).
+    // map: pageview -> (uuid, timestamp)
+    map(pv => (Webrequest.getInstance.getXAnalyticsValue(s(pv(4)), "uuid"), ms(s(pv(5))))).
+    // aggregate: (uuid, timestamp)* -> (uuid, List(ts1, ts2, ts3, ...))
+    combineByKey(
+        List(_),
+        (l:List[Long], t:Long) => l :+ t,
+        (l1:List[Long], l2:List[Long]) => l1 ::: l2
+    ).
+    // sample users to 10% (without replacement)
+    sample(false, 0.1).
+    // map: (uuid, List(ts1, ts2, ts3, ...)) -> (uuid, List(List(ts1, ts2), List(ts3), ...))
+    map(p => (p._1, p._2.sorted.foldLeft(emptySessions)(sessionize)))
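+// Illustrative walk-through of the pipeline above for one hypothetical uuid "u1",
+// assuming its pageview timestamps ts1 < ts2 < ts3 and ts3 - ts2 > 30 min:
+//   pageviews:    ("u1", ts1), ("u1", ts2), ("u1", ts3)
+//   combineByKey: ("u1", List(ts1, ts2, ts3))
+//   sessionize:   ("u1", List(List(ts1, ts2), List(ts3)))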
+
+
+// flatten: (uuid, List(session1, session2, ...)) -> session*
+val sessions = userSessions.flatMap(_._2)
+
+
+// metrics
+val sessionsPerUser = userSessions.map(r => { r._2.length.toLong })
+val pageviewsPerSession = sessions.map(r => { r.length.toLong })
+// session length in milliseconds; single-pageview sessions are excluded
+val sessionLength = sessions.filter(r => { r.length > 1 }).map(r => { r.last - r(0) })
+
+
+// output: prints (count, geometric mean, mean, min, max) for each metric
+printStats(sessionsPerUser)
+printStats(pageviewsPerSession)
+printStats(sessionLength)
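
Note: as written, the file above is a spark-shell style script (top-level vals
and defs, relying on a pre-existing SparkContext named sc), so it will not yet
compile as a regular class in refinery-job. Below is a minimal sketch of how
the script might later be wrapped into a submittable job; the object layout and
run method are only illustrative, not part of this patch:

    package org.wikimedia.analytics.refinery.job

    import org.apache.spark.{SparkConf, SparkContext}

    object AppSessionMetrics {
        def main(args: Array[String]): Unit = {
            // create the context that the script currently expects as "sc"
            val conf = new SparkConf().setAppName("AppSessionMetrics")
            val sc = new SparkContext(conf)
            try {
                run(sc)
            } finally {
                sc.stop()
            }
        }

        def run(sc: SparkContext): Unit = {
            // the script body above would move here, using this sc
            // instead of the one provided by spark-shell
        }
    }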

-- 
To view, visit https://gerrit.wikimedia.org/r/199935
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibd9bc45179ace3f74fe1bf1ac2f42de7867db90e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>
