Shilad Sen has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/377706 )
Change subject: Spark job to create session event log appears to be working.
......................................................................

Spark job to create session event log appears to be working.

Session event log is one session per line, with a list of page ids viewed
in that session, ordered by tstamp.

simplewiki 2017-06-08 03:28:15.0 34242 9700
simplewiki 2017-06-08 03:14:49.0 31246
simplewiki 2017-06-08 03:14:22.0 34019 9084 81247 121

TODO:
* Benchmark on larger datasets
* Optimize if necessary (try broadcasting the data structure to avoid a join)
* Compare data cleaning with Elery's scripts

Bug: T174796
Change-Id: I52103578d50d492eaeff2eeffc8629331a1260da
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
1 file changed, 547 insertions(+), 0 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/06/377706/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 0000000..d77a96a
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,547 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spark.Partitioner
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.hashing.MurmurHash3
+
+/**
+ * Process to create a page view session log.
+ *
+ * This creates output files with one line per session.
+ * The first two tokens of each line are the language edition and the session start timestamp.
+ * The following tokens are the page ids viewed in the session, in order.
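+ *
+ * Views with the same wiki and user hash are grouped into one session; a gap longer than
+ * the configured timeout (15 minutes by default) starts a new session. Example output
+ * lines (tab-separated):
+ *
+ *   simplewiki 2017-06-08 03:28:15.0 34242 9700
+ *   simplewiki 2017-06-08 03:14:49.0 31246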
+ *
+ * @author Shilad Sen
+ */
+object SessionPagesBuilder {
+
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+
+  case class SessionKey(wikiDb: String, userHash: Long, tstamp: Long)
+    extends Ordered[SessionKey] {
+    override def compare(that: SessionKey): Int = {
+      import scala.math.Ordered.orderingToOrdered
+
+      Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+        (wikiDb, userHash, tstamp),
+        (that.wikiDb, that.userHash, that.tstamp)
+      )
+    }
+  }
+
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+    override def numPartitions: Int = partitions
+
+    override def getPartition(key: Any): Int = {
+      // Partition by (wiki, user) only, so all views belonging to a session land on one partition
+      val k = key.asInstanceOf[SessionKey]
+      Math.abs(k.wikiDb.hashCode() + k.userHash.toInt) % numPartitions
+    }
+  }
+
+  object SessionKey {
+    implicit def orderingByTimestamp[A <: SessionKey]: Ordering[A] = {
+      Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+    }
+  }
+
+  case class PageInfo(
+    wikiDb: String,
+    pageId: Long,
+    pageTitle: String,
+    pageNamespace: Long,
+    pageIsRedirect: Boolean,
+    effectivePageId: Long // page id or redirected destination page id
+  )
+
+  case class Redirect(
+    wikiDb: String,
+    fromPageId: Long,
+    toPageId: Long
+  )
+
+  case class PageView(
+    wikiDb: String,
+    userHash: Long,
+    tstamp: Long,
+    pageNamespace: Long,
+    pageId: Long
+  )
+
+  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long): Boolean = {
+    (v1.wikiDb == v2.wikiDb) &&
+      (v1.userHash == v2.userHash) &&
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+  }
+
+  /**
+   * Given the components associated with a distinct user, return a long hash.
+   * @param client_ip
+   * @param user_agent
+   * @param x_forwarded_for
+   * @return
+   */
+  def userHash(
+    client_ip: String,
+    user_agent: String,
+    x_forwarded_for: String
+  ): Long = {
+    val s = client_ip + ":" + user_agent + ":" + x_forwarded_for
+    // Combine two 32-bit hashes (of the string and its reverse) into one 64-bit value
+    (MurmurHash3.stringHash(s).toLong << 32) |
+      (MurmurHash3.stringHash(s.reverse).toLong & 0xFFFFFFFFL)
+  }
+
+  /**
+   * Steps:
+   *  - prepare raw tables:
+   *    - page: Keep only wiki_db, page_id, page_namespace, page_is_redirect,
+   *      page_title. Insert fake values used later
+   *    - redirect: Join with page (from and to) to clean and denormalize
+   */
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ")
+
+  /**
+   * Prepare a map linking project hostnames to project dbnames.
+   *
+   * @return A map[hostname -> dbname]
+   */
+  def prepareProjectToWikiMap(
+    sqlContext: SQLContext,
+    projectNamespaceTable: String,
+    snapshot: String,
+    wikiList: Seq[String]
+  ): Map[String, String] = {
+    sqlContext.sql(
+      s"""
+        |SELECT DISTINCT
+        |  hostname,
+        |  dbname
+        |FROM $projectNamespaceTable
+        |WHERE snapshot = '$snapshot'
+        |  AND dbname IN (${listToSQLInCondition(wikiList)})
+      """.stripMargin).
+      rdd.
+      collect.
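+      // Each row is (hostname, dbname); collect builds a small driver-side lookup map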
+      map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+
+  /**
+   * Prepare the pages dataset to be reused
+   *
+   * @return A RDD of PageInfo data augmented with fake pages for each wiki
+   */
+  def preparePages(
+    sqlContext: SQLContext,
+    pageTable: String,
+    snapshot: String,
+    wikiList: Seq[String]
+  ): RDD[PageInfo] = {
+    sqlContext.sql(
+      s"""
+        |SELECT
+        |  wiki_db,
+        |  page_id,
+        |  page_title,
+        |  page_namespace,
+        |  page_is_redirect
+        |FROM $pageTable
+        |WHERE snapshot = '$snapshot'
+        |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+        |  AND page_id IS NOT NULL
+        |  AND page_namespace = 0
+        |  AND page_id > 0
+        |GROUP BY
+        |  wiki_db,
+        |  page_id,
+        |  page_title,
+        |  page_namespace,
+        |  page_is_redirect
+      """.stripMargin).rdd.
+      map(r => {
+        PageInfo(r.getString(0), r.getLong(1), r.getString(2), r.getLong(3), r.getBoolean(4), r.getLong(1))
+      }). // Insert rows for our special prev pages; this will let us work with ids
+          // instead of titles later, which is much less error prone
+      union(
+        sqlContext.sparkContext.parallelize(wikiList.flatMap(wiki => Seq(
+          PageInfo(wiki, -1L, "other-empty", 0, pageIsRedirect = false, effectivePageId = -1),
+          PageInfo(wiki, -2L, "other-internal", 0, pageIsRedirect = false, effectivePageId = -1),
+          PageInfo(wiki, -3L, "other-external", 0, pageIsRedirect = false, effectivePageId = -1),
+          PageInfo(wiki, -4L, "other-search", 0, pageIsRedirect = false, effectivePageId = -1),
+          PageInfo(wiki, -5L, "other-other", 0, pageIsRedirect = false, effectivePageId = -1))))
+      )
+  }
+
+  /**
+   * Prepare the redirects dataset to be reused
+   *
+   * @return A RDD of redirect data
+   */
+  def prepareRedirects(
+    sqlContext: SQLContext,
+    redirectTable: String,
+    snapshot: String,
+    wikiList: Seq[String],
+    pages: RDD[PageInfo]
+  ): RDD[Redirect] = {
+
+    val pagesPerPageId = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).cache()
+    val pagesPerTitleAndNamespace = pages.map(p => ((p.wikiDb, p.pageTitle, p.pageNamespace), p.pageId)).cache()
+
+    sqlContext.sql(
+      s"""
+        |SELECT
+        |  wiki_db,
+        |  rd_from,
+        |  rd_title,
+        |  rd_namespace
+        |FROM $redirectTable
+        |WHERE snapshot = '$snapshot'
+        |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+        |  AND rd_from IS NOT NULL
+        |  AND rd_from > 0
+        |GROUP BY
+        |  wiki_db,
+        |  rd_from,
+        |  rd_title,
+        |  rd_namespace
+      """.stripMargin)
+      .rdd
+      .map(r => {
+        ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
+      })
+      .filter(t => Option(t._2._1).isDefined) // Remove null toPageTitle
+      .join(pagesPerPageId) // Ensure fromPageId exists in page table
+      .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) =>
+        ((wiki, toPageTitle, toPageNamespace), fromPageId) }
+      .join(pagesPerTitleAndNamespace) // Get toPageId from page table
+      .map { case ((wiki, _, _), (fromPageId, toPageId)) =>
+        Redirect(wiki, fromPageId, toPageId) }
+      .distinct // prevent any duplicate (data corruption)
+  }
+
+  /**
+   * Replace page ids for pages that are redirected.
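+   * Redirect resolution is applied twice below, so chains of at most two redirects are followed.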
+   *
+   * @return PageInfo with redirects applied to relevant page ids
+   */
+  def resolveRedirects(
+    sqlContext: SQLContext,
+    pages: RDD[PageInfo],
+    redirects: RDD[Redirect]
+  ): RDD[PageInfo] = {
+
+    val redirectsBySrc = redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId)).cache()
+
+    // This function maps redirects once
+    val processRedirects = (pages: RDD[PageInfo]) => {
+      pages.map(p => ((p.wikiDb, p.effectivePageId), p))
+        .leftOuterJoin(redirectsBySrc)
+        .map { case (_, (p, redirectId)) =>
+          PageInfo(p.wikiDb,
+            p.pageId,
+            p.pageTitle, p.pageNamespace,
+            p.pageIsRedirect,
+            redirectId.getOrElse(p.effectivePageId))
+        }
+    }: RDD[PageInfo]
+
+    // We only process two-levels of redirects. Is this okay?
+    processRedirects(
+      processRedirects(pages))
+  }
+
+  /**
+   * Prepare the view data
+   * @return A RDD of page views
+   */
+  def prepareViews(
+    sqlContext: SQLContext,
+    webrequestTable: String,
+    year: Int,
+    month: Int,
+    day: Option[Int],
+    hour: Option[Int],
+    projectList: Seq[String],
+    projectToWikiMap: Map[String, String],
+    pages: RDD[PageInfo]
+  ): RDD[PageView] = {
+    val pagesPerTitles = pages
+      .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // ((wiki, pageTitle), (pageNamespace, pageId))
+      .cache()
+
+    sqlContext.sql(
+      s"""
+        |SELECT
+        |  CONCAT(pageview_info['project'], '.org') AS project,
+        |  pageview_info['page_title'] as title,
+        |  ts,
+        |  client_ip,
+        |  user_agent,
+        |  x_forwarded_for
+        |FROM $webrequestTable
+        |WHERE webrequest_source = 'text'
+        |  AND year = $year AND month = $month
+        |  ${day.map(d => s"AND day = $d").getOrElse("")}
+        |  ${hour.map(h => s"AND hour = $h").getOrElse("")}
+        |  AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
+        |  AND is_pageview
+        |  AND agent_type = 'user'
+      """.stripMargin)
+      .rdd // ((wiki, fromTitle), (ts, ip, user_agent, x_forward_for)) out of webrequest
+      .map(r => {
+        val wiki = projectToWikiMap(r.getString(0))
+        val uh = userHash(r.getString(3), r.getString(4), r.getString(5))
+        ((wiki, r.getString(1)), (r.getTimestamp(2), uh))
+      })
+      .join(pagesPerTitles)
+      .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
+        PageView(wiki, userHash, ts.getTime, pageNs, pageId)
+      }
+  }
+
+  /**
+   * Aggregates page views by user / wiki and groups them into sessions.
+   * The iterator is necessary because there could be millions / billions
+   * of page views on a partition, so we cannot load the result set into memory.
+   *
+   * @return
+   */
+  def createSessions(views: RDD[PageView], timeoutInMillis: Int): RDD[Seq[PageView]] = {
+    views
+      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
+      // Partition by (wiki, user) and sort each partition by (wiki, user, tstamp)
+      .repartitionAndSortWithinPartitions(new SessionPartitioner(20))
+      .mapPartitions {
+        iter =>
+          new Iterator[Seq[PageView]] {
+            var eos: Boolean = !iter.hasNext
+            var session = new ArrayBuffer[PageView]()
+            var nextSessionHead: Option[PageView] = None
+
+            override def hasNext: Boolean = {
+              tryToFillSession()
+              session.nonEmpty
+            }
+
+            override def next: Seq[PageView] = {
+              tryToFillSession()
+              assert(session.nonEmpty)
+
+              // Create a new empty session and place the next session head into it
+              val s = session
+              session = new ArrayBuffer[PageView]()
+              if (nextSessionHead.isDefined) {
+                session += nextSessionHead.get
+                nextSessionHead = None
+              }
+              s
+            }
+
+            /**
+             * Recursively adds the next page view onto the current session until
+             * we are certain it belongs to the next session.
+             * The sign this has occurred is that nextSessionHead is non-empty.
+             */
+            def tryToFillSession(): Unit = {
+              if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
+                val p = iter.next._2
+                if (session.isEmpty || sameSession(p, session.last, timeoutInMillis)) {
+                  session += p
+                  tryToFillSession()
+                } else {
+                  nextSessionHead = Some(p)
+                }
+              }
+            }
+          }
+      }
+  }
+
+  /**
+   * Write sessions to output files.
+   * TODO: Overwrite existing files.
+   */
+  def writeSessions(sessions: RDD[Seq[PageView]], outputBasePath: String, outputFilesParts: Int): Unit = {
+    sessions
+      .repartition(outputFilesParts)
+      .filter(views => views.nonEmpty)
+      .map(views => views.head.wikiDb + "\t" +
+        new java.sql.Timestamp(views.head.tstamp) + "\t" +
+        views.map(v => v.pageId.toString).mkString("\t"))
+      .saveAsTextFile(outputBasePath)
+  }
+
+  /**
+   * Config class for CLI argument parser using scopt
+   */
+  case class Params(
+    outputBasePath: String = "/user/shiladsen/sessions",
+    projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
+    pageTable: String = "wmf_raw.mediawiki_page",
+    redirectTable: String = "wmf_raw.mediawiki_redirect",
+    webrequestTable: String = "wmf.webrequest",
+    wikiList: Seq[String] = Seq("simplewiki"),
+    outputFilesParts: Int = 1,
+    snapshot: String = "", // Parameter required, never used as is
+    year: Int = 0, // Parameter required, never used as is
+    month: Int = 0, // Parameter required, never used as is
+    day: Option[Int] = None,
+    hour: Option[Int] = None,
+    sessionTimeoutMillis: Int = 15 * 60 * 1000
+  )
+
+  /**
+   * Define the command line options parser
+   */
+  val argsParser = new OptionParser[Params]("Clickstream dataset builder") {
+    head("Clickstream dataset builder", "")
+    note(
+      """
+        |This job computes a clickstream dataset from one or more wiki(s).
+        |It creates a date folder, and per-wiki folders to store the results.
+      """.stripMargin)
+    help("help") text ("Prints this usage text")
+
+    opt[String]('s', "snapshot") required() valueName ("<snapshot>") action { (x, p) =>
+      p.copy(snapshot = x)
+    } text ("The mediawiki hadoop snapshot to use for page, links and redirects (usually YYYY-MM)")
+
+    opt[Int]('y', "year") required() valueName ("<year>") action { (x, p) =>
+      p.copy(year = x)
+    } text ("The year to use for webrequest data gathering.")
+
+    opt[Int]('m', "month") required() valueName ("<month>") action { (x, p) =>
+      p.copy(month = x)
+    } validate { x => if (x > 0 & x <= 12) success else failure("Invalid month")
+    } text ("The month to use for webrequest data gathering.")
+
+    opt[Int]('d', "day") optional() valueName ("<day>") action { (x, p) =>
+      p.copy(day = Some(x))
+    } validate { x => if (x > 0 & x <= 31) success else failure("Invalid day")
+    } text ("The day to use for webrequest data gathering (default to empty, for monthly computation).")
+
+    opt[Int]('h', "hour") optional() valueName ("<hour>") action { (x, p) =>
+      p.copy(hour = Some(x))
+    } validate { x => if (x >= 0 & x < 24) success else failure("Invalid hour")
+    } text ("The hour to use for webrequest data gathering (default to empty, for daily or monthly computation).")
+
+    opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, p) =>
+      p.copy(sessionTimeoutMillis = x)
+    } validate { x => if (x >= 0) success else failure("Invalid timeout")
+    } text ("The timeout in milliseconds that distinguishes user sessions.")
+
+    opt[String]('o', "output-base-path") optional() valueName ("<path>") action { (x, p) =>
+      p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Where on HDFS to store the computed dataset. Defaults to /user/shiladsen/sessions")
+
+    opt[String]('w', "wikis") optional() valueName "<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+      p.copy(wikiList = x.split(",").map(_.toLowerCase))
+    } validate { x =>
+      val dbs = x.split(",").map(_.toLowerCase)
+      if (dbs.filter(db => db.isEmpty || (!db.contains("wik"))).length > 0)
+        failure("Invalid wikis list")
+      else
+        success
+    } text "wiki dbs to compute. Defaults to simplewiki"
+
+    opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>") action { (x, p) =>
+      p.copy(outputFilesParts = x)
+    } text ("Number of file parts to output in hdfs. Defaults to 1")
+
+    opt[String]("project-namespace-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(projectNamespaceTable = x)
+    } text ("Fully qualified name of the project-namespace table on Hive. Default to wmf_raw.mediawiki_project_namespace_map")
+
+    opt[String]("page-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(pageTable = x)
+    } text ("Fully qualified name of the page table on Hive. Default to wmf_raw.mediawiki_page")
+
+    opt[String]("redirect-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(redirectTable = x)
+    } text ("Fully qualified name of the redirect table on Hive. Default to wmf_raw.mediawiki_redirect")
+
+    opt[String]("webrequest-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(webrequestTable = x)
+    } text ("Fully qualified name of the webrequest table on Hive. Default to wmf.webrequest")
+
+  }
+
+  def main(args: Array[String]): Unit = {
+    val params = args.headOption match {
+      // Case when our job options are given as a single string. Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, each CLI opt can be parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+    // Exit non-zero if any refinements failed.
+    apply(params)
+  }
+
+  def apply(params: Params): Unit = {
+
+    val conf = new SparkConf()
+      .setAppName(s"ClickStreamBuilder")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array(
+        classOf[PageInfo],
+        classOf[Redirect],
+        classOf[PageView]))
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+    val sqlContext = new HiveContext(new SparkContext(conf))
+
+    LogManager.getRootLogger.setLevel(Level.WARN)
+    sqlContext.sparkContext.parallelize(Seq("")).foreachPartition(x => {
+      // Quiet the logs on the executors as well
+      import org.apache.log4j.{LogManager, Level}
+      LogManager.getRootLogger.setLevel(Level.WARN)
+    })
+
+    import sqlContext.implicits._
+
+    val projectToWikiMap = prepareProjectToWikiMap(sqlContext, params.projectNamespaceTable, params.snapshot, params.wikiList)
+    val domainList = projectToWikiMap.keys.toList
+    val projectList = domainList.map(_.stripSuffix(".org"))
+
+    val outputFolder = "/user/shilad/wmf/data/archive/clickstream/" +
+      f"${params.year}%04d-${params.month}%02d${params.day.map(d => f"-$d%02d").getOrElse("")}${params.hour.map(h => f"-$h%02d").getOrElse("")}"
+
+    // Reused RDDs
+    val pages = preparePages(sqlContext, params.pageTable, params.snapshot, params.wikiList).cache()
+    val redirects = prepareRedirects(sqlContext, params.redirectTable, params.snapshot, params.wikiList, pages)
+    val pages2 = resolveRedirects(sqlContext, pages, redirects)
+    val views = prepareViews(
+      sqlContext, params.webrequestTable,
+      params.year, params.month, params.day, params.hour,
+      projectList, projectToWikiMap,
+      pages)
+    val sessions = createSessions(views, params.sessionTimeoutMillis)
+    writeSessions(sessions, params.outputBasePath, params.outputFilesParts)
+  }
+}

--
To view, visit https://gerrit.wikimedia.org/r/377706
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I52103578d50d492eaeff2eeffc8629331a1260da
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Shilad Sen <s...@macalester.edu>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits