/*
 * Provenance: Gerrit change notification (MediaWiki-commits mailing list).
 *
 *   Mforns has uploaded a new change for review.
 *   https://gerrit.wikimedia.org/r/199935
 *
 *   Change subject: [WIP] Add Apps session metrics job
 *   Bug: T86535
 *   Change-Id: Ibd9bc45179ace3f74fe1bf1ac2f42de7867db90e
 *
 *   A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/AppSessionMetrics.scala
 *   1 file changed, 101 insertions(+), 0 deletions(-)
 *
 *   git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/35/199935/1
 *
 *   new file mode 100644, index 0000000..82474ae
 *
 * The code below is the content of that new file, reconstructed from the
 * whitespace-collapsed diff. It is written as a spark-shell script and
 * expects a SparkContext named `sc` to be in scope.
 */

import java.util.Date
import java.util.TimeZone
import java.text.SimpleDateFormat
import org.wikimedia.analytics.refinery.core.PageviewDefinition
import org.wikimedia.analytics.refinery.core.Webrequest
import scala.math.pow
import org.apache.spark.rdd.RDD


// helper methods

/**
 * Parser for timestamps formatted as "yyyy-MM-dd'T'HH:mm:ss".
 *
 * The time zone is pinned to UTC so that the differences between parsed
 * timestamps do not depend on the JVM default time zone of the machine
 * running the task (a zone with DST would distort gaps that straddle a
 * clock change).
 * NOTE(review): this presumes the `dt` field is expressed in UTC - confirm.
 */
val dateFormat = {
  val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  format.setTimeZone(TimeZone.getTimeZone("UTC"))
  format
}

/** Parses a timestamp string with `dateFormat` and returns epoch milliseconds. */
def ms(s: String): Long = dateFormat.parse(s).getTime

/** Casts a row field to String (row fields are accessed untyped below). */
def s(a: Any): String = a.asInstanceOf[String]


// statistical methods

/**
 * Geometric mean of the given values.
 *
 * Computed as exp(mean(log(x))) in a single pass. Multiplying all values
 * into one Long and taking the n-th root afterwards overflows silently as
 * soon as the product exceeds the Long range, which happens after only a
 * handful of realistic values.
 *
 * @param nums Non-negative values. A zero yields a geometric mean of 0.0;
 *             a negative value yields NaN.
 * @return The geometric mean, or NaN when `nums` is empty.
 */
def geoMean(nums: RDD[Long]): Double = {
  val logStats = nums.map(n => math.log(n.toDouble)).stats()
  if (logStats.count == 0) Double.NaN else math.exp(logStats.mean)
}

/**
 * Prints (count, geometric mean, arithmetic mean, min, max) of the given
 * values as a single tuple.
 */
def printStats(nums: RDD[Long]) = println((
  nums.count,
  geoMean(nums),
  nums.mean,
  nums.min,
  nums.max
))


/**
 * Empty list of sessions
 * To be used as zero value for the sessionize function.
 */
val emptySessions = List[List[Long]]()


/**
 * Maximum gap, in milliseconds, between two consecutive pageviews of the
 * same session (30 minutes).
 */
val sessionTimeoutMs = 30L * 60L * 1000L


/**
 * Session logic
 *
 * @param sessions  List of sessions. Each session is represented
 *                  as an ordered list of pageview timestamps.
 * @param timestamp The pageview timestamp to be merged to the
 *                  session list. It is assumed to be greater than
 *                  all the previous timestamps in the session list.
 *
 * @return The list of sessions including the new pageview timestamp.
 *         Depending on the time passed since the last pageview,
 *         the timestamp will be allocated as part of the last session
 *         or in a new session.
 */
def sessionize(sessions: List[List[Long]], timestamp: Long): List[List[Long]] =
  sessions.lastOption match {
    // Close enough to the latest pageview: extend the current session.
    case Some(current) if timestamp <= current.last + sessionTimeoutMs =>
      sessions.init :+ (current :+ timestamp)
    // No session yet, or the gap is too large: open a new session.
    case _ =>
      sessions :+ List(timestamp)
  }


// setup sql engine
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.parquetFile(
  "/wmf/data/wmf/webrequest/webrequest_source=mobile/year=2015/month=3/day=20/hour=0/000000_0"
).registerTempTable("webrequest")


// compute sessions by user
val userSessions = sqlContext.
  // get webrequest data
  sql("""SELECT uri_path, uri_query, content_type, user_agent, x_analytics, dt
         FROM webrequest WHERE is_pageview = TRUE""").
  // filter app pageviews
  filter(r => PageviewDefinition.getInstance.isAppPageview(s(r(0)), s(r(1)), s(r(2)), s(r(3)))).
  // map: pageview -> (uuid, timestamp)
  map(pv => (Webrequest.getInstance.getXAnalyticsValue(s(pv(4)), "uuid"), ms(s(pv(5))))).
  // drop pageviews that carry no uuid: they cannot be attributed to a user and
  // would otherwise all be merged into one artificial "user".
  // NOTE(review): assumes a missing uuid is reported as null or "" - confirm.
  filter { case (uuid, _) => uuid != null && uuid.nonEmpty }.
  // aggregate: (uuid, timestamp)* -> (uuid, List(ts1, ts2, ts3, ...))
  // Timestamps are prepended (constant time); their order is irrelevant here
  // because the list is sorted before sessionizing.
  combineByKey(
    (t: Long) => List(t),
    (l: List[Long], t: Long) => t :: l,
    (l1: List[Long], l2: List[Long]) => l1 ::: l2
  ).
  // sample users to 10%
  sample(false, 0.1).
  // map: (uuid, List(ts1, ts2, ts3, ...)) -> (uuid, List(List(ts1, ts2), List(ts3), ...))
  mapValues(_.sorted.foldLeft(emptySessions)(sessionize)).
  // every metric below triggers several actions; keep the result in memory
  // instead of rescanning and regrouping the input each time
  cache()


// flatten: (uuid, List(session1, session2, ...)) -> session*
val sessions = userSessions.flatMap(_._2)


// metrics
val sessionsPerUser = userSessions.map(_._2.length.toLong)
val pageviewsPerSession = sessions.map(_.length.toLong)
// single-pageview sessions have no measurable duration and are excluded
val sessionLength = sessions.filter(_.length > 1).map(r => r.last - r.head)


// output
printStats(sessionsPerUser)
printStats(pageviewsPerSession)
printStats(sessionLength)

/*
 * --
 * To view, visit https://gerrit.wikimedia.org/r/199935
 * To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
 *
 * Gerrit-MessageType: newchange
 * Gerrit-Change-Id: Ibd9bc45179ace3f74fe1bf1ac2f42de7867db90e
 * Gerrit-PatchSet: 1
 * Gerrit-Project: analytics/refinery/source
 * Gerrit-Branch: master
 * Gerrit-Owner: Mforns <mfo...@wikimedia.org>
 *
 * _______________________________________________
 * MediaWiki-commits mailing list
 * MediaWiki-commits@lists.wikimedia.org
 * https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
 */