Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/297268

Change subject: [WIP] Process MediaWiki User history
......................................................................

[WIP] Process MediaWiki User history

Bug: T138861
Change-Id: Idd5cb2600636486356ba9f23e909b32fad677e1a
---
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
1 file changed, 458 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/68/297268/1

diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
new file mode 100644
index 0000000..16d9288
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
@@ -0,0 +1,458 @@
+/* create table statement for the user_history table
+CREATE EXTERNAL TABLE IF NOT EXISTS `mforns.simplewiki_user_history`(
+    `user_id`               bigint,
+    `user_name`             string,
+    `user_groups`           array<string>,
+    `user_blocks`           array<string>,
+    `user_registration`     string,
+    `start_timestamp`       string,
+    `end_timestamp`         string,
+    `caused_by_event_type`  string,
+    `caused_by_user_id`     bigint,
+    `caused_by_user_name`   string
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+LOCATION '/user/mforns/mediawiki/simplewiki/user_history';
+*/
+
+import com.databricks.spark.avro._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SaveMode
+import scala.util.matching.Regex
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+/**
+ * One user-related log event extracted from the MediaWiki logging table
+ * (see parseUserLog). Every field defaults to "absent".
+ */
+case class UserEvent(
+    // Timestamp of the log entry, kept as the raw string.
+    timestamp: Option[String] = None,
+    // "rename", "altergroups" or "alterblocks" when built by parseUserLog.
+    eventType: Option[String] = None,
+    // Id of the user who performed the logged action.
+    causedByUserId: Option[Long] = None,
+    // Name of the user who performed the action; None if null or empty in the log.
+    causedByUserName: Option[String] = None,
+    // User name before the event; equals newUserName for non-rename events.
+    oldUserName: Option[String] = None,
+    // User name after the event; used as the key that links events to states.
+    newUserName: Option[String] = None,
+    // Groups before / after an "altergroups" event; empty for other events.
+    oldUserGroups: Seq[String] = Seq.empty,
+    newUserGroups: Seq[String] = Seq.empty,
+    // Block flags set by an "alterblocks" event; empty for other events.
+    newUserBlocks: Seq[String] = Seq.empty,
+    // Expiration of the block: a timestamp string or "indefinite".
+    blockExpiration: Option[String] = None
+)
+
+/**
+ * The state of a user during one time interval, together with the event
+ * that caused it. Every field defaults to "absent".
+ */
+case class UserState(
+    userId: Option[Long] = None,
+    userName: Option[String] = None,
+    userGroups: Seq[String] = Seq.empty,
+    userBlocks: Seq[String] = Seq.empty,
+    userRegistration: Option[String] = None,
+    startTimestamp: Option[String] = None,
+    endTimestamp: Option[String] = None,
+    causedByEventType: Option[String] = None,
+    causedByUserId: Option[Long] = None,
+    causedByUserName: Option[String] = None,
+    causedByBlockExpiration: Option[String] = None
+) {
+    /**
+     * Converts this state into a Row with 10 positional values; absent
+     * values become null. causedByBlockExpiration is not part of the row.
+     */
+    def toRow: Row = {
+        // A state without an explicit start falls back to the registration time.
+        val effectiveStart = startTimestamp.orElse(userRegistration)
+        Row(
+            userId.map(Long.box).orNull,
+            userName.orNull,
+            userGroups,
+            userBlocks,
+            userRegistration.orNull,
+            effectiveStart.orNull,
+            endTimestamp.orNull,
+            causedByEventType.orNull,
+            causedByUserId.map(Long.box).orNull,
+            causedByUserName.orNull
+        )
+    }
+}
+
+// Captures the two user names linked as [[ns:old|...]] ... [[ns:new|...]] in a log comment.
+val userNamesPattern = """^[^\[]*\[\[[^:]*:([^|]*)\|.*\]\][^\[]*\[\[[^:]*:(.*)\|.*$""".r
+
+/**
+ * Extracts the old and new user names of a "renameuser" log entry.
+ *
+ * Three formats are tried in order:
+ *   1. logParams is a serialized map with "4::olduser" and "5::newuser";
+ *   2. logComment matches userNamesPattern;
+ *   3. the old name is logTitle (underscores replaced by spaces) and the
+ *      new name is the raw logParams.
+ *
+ * @return (oldUserName, newUserName)
+ */
+def getOldAndNewUserNames(
+    logParams: String, logComment: String, logTitle: String
+): (String, String) = {
+    import scala.util.control.NonFatal
+
+    // Formats 2 and 3. A null comment simply does not match, so no
+    // exception is needed to reach the last fallback.
+    def fromCommentOrTitle: (String, String) = Option(logComment) match {
+        case Some(userNamesPattern(oldName, newName)) => (oldName, newName)
+        case _ => (logTitle.replaceAll("_", " "), logParams)
+    }
+
+    try {
+        val logParamsMap = unserialize(logParams).asInstanceOf[Map[String, Any]]
+        (
+            logParamsMap("4::olduser").asInstanceOf[String],
+            logParamsMap("5::newuser").asInstanceOf[String]
+        )
+    } catch {
+        // Only recover from ordinary failures (bad format, missing key, wrong
+        // type); fatal JVM errors and interrupts must propagate.
+        case NonFatal(_) => fromCommentOrTitle
+    }
+}
+
+/**
+ * Splits a comma-separated string into its trimmed, non-empty items.
+ *
+ * @param csv comma-separated values; may be null, empty or blank
+ * @return the items in order; empty for null, empty or whitespace-only input
+ */
+def csvToSeq(csv: String): Seq[String] = {
+    Option(csv)
+        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toList)
+        .getOrElse(Nil)
+}
+
+/**
+ * Extracts the old and new user groups of a "rights" log entry.
+ *
+ * Three formats are tried in order:
+ *   1. a serialized map with "4::oldgroups" and "5::newgroups";
+ *   2. "old,groups\nnew,groups" (one or two newline-separated CSV lines);
+ *   3. "=a, b" or "+a +b", which only carry the new groups.
+ *
+ * @return (oldUserGroups, newUserGroups)
+ * @throws Exception if logParams matches none of the formats
+ */
+def getOldAndNewUserGroups(logParams: String): (Seq[String], Seq[String]) = {
+    import scala.util.control.NonFatal
+
+    // Format 1: fails with an exception when logParams is not a serialized map.
+    def fromSerialized: (Seq[String], Seq[String]) = {
+        val paramsMap = unserialize(logParams).asInstanceOf[Map[String, Any]]
+        def paramToSeq(param: String): Seq[String] = {
+            paramsMap(param).asInstanceOf[Map[String, String]].values.toList
+        }
+        (paramToSeq("4::oldgroups"), paramToSeq("5::newgroups"))
+    }
+
+    // Format 2: None when logParams is not in the newline-separated format.
+    def fromNewlineFormat: Option[(Seq[String], Seq[String])] = {
+        if (!logParams.contains("\n")) {
+            None
+        } else {
+            logParams.split("\n") match {
+                case Array(oldCsv) =>
+                    Some((csvToSeq(oldCsv), Seq.empty[String]))
+                case Array(oldCsv, newCsv) =>
+                    Some((csvToSeq(oldCsv), csvToSeq(newCsv)))
+                case _ => None
+            }
+        }
+    }
+
+    // Format 3: legacy single-line formats.
+    def fromLegacyFormat: (Seq[String], Seq[String]) = {
+        if (logParams.startsWith("=")) {
+            (Seq.empty[String], logParams.replaceAll("=", "").split(",").map(_.trim).toSeq)
+        } else if (logParams.contains("+")) {
+            (Seq.empty[String], logParams.split(" ").map(_.replace("+", "").trim).toSeq)
+        } else {
+            throw new Exception("Could not parse groups from: " + logParams)
+        }
+    }
+
+    try {
+        fromSerialized
+    } catch {
+        // Only ordinary failures trigger the fallbacks; fatal errors propagate.
+        case NonFatal(_) => fromNewlineFormat.getOrElse(fromLegacyFormat)
+    }
+}
+
+// "<number> <unit>" with an optional plural "s", e.g. "3 days" or "1 week".
+val durationPattern = """^([0-9]+)\s([a-z]+?)s?$""".r
+
+/**
+ * Adds a textual duration to a timestamp.
+ *
+ * @param timestamp start time formatted as yyyyMMddHHmmss
+ * @param duration  "indefinite" / "infinite" / "never", or "<number> <unit>"
+ *                  where unit is second, minute, hour, day, week, month
+ *                  (counted as 30 days) or year (counted as 365 days)
+ * @return "indefinite", or the resulting timestamp as yyyyMMddHHmmss
+ * @throws MatchError if the duration or its unit is not recognized
+ * @throws IllegalArgumentException if the duration is too large to represent
+ */
+def applyDurationToTimestamp(timestamp: String, duration: String): String = {
+    duration.trim match {
+        case "indefinite" | "infinite" | "never" => "indefinite"
+        case durationPattern(number, period) =>
+            val timestampFormat = DateTimeFormat.forPattern("yyyyMMddHHmmss")
+            val dateTime = DateTime.parse(timestamp, timestampFormat)
+            val secondsPerUnit: Long = period match {
+                case "second" => 1L
+                case "minute" => 60L
+                case "hour" => 3600L
+                case "day" => 86400L
+                case "week" => 604800L
+                case "month" => 2592000L
+                case "year" => 31536000L
+            }
+            // The offset is computed without Int arithmetic: e.g. "100 years"
+            // is 3,153,600,000 seconds, which does not fit in an Int and
+            // would wrap around to a negative offset.
+            val offsetMillis = BigInt(number) * secondsPerUnit * 1000
+            if (offsetMillis > BigInt(Long.MaxValue)) {
+                throw new IllegalArgumentException("Duration out of range: " + duration)
+            }
+            timestampFormat.print(dateTime.plus(offsetMillis.toLong))
+    }
+}
+
+/**
+ * Extracts the block flags and the block expiration of a "block" log entry.
+ *
+ * Two formats are tried in order:
+ *   1. a serialized map with "6::flags" and "5::duration";
+ *   2. "duration\nflags" (the flags line is optional).
+ *
+ * @param logParams raw log parameters
+ * @param timestamp time of the log entry (yyyyMMddHHmmss), to which the
+ *                  duration is applied
+ * @return (newUserBlocks, blockExpiration)
+ * @throws Exception if logParams matches none of the formats
+ */
+def getNewUserBlocksAndBlockExpiration(
+    logParams: String, timestamp: String
+): (Seq[String], String) = {
+    import scala.util.control.NonFatal
+
+    // The original failure (if any) is kept as the cause to ease debugging.
+    def unparseable(cause: Throwable): Exception =
+        new Exception("Could not parse blocks from: " + logParams, cause)
+
+    // Format 1: fails with an exception when logParams is not a serialized map.
+    def fromSerialized: (Seq[String], String) = {
+        val paramsMap = unserialize(logParams).asInstanceOf[Map[String, Any]]
+        (
+            csvToSeq(paramsMap("6::flags").asInstanceOf[String]),
+            applyDurationToTimestamp(timestamp, paramsMap("5::duration").asInstanceOf[String])
+        )
+    }
+
+    // Format 2: None when logParams is not in the newline-separated format.
+    def fromNewlineFormat: Option[(Seq[String], String)] = {
+        if (!logParams.contains("\n")) {
+            None
+        } else {
+            logParams.split("\n") match {
+                case Array(duration) =>
+                    Some((Seq.empty[String], applyDurationToTimestamp(timestamp, duration)))
+                case Array(duration, flags) =>
+                    Some((csvToSeq(flags), applyDurationToTimestamp(timestamp, duration)))
+                case _ => None
+            }
+        }
+    }
+
+    try {
+        fromSerialized
+    } catch {
+        // Only ordinary failures trigger the fallback; fatal errors propagate.
+        case NonFatal(_) =>
+            val parsed =
+                try {
+                    fromNewlineFormat
+                } catch {
+                    case NonFatal(cause) => throw unparseable(cause)
+                }
+            parsed.getOrElse(throw unparseable(null))
+    }
+}
+
+/**
+ * Builds a UserEvent from one logging row.
+ *
+ * Expected positional columns: log_type, log_timestamp, log_user, log_title,
+ * log_comment, log_params, log_user_text.
+ *
+ * @throws MatchError if log_type is not "renameuser", "rights" or "block"
+ */
+def parseUserLog(log: Row): UserEvent = {
+    val logType = log.getString(0)
+    val logTimestamp = log.getString(1)
+    val logUser = log.getLong(2)
+    val logTitle = log.getString(3)
+    val logComment = log.getString(4)
+    val logParams = log.getString(5)
+    val logUserText = log.getString(6)
+
+    val eventType = logType match {
+        case "renameuser" => "rename"
+        case "rights" => "altergroups"
+        case "block" => "alterblocks"
+    }
+
+    // Only renames change the name; otherwise old and new name are the title.
+    val (oldUserName, newUserName) =
+        if (eventType == "rename") {
+            val (oldName, newName) = getOldAndNewUserNames(logParams, logComment, logTitle)
+            (Some(oldName), Some(newName))
+        } else {
+            val titleAsName = Some(logTitle.replaceAll("_", " "))
+            (titleAsName, titleAsName)
+        }
+
+    val (oldUserGroups, newUserGroups) =
+        if (eventType == "altergroups") {
+            getOldAndNewUserGroups(logParams)
+        } else {
+            (Seq.empty[String], Seq.empty[String])
+        }
+
+    val (newUserBlocks, blockExpiration): (Seq[String], Option[String]) =
+        if (eventType == "alterblocks") {
+            val (blocks, expiration) = getNewUserBlocksAndBlockExpiration(logParams, logTimestamp)
+            (blocks, Some(expiration))
+        } else {
+            (Seq.empty[String], None)
+        }
+
+    // A null or empty log_user_text means the acting user name is unknown.
+    val causedByUserName = Option(logUserText).filter(_ != "")
+
+    UserEvent(
+        timestamp = Some(logTimestamp),
+        eventType = Some(eventType),
+        causedByUserId = Some(logUser),
+        causedByUserName = causedByUserName,
+        oldUserName = oldUserName,
+        newUserName = newUserName,
+        oldUserGroups = oldUserGroups,
+        newUserGroups = newUserGroups,
+        newUserBlocks = newUserBlocks,
+        blockExpiration = blockExpiration
+    )
+}
+
+/**
+ * Splits the events into, per newUserName, the event with the greatest
+ * timestamp and all the other events.
+ *
+ * @return (latestEvents, remainingEvents)
+ */
+def getLatestEvents(events: RDD[UserEvent]): (RDD[UserEvent], RDD[UserEvent]) = {
+    // Tag every event with whether it is the newest one for its user name.
+    val flaggedEvents = events.keyBy(_.newUserName).groupByKey.flatMap {
+        case (_, sameNameEvents) =>
+            val newestFirst =
+                sameNameEvents.toList.sortBy(_.timestamp.get)(Ordering[String].reverse)
+            (newestFirst.head -> true) +: newestFirst.tail.map(_ -> false)
+    }
+    val latestEvents = flaggedEvents.collect { case (event, true) => event }
+    val remainingEvents = flaggedEvents.collect { case (event, false) => event }
+    (latestEvents, remainingEvents)
+}
+
+/**
+ * Performs one backwards step of the history reconstruction.
+ *
+ * The latest event of every user name is joined with the partial state of
+ * the same name:
+ *   - event and state: the state is closed into a final state that starts at
+ *     the event, and a new partial state (under the old user name, ending at
+ *     the event) is produced for the next step;
+ *   - state only: the state becomes a final state caused by "create";
+ *   - event only: the event is handed back with the remaining events.
+ *
+ * @return (events still to process, new partial states, all final states)
+ */
+def executeOneStep(
+    events: RDD[UserEvent],
+    partialStates: RDD[UserState],
+    finalStates: RDD[UserState]
+): (
+    RDD[UserEvent],
+    RDD[UserState],
+    RDD[UserState]
+) = {
+
+    val (latestEvents, remainingEvents) = getLatestEvents(events)
+
+    val eventsByName = latestEvents.keyBy(_.newUserName)
+    val statesByName = partialStates.keyBy(_.userName)
+    val joinedEvents = eventsByName.fullOuterJoin(statesByName).values
+
+    // Events with no matching partial state are retried in a later step.
+    val unpairedEvents = joinedEvents.collect {
+        case (Some(event), None) => event
+    }
+
+    // The state that was valid right before the event.
+    val newPartialStates = joinedEvents.collect {
+        case (Some(event), Some(partialState)) =>
+            partialState.copy(
+                userName = event.oldUserName,
+                endTimestamp = event.timestamp
+            )
+    }
+
+    val newFinalStates = joinedEvents.collect {
+        case (Some(event), Some(partialState)) =>
+            partialState.copy(
+                startTimestamp = event.timestamp,
+                userGroups = event.newUserGroups,
+                userBlocks = event.newUserBlocks,
+                causedByEventType = event.eventType,
+                causedByUserId = event.causedByUserId,
+                causedByUserName = event.causedByUserName,
+                causedByBlockExpiration = event.blockExpiration
+            )
+        case (None, Some(partialState)) =>
+            partialState.copy(causedByEventType = Some("create"))
+    }
+
+    (
+        remainingEvents.union(unpairedEvents),
+        newPartialStates,
+        newFinalStates.union(finalStates)
+    )
+}
+
+// Number of partitions every RDD is repartitioned to at the start of a step.
+val partitionNumber = 8
+/**
+ * Applies executeOneStep repeatedly until fewer than 2 partial states remain.
+ *
+ * @param events        events not yet applied to any state
+ * @param partialStates states whose start is not yet known
+ * @param finalStates   states already completed
+ * @return the (events, partialStates, finalStates) of the last step
+ */
+def fixedPoint(
+    events: RDD[UserEvent],
+    partialStates: RDD[UserState],
+    finalStates: RDD[UserState]
+): (
+    RDD[UserEvent],
+    RDD[UserState],
+    RDD[UserState]
+) = {
+    val repartitionedEvents = events.repartition(partitionNumber)
+    val repartitionedPartialStates = partialStates.repartition(partitionNumber)
+    val repartitionedFinalStates = finalStates.repartition(partitionNumber)
+    
+    val (
+        updatedEvents,
+        updatedPartialStates,
+        updatedFinalStates
+    ) = executeOneStep(
+        repartitionedEvents,
+        repartitionedPartialStates,
+        repartitionedFinalStates
+    )
+    
+    // The results are marked for caching before the count below evaluates them.
+    updatedEvents.cache()
+    updatedPartialStates.cache()
+    updatedFinalStates.cache()
+    
+    val updatedPartialStatesCount = updatedPartialStates.count
+    
+    // The inputs of this step (cached by the previous step, if any) are
+    // released only after the count above has run.
+    // NOTE(review): presumably this keeps the previous step's cache alive
+    // while the new results are computed -- confirm before reordering.
+    events.unpersist()
+    partialStates.unpersist()
+    finalStates.unpersist()
+    
+    // TODO: Find a better stop condition that is generic and cuts off the
+    // long tail.
+    if (updatedPartialStatesCount < 2) {
+        (updatedEvents, updatedPartialStates, updatedFinalStates)
+    } else {
+        fixedPoint(
+            updatedEvents, updatedPartialStates, updatedFinalStates
+        )
+    }
+}
+
+/**
+ * Carries user groups forward in time: every state that was not caused by
+ * an "altergroups" event receives the groups of the closest preceding
+ * "altergroups" state (or no groups if there is none).
+ *
+ * @param states states of one user, sorted by start time ascending
+ * @return the same states, in the same order, with userGroups filled in
+ */
+def propagateUserGroups(states: List[UserState]): List[UserState] = states match {
+    case Nil => Nil
+    case first :: rest =>
+        def changesGroups(state: UserState): Boolean =
+            state.causedByEventType.exists(_ == "altergroups")
+
+        // If the very first state is itself a group change, its groups must
+        // be carried over to the following states as well.
+        val initialGroups = if (changesGroups(first)) first.userGroups else Seq.empty[String]
+
+        // States are prepended and reversed at the end to keep this linear.
+        val (newestFirst, _) = rest.foldLeft((List(first), initialGroups)) {
+            case ((processed, userGroups), state) =>
+                if (changesGroups(state)) {
+                    (state :: processed, state.userGroups)
+                } else {
+                    (state.copy(userGroups = userGroups) :: processed, userGroups)
+                }
+        }
+        newestFirst.reverse
+}
+
+/**
+ * Carries user blocks forward in time and ends them at their expiration.
+ *
+ * Every state that was not caused by an "alterblocks" event receives the
+ * blocks of the closest preceding "alterblocks" state while that block is
+ * still active. If the block expires within such a state, the state is
+ * split at the expiration: the first part keeps the blocks, the second part
+ * is a synthetic "alterblocks" state without blocks.
+ *
+ * NOTE(review): an "alterblocks" state whose own expiration falls inside its
+ * interval is kept unsplit, as before -- confirm whether that is intended.
+ *
+ * @param states states of one user, sorted by start time ascending
+ * @return the states in order, with userBlocks filled in and split where
+ *         a block expires
+ */
+def propagateUserBlocks(states: List[UserState]): List[UserState] = states match {
+    case Nil => Nil
+    case first :: rest =>
+        def changesBlocks(state: UserState): Boolean =
+            state.causedByEventType.exists(_ == "alterblocks")
+
+        // If the very first state is itself a block change, its blocks and
+        // expiration must be carried over to the following states as well.
+        val seed: (List[UserState], Seq[String], Option[String]) =
+            if (changesBlocks(first)) {
+                (List(first), first.userBlocks, first.causedByBlockExpiration)
+            } else {
+                (List(first), Seq.empty[String], None)
+            }
+
+        // States are prepended and reversed at the end to keep this linear.
+        rest.foldLeft(seed) {
+            case ((processed, userBlocks, blockExpiration), state) =>
+                // Expiration of the carried block, if it ever expires.
+                val expiry = blockExpiration.filter(_ != "indefinite")
+                if (changesBlocks(state)) {
+                    (state :: processed, state.userBlocks, state.causedByBlockExpiration)
+                } else if (expiry.forall(exp => state.endTimestamp.exists(exp > _))) {
+                    // No block, a never-expiring block, or a block that
+                    // outlasts this state: the state just inherits the blocks.
+                    (
+                        state.copy(userBlocks = userBlocks) :: processed,
+                        userBlocks,
+                        blockExpiration
+                    )
+                } else if (expiry.exists(exp => state.startTimestamp.exists(exp <= _))) {
+                    // The block had already expired when this state started:
+                    // splitting here would create an interval that ends
+                    // before it starts, so the block is simply dropped.
+                    (
+                        state.copy(userBlocks = Seq.empty) :: processed,
+                        Seq.empty[String],
+                        None
+                    )
+                } else {
+                    // The block expires within this state: split it.
+                    val blockedPart = state.copy(
+                        endTimestamp = blockExpiration,
+                        userBlocks = userBlocks
+                    )
+                    val unblockedPart = state.copy(
+                        startTimestamp = blockExpiration,
+                        userBlocks = Seq.empty,
+                        causedByEventType = Some("alterblocks"),
+                        causedByUserId = None,
+                        causedByUserName = None,
+                        causedByBlockExpiration = None
+                    )
+                    (
+                        unblockedPart :: blockedPart :: processed,
+                        Seq.empty[String],
+                        None
+                    )
+                }
+        }._1.reverse
+}
+
+// Reads an Avro dataset, registers it as a temporary table and caches that table.
+def loadCachedAvroTable(path: String, tableName: String) = {
+    val dataframe = sqlContext.read.avro(path)
+    dataframe.registerTempTable(tableName)
+    sqlContext.table(tableName).cache()
+    dataframe
+}
+
+val swlogdf = loadCachedAvroTable("/user/milimetric/mediawiki/simplewiki/logging", "swlog")
+
+val swuserdf = loadCachedAvroTable("/user/milimetric/mediawiki/simplewiki/user", "swuser")
+
+// Matches any title that contains something shaped like an IPv4 address.
+val ipPattern = """^.*[0-9]{1,3}(\.[0-9]{1,3}){3}.*$""".r
+
+// Rename, rights and block log entries, with binary columns cast to strings.
+val userLogQuery = """
+    SELECT
+        cast(log_type as string),
+        cast(log_timestamp as string),
+        log_user,
+        cast(log_title as string),
+        cast(log_comment as string),
+        cast(log_params as string),
+        cast(log_user_text as string)
+    FROM swlog
+    WHERE
+        cast(log_type as string) IN (
+            'renameuser',
+            'rights',
+            'block'
+        )
+"""
+
+// Entries whose title (column 3) looks like an IP address are left out.
+val events = sqlContext.sql(userLogQuery).rdd.filter { log =>
+    log.getString(3) match {
+        case ipPattern(_) => false
+        case _ => true
+    }
+}.map(parseUserLog)
+
+// One initial partial state per row of the user table: only the id, the
+// current name and the registration time are known at this point.
+val partialStates = sqlContext.sql("""
+    SELECT
+        user_id,
+        cast(user_name as string),
+        cast(user_registration as string)
+    FROM swuser
+""").rdd.map(user =>
+    new UserState(
+        userId = Some(user.getLong(0)),
+        userName = Some(user.getString(1)),
+        // Option(...) maps a null registration to None instead of Some(null).
+        userRegistration = Option(user.getString(2))
+    )
+)
+
+// Reconstruct the history, starting with no final states at all.
+// The empty RDD is created with its element type instead of being cast.
+val (
+    unchainedEvents,
+    interruptedPartialStates,
+    finalStates
+) = fixedPoint(
+    events,
+    partialStates,
+    sc.emptyRDD[UserState]
+)
+
+// Group states by userId, sort them and apply finishing transformations:
+val userStatesRdd = finalStates.keyBy(_.userId).groupByKey.flatMap{
+    case (userId, states) =>
+
+        // Sort states by startTimestamp ascending, with startTimestamp=None
+        // first (those states were caused by creation events). The standard
+        // Option ordering does exactly this and, unlike a hand-written
+        // "less than", stays a consistent ordering when several states have
+        // no startTimestamp.
+        val sortedStates = states.toList.sortBy(_.startTimestamp)
+
+        // Propagate user groups and user blocks from left to right.
+        propagateUserGroups(propagateUserBlocks(sortedStates))
+}.map(_.toRow)
+
+// Write the result into the output table (SaveMode.Overwrite), reusing the
+// schema the table already has.
+val outputTable = "mforns.simplewiki_user_history"
+val schema = sqlContext.table(outputTable).schema
+val userStatesDataframe = sqlContext.createDataFrame(userStatesRdd, schema)
+userStatesDataframe.registerTempTable("userHistory")
+userStatesDataframe.repartition(8).write.mode(SaveMode.Overwrite).insertInto(outputTable)

-- 
To view, visit https://gerrit.wikimedia.org/r/297268
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Idd5cb2600636486356ba9f23e909b32fad677e1a
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to