Shilad Sen has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/383761 )
Change subject: Spark job to create page ids viewed in each session
......................................................................

Spark job to create page ids viewed in each session

SessionPagesBuilder creates a table representing viewed pages grouped by
browser session. The output is a table containing columns for wiki, date,
timestamp, and a space-separated list of all the page ids viewed in the
session, in order. The job now runs on the cluster in a reasonable amount
of time (10 min for a day's worth of views).

SessionPruner filters the session table and removes any views of pages
below some threshold. As a side effect it creates a frequency table.

The testing harness creates fake test data and compares computed Spark
results against computed in-memory results.

TODO:
* Oozify job (may require switching to Spark 2)

Bug: T174796
Change-Id: I55395459d80d73f3d065967ce95d6506698d128e

Complete pass at session creation pipeline

Added session pruner

Switched to use tables instead of files

Cleaned up tests

Change-Id: I19160e16d8140d03d81a4226e3974f42ec1e3602
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
9 files changed, 1,425 insertions(+), 0 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/61/383761/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
new file mode 100644
index 0000000..cd43cea
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
@@ -0,0 +1,96 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
+
+/**
+ * @author Shilad Sen
+ */
+object PartitionQueryBuilder {
+
+  /**
+   * Creates a SQL predicate that covers the time period from beginTs to endTs.
+   *
+   * The SQL predicate is somewhat compressed, but there is still room for improvement.
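+   *
+   * For example, a range from 2017-01-04T03:00 to 2017-01-06T04:00 collapses to
+   * three clauses: the trailing hours of Jan 4, all of Jan 5, and the leading
+   * hours of Jan 6 (see TestPartitionQueryBuilder for the exact expected strings).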
+   * @return "(year = 2016) or (year = 2017 and month = 1 and day = 1 and hour < 3)"
+   */
+  def formSql(beginTs: Timestamp, endTs: Timestamp) : String = {
+    val begin = new DateTime(beginTs.getTime).hourOfDay().roundFloorCopy()
+    val end = new DateTime(endTs.getTime).hourOfDay().roundCeilingCopy()
+    if (begin == end) {
+      s"(year = ${begin.getYear} " +
+        s"and month = ${begin.getMonthOfYear} " +
+        s"and day = ${begin.getDayOfMonth} " +
+        s"and hour = ${begin.getHourOfDay})"
+    } else {
+      formSqlConditions(begin, end).map("(" + _ + ")").mkString(" OR ")
+    }
+  }
+
+  def formSqlConditions(begin: DateTime, end: DateTime) : Seq[String] = {
+    if (begin == end) {
+      return List()
+    }
+
+    // Try to take a year out of the middle
+    val startYear = begin.year().roundCeilingCopy()
+    val endYear = startYear.plusYears(1)
+    if (!startYear.isBefore(begin) && !endYear.isAfter(end)) {
+      return formSqlConditions(begin, startYear) ++
+        List(s"year = ${startYear.getYear}") ++
+        formSqlConditions(endYear, end)
+    }
+
+    // Try to take a month out of the middle
+    val startMonth = begin.monthOfYear().roundCeilingCopy()
+    val endMonth = startMonth.plusMonths(1)
+    if (!startMonth.isBefore(begin) && !endMonth.isAfter(end)) {
+      return formSqlConditions(begin, startMonth) ++
+        List(s"year = ${startMonth.getYear} " +
+          s"and month = ${startMonth.getMonthOfYear}") ++
+        formSqlConditions(endMonth, end)
+    }
+
+    // Try to take a day out of the middle
+    val startDay = begin.dayOfMonth().roundCeilingCopy()
+    val endDay = startDay.plusDays(1)
+    if (!startDay.isBefore(begin) && !endDay.isAfter(end)) {
+      return formSqlConditions(begin, startDay) ++
+        List(s"year = ${startDay.getYear} " +
+          s"and month = ${startDay.getMonthOfYear} " +
+          s"and day = ${startDay.getDayOfMonth}") ++
+        formSqlConditions(endDay, end)
+    }
+
+    // Do we have a collection of hours that run up to the end of the starting day?
+    val startOfNextDay = begin.withTimeAtStartOfDay().plusDays(1)
+    if (!startOfNextDay.isAfter(end)) {
+      return List(s"year = ${begin.getYear} " +
+        s"and month = ${begin.getMonthOfYear} " +
+        s"and day = ${begin.getDayOfMonth} " +
+        s"and hour >= ${begin.getHourOfDay}") ++
+        formSqlConditions(startOfNextDay, end)
+    }
+
+    // Do we have a collection of hours that start at the beginning of the last day?
+    val startOfLastDay = end.withTimeAtStartOfDay()
+    if (!startOfLastDay.isBefore(begin)) {
+      return formSqlConditions(begin, startOfLastDay) ++
+        List(s"year = ${end.getYear} " +
+          s"and month = ${end.getMonthOfYear} " +
+          s"and day = ${end.getDayOfMonth} " +
+          s"and hour <= ${end.getHourOfDay}")
+    }
+
+    // We must have an hour range within the same day.
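+    // e.g. 03:10 to 07:00 on one day becomes a single
+    // "... and hour >= 3 and hour <= 7" clause.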
+    assert(begin.withTimeAtStartOfDay() == end.withTimeAtStartOfDay())
+    List(s"year = ${begin.getYear} " +
+      s" and month = ${begin.getMonthOfYear}" +
+      s" and day = ${begin.getDayOfMonth}" +
+      s" and hour >= ${begin.getHourOfDay}" +
+      s" and hour <= ${end.getHourOfDay}")
+
+  }
+}
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 0000000..fe0b839
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,453 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+/**
+ * Process to create a page view session log using data in a webrequest table.
+ *
+ * The resulting table has one row per session.
+ * The first three columns are wiki, date, and timestamp.
+ * The last column contains the page ids viewed in the session, in order, joined by spaces.
+ *
+ * @author Shilad Sen
+ */
+object SessionPagesBuilder {
+
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+  import org.joda.time.DateTime
+  import org.apache.spark.Partitioner
+  import java.sql.Timestamp
+
+  import scala.collection.mutable.ArrayBuffer
+  import scala.util.hashing.MurmurHash3
+
+  import VectorUtils.{toTimestamp, toInt, parseTimestamp, TS_FORMATS}
+
+  /**
+   * Number of seconds of data to gather beyond the requested time interval,
+   * used to find the end of sessions that spill past the interval.
+   * These views will be included in the output.
+   */
+  val LOOK_FUTURE_SECS: Int = 60 * 60 * 3
+
+  /**
+   * How far in the past to look for sessions with start times BEFORE
+   * the requested interval that spill into the requested interval.
+   * These will be excluded from the output.
+   * (Note: apply() currently looks back 2 * sessionTimeoutSecs instead.)
+   */
+  val LOOK_PAST_SECS: Int = 60 * 59
+
+  /**
+   * A user session is defined by a wiki, a hash of unique user information,
+   * and the timestamp for a view. This is used to stream through pageviews
+   * that may have been part of the same user session.
+   */
+  case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+    extends Ordered[SessionKey] {
+    override def compare(that: SessionKey): Int = {
+      import scala.math.Ordered.orderingToOrdered
+
+      Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+        (wikiDb, userHash, tstamp),
+        (that.wikiDb, that.userHash, that.tstamp)
+      )
+    }
+  }
+
+  /**
+   * Partitions all views for the same wiki from the same user onto a single partition.
+   */
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+    override def numPartitions: Int = partitions
+
+    override def getPartition(key: Any): Int = {
+      val k = key.asInstanceOf[SessionKey]
+      val h = k.wikiDb.hashCode() + k.userHash.toInt
+      // Math.abs(h) % n is still negative when h == Int.MinValue; use a non-negative mod.
+      (h % numPartitions + numPartitions) % numPartitions
+    }
+  }
+
+  object SessionKey {
+    implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+      Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+    }
+  }
+
+
+  /**
+   * A view of a page by a user.
+   * tstamp is milliseconds since the epoch.
+   * userHash is a 64-bit hash of user IP and other unique information.
+   */
+  case class PageView(
+    wikiDb: String,
+    userHash: Long,
+    tstamp: Long,
+    pageNamespace: Long,
+    pageId: Long
+  )
+
+  /**
+   * Returns true iff the two views are part of the same session.
+   * User hashes must match and the pageviews must fall within a specified window.
+   */
+  def sameSession(v1: PageView, v2: PageView, timeoutSecs: Long) : Boolean = {
+    (v1.wikiDb == v2.wikiDb) &&
+      (v1.userHash == v2.userHash) &&
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutSecs * 1000)
+  }
+
+
+  /**
+   * Given components associated with a distinct user, return a long hash.
+   * @param client_ip
+   * @param user_agent
+   * @param x_forwarded_for
+   * @return
+   */
+  def userHash(
+    client_ip: String,
+    user_agent: String,
+    x_forwarded_for: String
+  ) : Long = {
+    val s = client_ip + ":" + user_agent + ":" + x_forwarded_for
+    // Combine two 32-bit Murmur hashes into one 64-bit value; mask the low
+    // word so its sign extension does not clobber the high word.
+    (MurmurHash3.stringHash(s).toLong << 32) |
+      (MurmurHash3.stringHash(s.reverse).toLong & 0xFFFFFFFFL)
+  }
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ")
+
+  /**
+   * Prepare a map linking project hostnames to project dbnames.
+   *
+   * @return A map[hostname -> dbname]
+   */
+  def prepareProjectToWikiMap(
+    sqlContext: SQLContext,
+    projectNamespaceTable: String,
+    snapshot: String,
+    wikiList: Seq[String]
+  ): Map[String, String] = {
+    sqlContext.sql(
+      s"""
+        |SELECT DISTINCT
+        |  hostname,
+        |  dbname
+        |FROM $projectNamespaceTable
+        |WHERE snapshot = '$snapshot'
+        |  AND dbname IN (${listToSQLInCondition(wikiList)})
+      """.stripMargin).
+      rdd.
+      collect.
+      map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+
+
+  /**
+   * Prepare the page view data.
+   * @return An RDD of page views
+   */
+  def prepareViews(
+    sqlContext: SQLContext,
+    webrequestTable: String,
+    begin: Timestamp,
+    end: Timestamp,
+    projectList: Seq[String],
+    projectToWikiMap: Map[String, String]
+  ): RDD[PageView] = {
+
+    sqlContext.sql(
+      s"""
+        |SELECT
+        |  CONCAT(pageview_info['project'], '.org') AS project,
+        |  namespace_id,
+        |  page_id,
+        |  ts,
+        |  client_ip,
+        |  user_agent,
+        |  x_forwarded_for
+        |FROM $webrequestTable
+        |WHERE webrequest_source = 'text'
+        |  AND access_method = 'desktop'
+        |  AND normalized_host.project_class IN ('wikipedia', 'wikimedia')
+        |  AND (${PartitionQueryBuilder.formSql(begin, end)})
+        |  AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
+        |  AND is_pageview
+        |  AND namespace_id IS NOT NULL
+        |  AND page_id IS NOT NULL
+        |  AND agent_type = 'user'
+      """.stripMargin)
+      .rdd
+      .map(r => {
+        val wiki = projectToWikiMap(r.getString(0))
+        val uh = userHash(r.getString(4), r.getString(5), r.getString(6))
+        PageView(wiki, uh, toTimestamp(r.get(3)).getTime, toInt(r.get(1)), toInt(r.get(2)))
+      })
+  }
+
+  /**
+   * Aggregates page views by user / wiki and groups them into sessions.
+   *
+   * 1. Create session key composed of wiki, userhash, tstamp that delineates sessions.
+   * 2. Partition by wiki, userhash with secondary sort on tstamp.
+   * 3. Reduce page views into sessions on each partition. The iterator
+   *    is necessary for the final step because there could be millions or billions
+   *    of page views on a partition, so we cannot load the result set in memory.
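+   *
+   * For example, with a 15-minute timeout, views by one user at t = 0s, 600s,
+   * and 2000s split into two sessions, (0s, 600s) and (2000s), since only the
+   * first gap falls within the timeout.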
+   *
+   * @return
+   */
+  def createSessions(views: RDD[PageView], timeoutInSecs: Int): RDD[Seq[PageView]] = {
+    views
+      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
+      .repartitionAndSortWithinPartitions(new SessionPartitioner(views.getNumPartitions))
+      .mapPartitions {
+        iter =>
+          new Iterator[Seq[PageView]] {
+            var eos: Boolean = !iter.hasNext
+            var session = new ArrayBuffer[PageView]()
+            var nextSessionHead : Option[PageView] = None
+
+            override def hasNext: Boolean = {
+              tryToFillSession()
+              session.nonEmpty
+            }
+
+            override def next: Seq[PageView] = {
+              // If hasNext returns true we should always have a session
+              tryToFillSession()
+              assert(session.nonEmpty)
+
+              // Create a new empty session and place the next session head into it
+              val s = session
+              session = new ArrayBuffer[PageView]()
+              if (nextSessionHead.isDefined) {
+                session += nextSessionHead.get
+                nextSessionHead = None
+              }
+              s
+            }
+
+            /**
+             * Recursively adds the next page view onto the current session until
+             * we are certain it belongs to the next session. The signal that this
+             * has occurred is a non-empty nextSessionHead.
+             */
+            def tryToFillSession(): Unit = {
+              if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
+                val p = iter.next._2
+                if (session.isEmpty || sameSession(p, session.last, timeoutInSecs)) {
+                  session += p
+                  tryToFillSession()
+                } else {
+                  nextSessionHead = Some(p)
+                }
+              }
+            }
+          }
+      }
+  }
+
+
+  /**
+   * Write sessions to the output table.
+   * TODO: Overwrite existing files.
+   */
+  def writeSessions(sqlContext: SQLContext, sessions: RDD[Seq[PageView]], outputTable: String): Unit = {
+    import sqlContext.implicits._
+
+    val df =
+      sessions
+        .map { views =>
+          // Make concatenated ids with namespace 0
+          val ids = views
+            .filter(_.pageNamespace == 0)
+            .map(_.pageId.toString)
+            .mkString(" ")
+
+          // YYYY-MM-DDTHH:MM:SS
+          val ts = new Timestamp(views.head.tstamp).toString
+          val date = ts.substring(0, 10)
+
+          // Timestamp for session is timestamp for first view, regardless of
+          // whether it is ultimately filtered out
+          (views.head.wikiDb, date, ts, ids)
+        }
+        .filter(_._4.nonEmpty)  // Remove sessions with zero pageviews
+        .toDF("wiki_db", "date", "tstamp", "pages")
+
+    if (sqlContext.isInstanceOf[HiveContext]) {
+      df.write
+        .format("parquet")
+        .mode(SaveMode.Overwrite)
+        .partitionBy("wiki_db", "date")
+        .saveAsTable(outputTable)
+    } else {
+      // We are in unit testing mode so we can't write a real table
+      df.registerTempTable(outputTable)
+    }
+  }
+
+  /**
+   * Config class for CLI argument parser using scopt
+   */
+  case class Params(
+    outputTable: String = "shilad.sessions",
+    projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
+    webrequestTable: String = "wmf.webrequest",
+    wikiList: Seq[String] = Seq("simplewiki"),
+    outputFilesParts: Int = 1,
+    snapshot: String = "",  // Required parameter; the default is never used
+    begin: Timestamp = new Timestamp(0),  // Required parameter; the default is never used
+    end: Timestamp = new Timestamp(0),  // Required parameter; the default is never used
+    sessionTimeoutSecs: Int = 15*60
+  )
+
+
+  /**
+   * Define the command line options parser
+   */
+  val argsParser = new OptionParser[Params]("Pageview session builder") {
+    head("Pageview session builder", "")
+    note(
+      """
+        |This job computes a pageview session dataset from one or more wiki(s).
+        |Results are written to a Hive table partitioned by wiki and date.
+ """.stripMargin) + help("help") text ("Prints this usage text") + + opt[String]('s', "snapshot") required() valueName ("<snapshot>") action { (x, p) => + p.copy(snapshot = x) + } text ("The mediawiki hadoop snapshot to use for page, links and redirects (usually YYYY-MM)") + + opt[String]('b', "begin") required() valueName ("<begin>") action { (x, p) => + p.copy(begin = parseTimestamp(x).get) + } validate { x => + if (parseTimestamp(x).isDefined) success + else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}") + } text ("The beginning time for webrequest data gathering.") + + opt[String]('e', "end") required() valueName ("<end>") action { (x, p) => + p.copy(end = parseTimestamp(x).get) + } validate { x => + if (parseTimestamp(x).isDefined) success + else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}") + } text ("The ending time for webrequest data gathering.") + + opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, p) => + p.copy(sessionTimeoutSecs = x) + } validate { x => if (x >= 0) success else failure("Invalid timeout") + } text ("The timeout in seconds that delineates user sessions.") + + + opt[String]('o', "output-table") optional() valueName ("<table>") action { (x, p) => + p.copy(outputTable = x) + } text ("Hive table to store the computed user sessions") + + opt[String]('w', "wikis") optional() valueName "<wiki_db_1>,<wiki_db_2>..." action { (x, p) => + p.copy(wikiList = x.split(",").map(_.toLowerCase)) + } validate { x => + val dbs = x.split(",").map(_.toLowerCase) + if (dbs.filter(db => db.isEmpty || (! db.contains("wik"))).length > 0) + failure("Invalid wikis list") + else + success + } text "wiki dbs to compute. Defaults to enwiki" + + opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>") action { (x, p) => + p.copy(outputFilesParts = x) + } text ("Number of file parts to output in hdfs. Defaults to 1") + + opt[String]("project-namespace-table") optional() valueName ("<table>") action { (x, p) => + p.copy(projectNamespaceTable = x) + } text ("Fully qualified name of the project-namespace table on Hive. Default to wmf_raw.mediawiki_project_namespace_map") + + opt[String]("webrequest-table") optional() valueName ("<table>") action { (x, p) => + p.copy(webrequestTable = x) + } text ("Fully qualified name of the webrequest table on Hive. Default to wmf.webrequest") + + } + + + def main(args: Array[String]): Unit = { + + val params = args.headOption match { + // Case when our job options are given as a single string. Split them + // and pass them to argsParser. + case Some("--options") => + argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1)) + // Else the normal usage, each CLI opts can be parsed as a job option. + case _ => + argsParser.parse(args, Params()).getOrElse(sys.exit(1)) + } + + + val conf = new SparkConf() + .setAppName(s"SessionPagesBuilder") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.hadoop.mapred.output.compress", "true") + .set("spark.hadoop.mapred.output.compression.codec", "true") + .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") + .set("spark.hadoop.mapred.output.compression.type", "BLOCK") + + val sqlContext = new HiveContext(new SparkContext(conf)) + + // Exit non-zero if if any refinements failed. 
+ apply(sqlContext, params) + } + + def apply(sql: SQLContext, params: Params): Unit = { + sql.sparkContext + .getConf + .registerKryoClasses(Array( + classOf[PageView])) + + import sql.implicits._ + + val projectToWikiMap = prepareProjectToWikiMap( + sql, + params.projectNamespaceTable, + params.snapshot, + params.wikiList) + val domainList = projectToWikiMap.keys.toList + val projectList = domainList.map(_.stripSuffix(".org")) + + // Look into the past to make sure we don't jump in during the middle + // of a session that has a start time before the requested begin + val viewBeg = new Timestamp(new DateTime(params.begin.getTime) + .minusSeconds(2 * params.sessionTimeoutSecs) + .getMillis) + + // Look into the future to wrap up any sessions that started in the time + // interval but spill over. + val viewEnd = new Timestamp(new DateTime(params.end.getTime) + .plusSeconds(LOOK_FUTURE_SECS) + .getMillis) + + val views = prepareViews( sql, params.webrequestTable, + viewBeg, viewEnd, + projectList, projectToWikiMap) + + val sessions = createSessions(views, params.sessionTimeoutSecs) + .filter { ts => + ts.head.tstamp >= params.begin.getTime && + ts.head.tstamp <= params.end.getTime + } + + writeSessions(sql, sessions, params.outputTable) + } +} diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala new file mode 100644 index 0000000..1b270fb --- /dev/null +++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala @@ -0,0 +1,243 @@ +package org.wikimedia.analytics.refinery.job.vectors + +import java.sql.Timestamp + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} +import org.apache.spark.{SparkConf, SparkContext} +import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} +import scopt.OptionParser + +import scala.util.Try + +import VectorUtils.{toTimestamp, toInt, parseTimestamp, TS_FORMATS} + +/** + * @author Shilad Sen + */ +object SessionPruner { + + /** + * A session of pageviews. + */ + case class Session(wiki: String, begin: Long, views: Seq[Long]) + + def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ") + + /** + * Calculate the frequency table for pages within a session data frame + */ + def calculateFreq(df: DataFrame) : RDD[(String, String,Int)] = { + df + .select("wiki_db", "pages") + .rdd + .flatMap { + case Row(w: String, pages: String) => + pages.split(" ").map((w, _)) // (wiki, page_id) + } + .map((_, 1)) // ((wiki, page_id), 1) + .reduceByKey((x : Int, y: Int) => x + y) // ((wiki, page_id), freq) + .map { case ((w, p), f) => (w, p, f) } // (wiki, page_id, freq) + } + + /** + * Prunes sessions to only those with views of pages in keepers. + * + * @param sessions Data frame for unpruned sessions table + * @param keepers (wiki, pageid) pairs that should be retained post-pruning. + */ + def prune( + sessions: DataFrame, + keepers: RDD[(String, String)]) + : RDD[(String, String, Timestamp, String)] = { + + // Create RDD of ((wiki, page), (sessionId, i, date, tstamp)) + // sessionId is a unique id and i is the index of the page + // view within the session. 
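+    // (zipWithUniqueId assigns the session ids without triggering the extra
+    // Spark job that zipWithIndex would.)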
This will be joined with the keepers RDD + val pageViews = + sessions + .select("wiki_db", "date", "tstamp", "pages") + .rdd + .zipWithUniqueId() + .flatMap { case (row, sessionId) => + val w = row.getString(0) + val d = row.getString(1) + val ts = toTimestamp(row.get(2)) + val pages = row.getString(3).split(" ") + + pages + .zipWithIndex + .map { case (p, i) => ((w, p), (sessionId, i, d, ts)) } + } + + // Add a garbage column to the keepers to make it joinable + val keepersToJoin = keepers.map((_, 1)) + + // Join the two datasets to filter out unused pageviews + // Recreate the original session data, and return it + pageViews + .join(keepersToJoin) + .map { case ((wiki, page), ((sessionId, i, date, tstamp), garbage)) => + (sessionId, (i, date, tstamp, wiki, page)) + } + .groupByKey() + .map { case (sessionId, views) => + val date = views.head._2 + val tstamp = views.head._3 + val wiki = views.head._4 + val pages = views.toList.sortBy{_._1}.map(_._5) // sort by i, extract page + + (wiki, date, tstamp, pages.mkString(" ")) + } + } + + + /** + * Config class for CLI argument parser using scopt + */ + case class Params( + inputTable: String = "sessions", + outputTable: String = "sessions_pruned", + freqTable: Option[String] = None, + wikiList: Seq[String] = Seq("simplewiki"), + begin: Timestamp = new Timestamp(0), // Parameter required, never used as is + end: Timestamp = new Timestamp(0), // Parameter required, never used as is + minFreq: Int = 50 + ) + + + /** + * Define the command line options parser + */ + val argsParser = new OptionParser[Params]("Session pruner") { + head("Session pruner", "") + note( + """ + |This job prunes a session pageview dataset for one or more wiki(s). + |Only those pages above some frequency threshold are retained. + |It creates a date folder, and per-wiki folders to store the results. + """.stripMargin) + help("help") text ("Prints this usage text") + + opt[String]('b', "begin") required() valueName ("<begin>") action { (x, p) => + p.copy(begin = parseTimestamp(x).get) + } validate { x => + if (parseTimestamp(x).isDefined) success + else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}") + } text ("The beginning time for webrequest data gathering.") + + opt[String]('e', "end") required() valueName ("<end>") action { (x, p) => + p.copy(end = parseTimestamp(x).get) + } validate { x => + if (parseTimestamp(x).isDefined) success + else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}") + } text ("The ending time for webrequest data gathering.") + + opt[String]('f', "freq-table") optional() valueName ("<table>") action { (x, p) => + p.copy(freqTable = Some(if (x.endsWith("/")) x.dropRight(1) else x)) + } text ("SQL table name for output frequency file.") + + opt[String]('i', "input-table") optional() valueName ("<table>") action { (x, p) => + p.copy(inputTable = x) + } text ("Sql table name for input sessions.") + + opt[String]('o', "outputTable") optional() valueName ("<table>") action { (x, p) => + p.copy(outputTable = x) + } text ("SQL table name for pruned sessions.") + + opt[String]('w', "wikis") optional() valueName "<wiki_db_1>,<wiki_db_2>..." action { (x, p) => + p.copy(wikiList = x.split(",").map(_.toLowerCase)) + } validate { x => + val dbs = x.split(",").map(_.toLowerCase) + if (dbs.filter(db => db.isEmpty || (! db.contains("wik"))).length > 0) + failure("Invalid wikis list") + else + success + } text "wiki dbs to compute. 
Defaults to simplewiki"
+
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val params = args.headOption match {
+      // Case when our job options are given as a single string. Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, where each CLI opt is parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+    val conf = new SparkConf()
+      .setAppName("SessionPruner")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+    val sqlContext = new HiveContext(new SparkContext(conf))
+
+    // Exit non-zero if any refinements failed.
+    apply(sqlContext, params)
+  }
+
+  def apply(sql: SQLContext, params: Params): Unit = {
+    sql.sparkContext
+      .getConf
+      .registerKryoClasses(Array(classOf[Session]))
+
+    val begDate = params.begin.toString.substring(0, 10)  // YYYY-mm-dd
+    val endDate = params.end.toString.substring(0, 10)  // YYYY-mm-dd
+
+    val sessions =
+      sql.sql(
+        s"""
+          |SELECT wiki_db, date, tstamp, pages
+          |FROM ${params.inputTable}
+          |WHERE date >= '$begDate'
+          |  AND date <= '$endDate'
+          |  AND wiki_db IN (${listToSQLInCondition(params.wikiList)})
+          |  AND tstamp >= '${params.begin}'
+          |  AND tstamp <= '${params.end}'
+        """.stripMargin)
+
+    import sql.implicits._
+
+    val freq = calculateFreq(sessions)
+
+    if (params.freqTable.isDefined) {
+      val df = freq.toDF("wiki_db", "page", "frequency")
+      if (sql.isInstanceOf[HiveContext]) {
+        df.write
+          .format("parquet")
+          .mode(SaveMode.Overwrite)
+          .partitionBy("wiki_db")
+          .saveAsTable(params.freqTable.get)
+      } else {
+        // We are in unit testing mode so we can't write a real table
+        df.registerTempTable(params.freqTable.get)
+      }
+    }
+
+    val keepers = freq
+      .filter(_._3 >= params.minFreq)
+      .map{ triple => (triple._1, triple._2) }  // (wiki, page)
+
+    val prunedDf = prune(sessions, keepers)
+      .toDF("wiki_db", "date", "tstamp", "pages")
+
+    if (sql.isInstanceOf[HiveContext]) {
+      prunedDf.write
+        .format("parquet")
+        .mode(SaveMode.Overwrite)
+        .partitionBy("wiki_db", "date")
+        .saveAsTable(params.outputTable)
+    } else {
+      // We are in unit testing mode so we can't write a real table
+      prunedDf.registerTempTable(params.outputTable)
+    }
+  }
+}
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
new file mode 100644
index 0000000..0ab8bbb
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
@@ -0,0 +1,53 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+
+import scala.util.Try
+
+/**
+ * @author Shilad Sen
+ */
+object VectorUtils {
+
+
+  /**
+   * Wrapper that allows us to handle production Hive timestamps (which
+   * are already Timestamps), and testing data from JSON files (which are
+   * strings) uniformly.
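+   * Any other input type raises IllegalArgumentException.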
+ */ + def toTimestamp(ts: Any) : Timestamp = { + ts match { + case _: Timestamp => + ts.asInstanceOf[Timestamp] + case _: String => + Timestamp.valueOf(ts.asInstanceOf[String]) + case _ => throw new IllegalArgumentException(ts.toString) + } + } + + /** + * Wrapper that allows us to handle production hive data and JSON testing + * data uniformly + */ + def toInt(x : Any) : Int = { + x match { + case y: Long => y.toInt + case _ => x.asInstanceOf[Int] + } + } + + + val YMD: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd") + val YMD_HMS: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss") + val TS_FORMATS: String = "yyyy-MM-dd or yyyy-MM-ddTHH:mm:ss" + + def parseTimestamp(s: String) : Option[Timestamp] = { + var ts = Try[java.sql.Timestamp](new Timestamp(YMD.parseMillis(s))) + if (ts.isFailure) { + ts = Try[java.sql.Timestamp](new Timestamp(YMD_HMS.parseMillis(s))) + } + if (ts.isFailure) None else Some(ts.get) + } +} diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala new file mode 100644 index 0000000..c9bf31e --- /dev/null +++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala @@ -0,0 +1,279 @@ +package org.wikimedia.analytics.refinery.job.vectors + +import java.io.{File, FileWriter, PrintWriter} +import java.sql.Timestamp + +import org.joda.time.DateTime +import org.json.simple.JSONObject +import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, userHash} + +import scala.collection.mutable +import scala.reflect.io.Path +import scala.util.Random + + +/** + * Creates fake data that can be processed by SessionPagesBuilder along with + * expected generated files. Files created are: + * + * 1) wikis.json: Input table mapping between domain names and wikis + * 2) requests.json: Input table containing page views. + * 3) sessions.txt: Output table created by SessionPagesBuilder + * 4) frequency.txt: Number of views per page in sessions.txt + * 5) sessions_pruned.txt: Sessions with pages that are rarely viewed removed. + * + * @author Shilad Sen + */ +object TestDataCreator { + val RAND = new Random(1) + + case class UserInfo(x: String, y: String, z: String, hash: Long) + case class Session(user: UserInfo, tstamp: Long, views: Seq[PageView]) + + + case class Params( + snapshot: String = "2016-10", + startTime: DateTime = new DateTime(2017, 1, 1, 21, 0, 0), + endTime: DateTime = new DateTime(2017, 1, 2, 2, 0, 0), + wikis: Seq[String] = (1 to 3).map{ "wiki_" + _ }, + numPages: Int = 100, + numSessions: Int = 100, + meanSessionLength : Int = 10, + sessionTimeoutSecs : Int = 60 * 15, + outputDir : File = new File("path-data"), + minFreq: Int = 5 + ) + + + /** + * Chooses a random element in a sequences. + */ + def choice[T](seq : Seq[T]) : T = { + seq(RAND.nextInt(seq.length)) + } + + val usedUserHashes: mutable.Set[Long] = mutable.Set[Long]() + + def randUser() : UserInfo = { + val randStr = () => { (1 to 10).map(_ => RAND.nextPrintableChar() + "").mkString("") } + val u = (randStr(), randStr(), randStr()) + val hash : Long = userHash(u._1, u._2, u._3) + if (usedUserHashes.contains(hash)) { + randUser() + } else { + usedUserHashes += hash + UserInfo(u._1, u._2, u._3, hash) + } + } + + def randPageView(user : UserInfo, wiki: String, tstamp: Long, numArticles : Int) : PageView = { + val pageId = RAND.nextInt(numArticles) + // pseudo-random namespace. 
It is deterministic given the page id.
+    val pageNs = (pageId.hashCode() % 10 - 8).max(0)
+    PageView(wiki, user.hash, tstamp, pageNs, pageId)
+  }
+
+  /**
+   * Split a session into two sessions by the same user if possible. This is done
+   * by shifting the timestamps of pageviews beyond some point in the session by a
+   * random offset that is bigger than the session timeout.
+   */
+  def splitSessions(session : Session, sessionTimeoutSecs: Int, start: Long, end: Long) : List[Session] = {
+    val views = session.views
+
+    val beg2 = views.last.tstamp + sessionTimeoutSecs * 1000 + 1  // Earliest start of a new session
+    if (views.length > 2 && beg2 < end) {
+      val offset = RAND.nextInt((end - beg2).asInstanceOf[Int]) + sessionTimeoutSecs * 1000 + 1
+      val i = 1 + RAND.nextInt(views.length - 1)
+      val views1 = views.slice(0, i)
+      val views2 = views
+        .slice(i, views.length)
+        .map { v =>
+          PageView(v.wikiDb, v.userHash, v.tstamp + offset, v.pageNamespace, v.pageId)
+        }
+      List(
+        Session(session.user, views1.head.tstamp, views1.toList),
+        Session(session.user, views2.head.tstamp, views2.toList)
+      )
+    } else {
+      List(session)  // Not possible to split.
+    }
+  }
+
+  /**
+   * Generates random sessions with starting timestamps in the specified range.
+   */
+  def generateSessions(params: Params): Seq[Session] = {
+    val start = params.startTime.getMillis
+    val end = params.endTime.getMillis
+    (1 to params.numSessions)
+      .flatMap { _ =>
+        val n = 1 + RAND.nextInt(params.meanSessionLength * 2)
+        val w = choice(params.wikis)
+        val u = randUser()
+        val startTstamp = start + RAND.nextInt((end-start).asInstanceOf[Int])
+        val tstamps =
+          (1 to n)
+            .foldLeft(List(startTstamp)) { (seq, _) =>
+              (seq.head + RAND.nextInt(params.sessionTimeoutSecs*1000-1))::seq
+            }
+            .reverse
+        val views = tstamps.map { randPageView(u, w, _, params.numPages) }.toList
+        val session = Session(u, views.head.tstamp, views.toList)
+
+        if (RAND.nextDouble() < 0.7)
+          List(session)
+        else
+          splitSessions(session, params.sessionTimeoutSecs, start, end)
+      }
+  }
+
+  /**
+   * Prunes sessions:
+   *  - Removes views for namespaces other than 0
+   *  - Removes empty sessions
+   *  - Removes sessions that have start times out of the requested range
+   */
+  def pruneInvalidSessions(sessions: Seq[Session], begin: DateTime, end: DateTime) : Seq[Session] = {
+    sessions
+      .map { s => Session(s.user, s.tstamp, s.views.filter(_.pageNamespace == 0).toList) }
+      .filter { s =>
+        s.views.nonEmpty && s.tstamp >= begin.getMillis && s.tstamp <= end.getMillis
+      }
+  }
+
+  def writeSessions(path: String, sessions: Seq[Session]) : Unit = {
+    val out = new PrintWriter(new FileWriter(path))
+    out.write("wiki_db\tdate\ttstamp\tpages\n")
+    sessions.foreach {
+      s =>
+        val ts = new Timestamp(s.tstamp).toString
+        val date = ts.substring(0, 10)
+        val wiki = s.views.head.wikiDb
+        val pageIds = s.views.map { _.pageId.toString }.mkString(" ")
+        out.write(wiki + "\t" + date + "\t" + ts + "\t" + pageIds + "\n")
+    }
+    out.close()
+  }
+
+  import scala.collection.JavaConverters._
+
+
+  def writeWikis(path: String, wikis: Seq[String], snapshot: String): Unit = {
+    val out = new PrintWriter(new FileWriter(path))
+    wikis.distinct.foreach {
+      w =>
+        val js = Map(
+          "hostname" -> (w + ".org"),
+          "snapshot" -> snapshot,
+          "dbname" -> w
+        )
+        JSONObject.writeJSONString(js.asJava, out)
+        out.write("\n")
+    }
+    out.close()
+  }
+
+  def writeRequests(path: String, sessions: Seq[Session]): Unit = {
+    val allViews = sessions.flatMap { s => s.views.map((s.user, _)) }
+
+    val out = new PrintWriter(new FileWriter(path))
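+    // Shuffle the views so the emitted JSON rows are not grouped by session;
+    // the job under test must not depend on input ordering.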
RAND.shuffle(allViews).foreach { case (user, view) => + val dt = new DateTime(view.tstamp) + + val js = Map( + "pageview_info" -> Map( "project" -> view.wikiDb ).asJava, + "ts" -> new Timestamp(dt.getMillis).toString, + "user_agent" -> user.x, + "namespace_id" -> view.pageNamespace, + "page_id" -> view.pageId, + "x_forwarded_for" -> user.y, + "client_ip" -> user.z, + "is_pageview" -> true, + "agent_type" -> "user", + "webrequest_source" -> "text", + "access_method" -> "desktop", + "normalized_host" -> Map("project_class" -> "wikimedia").asJava, + "year" -> dt.getYear, + "month" -> dt.getMonthOfYear, + "day" -> dt.getDayOfMonth, + "hour" -> dt.getHourOfDay + ) + JSONObject.writeJSONString(js.asJava, out) + out.write("\n") + } + out.close() + } + + def writeFreq(path: String, sessions: Seq[TestDataCreator.Session]) : Seq[(String, Long, Int)] = { + + val freqs = + sessions + .flatMap(_.views) + .map { v => (v.wikiDb, v.pageId) } + .groupBy(identity) + .mapValues(_.length) + .map { x => (x._1._1, x._1._2, x._2)} + + TestUtils.deleteRecursively(new File(path)) + + val out = new PrintWriter(new FileWriter(path)) + + out.write("wiki_db\tpage\tfrequency\n") + freqs.foreach { case (wiki, word, freq) => + out.write(wiki + "\t" + word + "\t" + freq + "\n") + } + + out.close() + + freqs.toList + } + + def writePruned(path: String, valid: Seq[Session], freq: Seq[(String, Long, Int)], minFreq: Int): Unit = { + val keepers = freq + .filter(_._3 >= minFreq) + .map { case (wiki, page, n) => (wiki, page) } + .toSet + + val out = new PrintWriter(new FileWriter(path)) + out.write("wiki_db\tdate\ttstamp\tpages\n") + valid.foreach { s => + val ts = new Timestamp(s.tstamp).toString + val date = ts.substring(0, 10) + val wiki = s.views.head.wikiDb + + val pageIds = s + .views + .filter { v => keepers.contains((v.wikiDb, v.pageId)) } + .map { _.pageId.toString } + .mkString(" ") + + if (pageIds.nonEmpty) { + out.write(wiki + "\t" + date + "\t" + ts + "\t" + pageIds + "\n") + } + } + + out.close() + } + + def generate(params: Params) : Unit = { + // Allow some sessions to start earlier than the requested range. 
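+    // (minStart is computed below but not currently passed to generateSessions,
+    // which draws start times from params.startTime onward.)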
+ val minStart = params.startTime.getMillis - (params.endTime.getMillis - params.startTime.getMillis) / 4 + val sessions = generateSessions(params) + val valid = pruneInvalidSessions(sessions, params.startTime, params.endTime) + + val path = params.outputDir.getAbsolutePath + Path(path).createDirectory(force = true, failIfExists = false) + + writeWikis(path + "/wikis.json", params.wikis, params.snapshot) + writeRequests(path + "/requests.json", sessions) + writeSessions(path + "/sessions.txt", valid) + val freq = writeFreq(path + "/frequency.txt", valid) + writePruned(path + "/sessions_pruned.txt", valid, freq, params.minFreq) + } + + def main(args: Array[String]): Unit = { + RAND.setSeed(1L) + generate(Params()) + } +} diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala new file mode 100644 index 0000000..84c8bea --- /dev/null +++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala @@ -0,0 +1,77 @@ +package org.wikimedia.analytics.refinery.job.vectors + +import java.sql.Timestamp + +import org.scalatest.{FlatSpec, Matchers} +import PartitionQueryBuilder.formSql + +class TestPartitionQueryBuilder extends FlatSpec with Matchers { + + def sqlOr(conds : Seq[String]): String = { conds.map("(" + _ + ")").mkString(" or ")} + def normalize(s : String) : String = { s.toLowerCase.replaceAllLiterally(" ", "")} + + "A PartitionQueryBuilder " should " build partition queries " in { + + val tests: Seq[(String, String, String)] = Seq( + ( // Simple example + "2017-01-04 03:10:00", + "2017-01-05 04:00:00", + sqlOr(List("year = 2017 and month = 1 and day = 4 and hour >= 3", + "year = 2017 and month = 1 and day = 5 and hour <= 4")) + ), + ( // Edge case with empty interval + "2017-01-04 03:00:00", + "2017-01-04 03:00:00", + sqlOr(List("year = 2017 and month = 1 and day = 4 and hour = 3")) + ), + ( // Start and end in same day + "2017-01-04 03:10:00", + "2017-01-04 07:00:00", + sqlOr(List( + "year = 2017 and month = 1 and day = 4 and hour >= 3 and hour <= 7")) + ), + ( // Have a full day in the middle + "2017-01-04 03:10:00", + "2017-01-06 04:00:00", + sqlOr(List( + "year = 2017 and month = 1 and day = 4 and hour >= 3", + "year = 2017 and month = 1 and day = 5", + "year = 2017 and month = 1 and day = 6 and hour <= 4")) + ), + ( // Have a full month in the middle + "2017-01-30 03:10:00", + "2017-03-02 04:00:00", + sqlOr(List( + "year = 2017 and month = 1 and day = 30 and hour >= 3", + "year = 2017 and month = 1 and day = 31", + "year = 2017 and month = 2", + "year = 2017 and month = 3 and day = 1", + "year = 2017 and month = 3 and day = 2 and hour <= 4")) + ), + ( // Have a full year in the middle + "2014-11-29 03:10:00", + "2017-02-02 04:00:00", + sqlOr(List( + "year = 2014 and month = 11 and day = 29 and hour >= 3", + "year = 2014 and month = 11 and day = 30", + "year = 2014 and month = 12", + "year = 2015", + "year = 2016", + "year = 2017 and month = 1", + "year = 2017 and month = 2 and day = 1", + "year = 2017 and month = 2 and day = 2 and hour <= 4")) + ) + ) + + tests.foreach( t => { + val beg = Timestamp.valueOf(t._1) + val end = Timestamp.valueOf(t._2) + val sql = t._3 + val res = formSql(beg, end) + System.out.println(res) + + normalize(sql) shouldEqual normalize(res) + }) + } + +} diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala new file mode 100644 index 0000000..9d33c9e --- /dev/null +++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala @@ -0,0 +1,72 @@ +package org.wikimedia.analytics.refinery.job.vectors + +import java.io.File +import java.sql.Timestamp + +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} + +case class Session(wiki_db: String, date: String, tstamp: Timestamp, pages: String) + +/** + * @author Shilad Sen + * + */ +class TestSessionPagesBuilder extends FlatSpec with Matchers with BeforeAndAfter { + + var DEBUG_MODE = true + var PARAMS : TestDataCreator.Params = null + + before { + + PARAMS = TestDataCreator.Params( + outputDir = + if (DEBUG_MODE) new File("testdata") + else File.createTempFile("test", "") + ) + + TestUtils.deleteRecursively(PARAMS.outputDir) + PARAMS.outputDir.mkdirs() + + TestDataCreator.generate(PARAMS) + } + + after { + if (!DEBUG_MODE) TestUtils.deleteRecursively(PARAMS.outputDir) + } + + "SessionPagesBuilder" should "extract sessions" in { + + val params = SessionPagesBuilder.Params( + outputTable = "sessions", + projectNamespaceTable = "wikis", + wikiList = PARAMS.wikis, + webrequestTable = "requests", + snapshot = PARAMS.snapshot, + begin = new Timestamp(PARAMS.startTime.getMillis), + end = new Timestamp(PARAMS.endTime.getMillis) + ) + + val conf = new SparkConf() + .setMaster("local[*]") + .setAppName("test-session-pages-builder") + val sc = new SparkContext(conf) + + val path = PARAMS.outputDir.getAbsoluteFile + val sql = new SQLContext(sc) + + sql.read.json(path + "/requests.json") + .registerTempTable(params.webrequestTable) + sql.read.json(path + "/wikis.json") + .registerTempTable(params.projectNamespaceTable) + + SessionPagesBuilder(sql, params) + + TestUtils.tableToCSV(sql, "sessions", path + "/spark_sessions.txt") + + assert(TestUtils.filesAreSame( + path + "/spark_sessions.txt", + path + "/sessions.txt")) + } +} \ No newline at end of file diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala new file mode 100644 index 0000000..ce406da --- /dev/null +++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala @@ -0,0 +1,88 @@ +package org.wikimedia.analytics.refinery.job.vectors + +import java.io.File +import java.sql.Timestamp + +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} + +import scala.io.Source + +/** + */ +class TestSessionPruner extends FlatSpec with Matchers with BeforeAndAfter { + + var DEBUG_MODE = true + var PARAMS : TestDataCreator.Params = null + + before { + + PARAMS = TestDataCreator.Params( + outputDir = + if (DEBUG_MODE) new File("testdata") + else File.createTempFile("test", "") + ) + + TestUtils.deleteRecursively(PARAMS.outputDir) + PARAMS.outputDir.mkdirs() + + TestDataCreator.generate(PARAMS) + } + + after { + if (!DEBUG_MODE) TestUtils.deleteRecursively(PARAMS.outputDir) + } + + + "SessionPruner" should "count and prune by frequency" in { + val path = PARAMS.outputDir.getAbsolutePath + + 
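// Mirror the generator's settings (wikis, time range, minFreq) so the computed
+    // results line up with the expected files.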
val params = SessionPruner.Params(
+      inputTable = "sessions",
+      outputTable = "pruned_sessions",
+      freqTable = Some("frequency"),
+      wikiList = PARAMS.wikis,
+      begin = new Timestamp(PARAMS.startTime.getMillis),
+      end = new Timestamp(PARAMS.endTime.getMillis),
+      minFreq = 5
+    )
+
+    val conf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("test-session-pruner")
+    val sc = new SparkContext(conf)
+
+    val sql = new SQLContext(sc)
+
+    sql
+      .read
+      .format("com.databricks.spark.csv")
+      .option("delimiter", "\t")
+      .option("header", "true")
+      .load(path + "/sessions.txt")
+      .registerTempTable("sessions")
+
+    SessionPruner(sql, params)
+
+    // The frequency table covers all wikis at once, so a single comparison suffices.
+    TestUtils.tableToCSV(sql, "frequency", path + "/spark_frequency.txt")
+
+    assert(
+      TestUtils.filesAreSame(
+        path + "/frequency.txt",
+        path + "/spark_frequency.txt"
+      )
+    )
+
+    TestUtils.tableToCSV(sql, "pruned_sessions", path + "/spark_pruned_sessions.txt")
+
+    assert(
+      TestUtils.filesAreSame(
+        path + "/sessions_pruned.txt",
+        path + "/spark_pruned_sessions.txt"
+      )
+    )
+  }
+}
\ No newline at end of file
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
new file mode 100644
index 0000000..afb2479
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
@@ -0,0 +1,64 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.{BufferedWriter, File, FileWriter}
+
+import org.apache.spark.sql.SQLContext
+
+import scala.io.Source
+
+/**
+ * @author Shilad Sen
+ */
+object TestUtils {
+
+  /**
+   * Dumps a SQL table as a tab-separated file.
+   */
+  def tableToCSV(sql: SQLContext, table: String, outputPath: String) : Unit = {
+    val tb = sql.table(table)
+
+    val out = new BufferedWriter(new FileWriter(outputPath))
+    out.write(tb.columns.mkString("\t") + "\n")
+
+    tb.collect().foreach { row =>
+      out.write(row.mkString("\t") + "\n")
+    }
+
+    out.close()
+  }
+
+  /**
+   * Checks whether the files are identical on a line-by-line basis.
+   * Lines are allowed to be reordered.
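+   * Differing lines are printed to stdout to aid debugging.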
+ */ + def filesAreSame(computedPath : String, correctPath: String) : Boolean = { + val computed = Source.fromFile(computedPath).getLines.toSet + val correct = Source.fromFile(correctPath).getLines.toSet + + val nMissing = correct + .filter { !computed.contains(_) } + .toList + .sorted + .map { line => System.out.println("Missing correct line: " + line); line } + .length + + val nAdded = computed + .filter { !correct.contains(_) } + .toList + .sorted + .map { line => System.out.println("Additional incorrect line: " + line); line } + .length + + nMissing == 0 && nAdded == 0 + } + + + // From https://stackoverflow.com/a/26769030/141245 + def deleteRecursively(file: File): Unit = { + if (file.isDirectory) + file.listFiles.foreach(deleteRecursively) + if (file.exists && !file.delete) + throw new Exception(s"Unable to delete ${file.getAbsolutePath}") + } +} -- To view, visit https://gerrit.wikimedia.org/r/383761 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I19160e16d8140d03d81a4226e3974f42ec1e3602 Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery/source Gerrit-Branch: nav-vectors Gerrit-Owner: Shilad Sen <s...@macalester.edu> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits