Mforns has uploaded a new change for review. https://gerrit.wikimedia.org/r/297268
Change subject: [WIP] Process MediaWiki User history
......................................................................

[WIP] Process MediaWiki User history

Bug: T138861
Change-Id: Idd5cb2600636486356ba9f23e909b32fad677e1a
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
1 file changed, 458 insertions(+), 0 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/68/297268/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
new file mode 100644
index 0000000..16d9288
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/MediawikiToUserHistory.scala
@@ -0,0 +1,458 @@
+/* create table statement for the user_history table
+CREATE EXTERNAL TABLE IF NOT EXISTS `mforns.simplewiki_user_history`(
+  `user_id` bigint,
+  `user_name` string,
+  `user_groups` array<string>,
+  `user_blocks` array<string>,
+  `user_registration` string,
+  `start_timestamp` string,
+  `end_timestamp` string,
+  `caused_by_event_type` string,
+  `caused_by_user_id` bigint,
+  `caused_by_user_name` string
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+LOCATION '/user/mforns/mediawiki/simplewiki/user_history';
+*/
+
+import com.databricks.spark.avro._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SaveMode
+import scala.util.Try
+import scala.util.matching.Regex
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+/**
+ * One user-related log entry (rename, rights change or block) after parsing.
+ *
+ * `oldUserName`/`newUserName` differ only for rename events; the group and
+ * block fields are only populated for "altergroups" and "alterblocks" events
+ * respectively (see `parseUserLog`).
+ */
+case class UserEvent(
+  timestamp: Option[String] = None,
+  eventType: Option[String] = None,
+  causedByUserId: Option[Long] = None,
+  causedByUserName: Option[String] = None,
+  oldUserName: Option[String] = None,
+  newUserName: Option[String] = None,
+  oldUserGroups: Seq[String] = Seq.empty,
+  newUserGroups: Seq[String] = Seq.empty,
+  newUserBlocks: Seq[String] = Seq.empty,
+  blockExpiration: Option[String] = None
+)
+
+/**
+ * The state of one user between `startTimestamp` and `endTimestamp`, together
+ * with the event that produced it (`causedBy*`).
+ *
+ * `causedByBlockExpiration` is working data used by `propagateUserBlocks`;
+ * it is not part of the row written by `toRow`.
+ */
+case class UserState(
+  userId: Option[Long] = None,
+  userName: Option[String] = None,
+  userGroups: Seq[String] = Seq.empty,
+  userBlocks: Seq[String] = Seq.empty,
+  userRegistration: Option[String] = None,
+  startTimestamp: Option[String] = None,
+  endTimestamp: Option[String] = None,
+  causedByEventType: Option[String] = None,
+  causedByUserId: Option[Long] = None,
+  causedByUserName: Option[String] = None,
+  causedByBlockExpiration: Option[String] = None
+) {
+  /**
+   * Converts this state into a Row with the 10 columns, in order, of the
+   * create table statement at the top of this file. Absent values become
+   * null; a missing start timestamp falls back to the registration timestamp.
+   */
+  def toRow: Row = Row(
+    userId.map(Long.box).orNull,
+    userName.orNull,
+    userGroups,
+    userBlocks,
+    userRegistration.orNull,
+    startTimestamp.orElse(userRegistration).orNull,
+    endTimestamp.orNull,
+    causedByEventType.orNull,
+    causedByUserId.map(Long.box).orNull,
+    causedByUserName.orNull
+  )
+}
+
+// Extracts the targets of the first two [[ns:name|...]] wiki links of a comment.
+val userNamesPattern = """^[^\[]*\[\[[^:]*:([^|]*)\|.*\]\][^\[]*\[\[[^:]*:(.*)\|.*$""".r
+
+/**
+ * Returns (oldUserName, newUserName) for a rename log entry.
+ *
+ * Three sources are tried in order, the first that works wins:
+ *  1. the unserialized log params (keys "4::olduser" and "5::newuser"),
+ *  2. the two wiki links in the log comment (`userNamesPattern`),
+ *  3. the log title (underscores replaced by spaces) as old name and the raw
+ *     log params as new name.
+ *
+ * Only non-fatal failures trigger a fallback; fatal JVM errors propagate.
+ */
+def getOldAndNewUserNames(
+  logParams: String, logComment: String, logTitle: String
+): (String, String) = {
+  def fromSerializedParams: Option[(String, String)] = Try {
+    val logParamsMap = unserialize(logParams).asInstanceOf[Map[String, Any]]
+    (
+      logParamsMap("4::olduser").asInstanceOf[String],
+      logParamsMap("5::newuser").asInstanceOf[String]
+    )
+  }.toOption
+
+  def fromComment: Option[(String, String)] = Option(logComment).collect {
+    case userNamesPattern(oldName, newName) => (oldName, newName)
+  }
+
+  fromSerializedParams
+    .orElse(fromComment)
+    .getOrElse((logTitle.replaceAll("_", " "), logParams))
+}
+
+/**
+ * Splits a comma-separated string into its trimmed, non-empty entries.
+ * Null, empty and whitespace-only input all yield an empty sequence.
+ */
+def csvToSeq(csv: String): Seq[String] = {
+  Option(csv).toList.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)
+}
+
+/**
+ * Returns (oldUserGroups, newUserGroups) for a rights log entry.
+ *
+ * Formats tried in order:
+ *  1. unserialized params with keys "4::oldgroups" and "5::newgroups",
+ *  2. one or two newline-separated comma-separated lists (old, then new),
+ *  3. "=a,b" or "+a +b" shorthand, which only carries the new groups.
+ *
+ * @throws Exception if none of the formats applies.
+ */
+def getOldAndNewUserGroups(logParams: String): (Seq[String], Seq[String]) = {
+  def fromSerializedParams: Option[(Seq[String], Seq[String])] = Try {
+    val paramsMap = unserialize(logParams).asInstanceOf[Map[String, Any]]
+    def paramToSeq(param: String): Seq[String] = {
+      paramsMap(param).asInstanceOf[Map[String, String]].values.toList
+    }
+    (paramToSeq("4::oldgroups"), paramToSeq("5::newgroups"))
+  }.toOption
+
+  def fromNewlineSeparated: Option[(Seq[String], Seq[String])] = {
+    Option(logParams).filter(_.contains("\n")).map(_.split("\n")).collect {
+      case Array(oldGroups) =>
+        (csvToSeq(oldGroups), Seq.empty[String])
+      case Array(oldGroups, newGroups) =>
+        (csvToSeq(oldGroups), csvToSeq(newGroups))
+    }
+  }
+
+  def fromShorthand: Option[(Seq[String], Seq[String])] = {
+    Option(logParams).collect {
+      case params if params.startsWith("=") =>
+        val newGroups = params.replaceAll("=", "").split(",").map(_.trim)
+        (Seq.empty[String], newGroups.filter(_.nonEmpty).toList)
+      case params if params.contains("+") =>
+        val newGroups = params.split(" ").map(_.replace("+", "").trim)
+        (Seq.empty[String], newGroups.filter(_.nonEmpty).toList)
+    }
+  }
+
+  fromSerializedParams
+    .orElse(fromNewlineSeparated)
+    .orElse(fromShorthand)
+    .getOrElse(throw new Exception("Could not parse groups from: " + logParams))
+}
+
+val durationPattern = """^([0-9]+)\s([a-z]+?)s?$""".r
+
+/**
+ * Adds a textual duration ("3 days", "1 week", ...) to a yyyyMMddHHmmss
+ * timestamp and returns the result in the same format.
+ *
+ * "indefinite", "infinite" and "never" all return the literal "indefinite".
+ * Months count as 30 days and years as 365 days.
+ *
+ * The offset is computed as a Long: as an Int, something like "100 years"
+ * overflows. Parsing and printing are pinned to UTC so the result does not
+ * depend on the JVM default time zone or its DST transitions
+ * (NOTE(review): assumes the input timestamps are UTC, as MediaWiki
+ * documents for its database timestamps - confirm).
+ *
+ * @throws IllegalArgumentException if the duration or its unit is not recognized.
+ */
+def applyDurationToTimestamp(timestamp: String, duration: String): String = {
+  duration.trim match {
+    case "indefinite" | "infinite" | "never" => "indefinite"
+    case durationPattern(number, period) =>
+      val timestampFormat = DateTimeFormat.forPattern("yyyyMMddHHmmss").withZoneUTC()
+      val dateTime = DateTime.parse(timestamp, timestampFormat)
+      val secondsPerPeriod: Long = period match {
+        case "second" => 1L
+        case "minute" => 60L
+        case "hour" => 3600L
+        case "day" => 86400L
+        case "week" => 604800L
+        case "month" => 2592000L
+        case "year" => 31536000L
+        case unknown =>
+          throw new IllegalArgumentException("Unknown duration unit: " + unknown)
+      }
+      val offsetMillis = secondsPerPeriod * number.toLong * 1000L
+      timestampFormat.print(dateTime.plus(offsetMillis))
+    case unparsable =>
+      throw new IllegalArgumentException("Could not parse duration: " + unparsable)
+  }
+}
+
+/**
+ * Returns (newUserBlocks, blockExpiration) for a block log entry logged at
+ * `timestamp`.
+ *
+ * Formats tried in order:
+ *  1. unserialized params with keys "6::flags" and "5::duration",
+ *  2. newline-separated "duration" or "duration\nflags".
+ *
+ * The expiration is either a yyyyMMddHHmmss timestamp or "indefinite"
+ * (see `applyDurationToTimestamp`).
+ *
+ * @throws Exception if none of the formats applies.
+ */
+def getNewUserBlocksAndBlockExpiration(
+  logParams: String, timestamp: String
+): (Seq[String], String) = {
+  def fromSerializedParams: Option[(Seq[String], String)] = Try {
+    val paramsMap = unserialize(logParams).asInstanceOf[Map[String, Any]]
+    (
+      csvToSeq(paramsMap("6::flags").asInstanceOf[String]),
+      applyDurationToTimestamp(timestamp, paramsMap("5::duration").asInstanceOf[String])
+    )
+  }.toOption
+
+  def fromNewlineSeparated: Option[(Seq[String], String)] = Try {
+    Option(logParams).filter(_.contains("\n")).map(_.split("\n")).collect {
+      case Array(duration) =>
+        (Seq.empty[String], applyDurationToTimestamp(timestamp, duration))
+      case Array(duration, flags) =>
+        (csvToSeq(flags), applyDurationToTimestamp(timestamp, duration))
+    }
+  }.toOption.flatten
+
+  fromSerializedParams
+    .orElse(fromNewlineSeparated)
+    .getOrElse(throw new Exception("Could not parse blocks from: " + logParams))
+}
+
+/**
+ * Turns one row of the events query (log_type, log_timestamp, log_user,
+ * log_title, log_comment, log_params, log_user_text - in that order) into a
+ * UserEvent.
+ *
+ * @throws IllegalArgumentException if log_type is not one of
+ *         "renameuser", "rights" or "block".
+ */
+def parseUserLog(log: Row): UserEvent = {
+  val logType = log.getString(0)
+  val logTimestamp = log.getString(1)
+  // getLong throws on NULL, so check for it explicitly.
+  val logUser = if (log.isNullAt(2)) None else Some(log.getLong(2))
+  val logTitle = log.getString(3)
+  val logComment = log.getString(4)
+  val logParams = log.getString(5)
+  val logUserText = log.getString(6)
+
+  val eventType = logType match {
+    case "renameuser" => "rename"
+    case "rights" => "altergroups"
+    case "block" => "alterblocks"
+    case other => throw new IllegalArgumentException("Unexpected log type: " + other)
+  }
+
+  // Only renames change the name; every other event targets the log title.
+  val (oldUserName, newUserName) = eventType match {
+    case "rename" =>
+      val (oldName, newName) = getOldAndNewUserNames(logParams, logComment, logTitle)
+      (Some(oldName), Some(newName))
+    case _ =>
+      val userName = logTitle.replaceAll("_", " ")
+      (Some(userName), Some(userName))
+  }
+
+  val (oldUserGroups, newUserGroups) = eventType match {
+    case "altergroups" => getOldAndNewUserGroups(logParams)
+    case _ => (Seq.empty[String], Seq.empty[String])
+  }
+
+  val (newUserBlocks, blockExpiration) = eventType match {
+    case "alterblocks" =>
+      val (blocks, expiration) = getNewUserBlocksAndBlockExpiration(logParams, logTimestamp)
+      (blocks, Option(expiration))
+    case _ => (Seq.empty[String], Option.empty[String])
+  }
+
+  UserEvent(
+    timestamp = Some(logTimestamp),
+    eventType = Some(eventType),
+    causedByUserId = logUser,
+    // Null and empty user texts both count as "unknown".
+    causedByUserName = Option(logUserText).filter(_.nonEmpty),
+    oldUserName = oldUserName,
+    newUserName = newUserName,
+    oldUserGroups = oldUserGroups,
+    newUserGroups = newUserGroups,
+    newUserBlocks = newUserBlocks,
+    blockExpiration = blockExpiration
+  )
+}
+
+/**
+ * Splits events into (latestEvents, remainingEvents): for every
+ * `newUserName`, the event with the greatest timestamp goes to the first
+ * RDD and all its other events go to the second.
+ */
+def getLatestEvents(events: RDD[UserEvent]): (RDD[UserEvent], RDD[UserEvent]) = {
+  val markedEvents = events.keyBy(_.newUserName).groupByKey.flatMap {
+    case (_, userEvents) =>
+      // Newest first; events without timestamp (not expected) sort last.
+      val newestFirst = userEvents.toList.sortBy(_.timestamp)(Ordering[Option[String]].reverse)
+      newestFirst match {
+        case latest :: older => (latest, true) :: older.map((_, false))
+        case Nil => Nil
+      }
+  }
+  val latestEvents = markedEvents.filter(_._2).map(_._1)
+  val remainingEvents = markedEvents.filter(!_._2).map(_._1)
+  (latestEvents, remainingEvents)
+}
+
+/**
+ * Executes one backwards step of the history reconstruction.
+ *
+ * The latest event of every user name is joined with the partial state that
+ * currently carries that name:
+ *  - event and state: the state valid after the event becomes final, and a
+ *    new partial state (named `oldUserName`, ending at the event) replaces it;
+ *  - state without event: nothing older is known, so the state becomes final
+ *    as the user's "create" state;
+ *  - event without state: the event is handed back for a later step.
+ *
+ * @return (events still to process, new partial states, all final states)
+ */
+def executeOneStep(
+  events: RDD[UserEvent],
+  partialStates: RDD[UserState],
+  finalStates: RDD[UserState]
+): (
+  RDD[UserEvent],
+  RDD[UserState],
+  RDD[UserState]
+) = {
+
+  val (latestEvents, remainingEvents) = getLatestEvents(events)
+
+  val joinedEvents = latestEvents.keyBy(_.newUserName).fullOuterJoin(
+    partialStates.keyBy(_.userName)
+  ).map(_._2)
+
+  val unpairedEvents = joinedEvents.collect {
+    case (Some(event), None) => event
+  }
+
+  val newPartialStates = joinedEvents.collect {
+    case (Some(event), Some(partialState)) =>
+      partialState.copy(
+        userName = event.oldUserName,
+        endTimestamp = event.timestamp
+      )
+  }
+
+  val newFinalStates = joinedEvents.collect {
+    case (Some(event), Some(partialState)) =>
+      partialState.copy(
+        startTimestamp = event.timestamp,
+        userGroups = event.newUserGroups,
+        userBlocks = event.newUserBlocks,
+        causedByEventType = event.eventType,
+        causedByUserId = event.causedByUserId,
+        causedByUserName = event.causedByUserName,
+        causedByBlockExpiration = event.blockExpiration
+      )
+    case (None, Some(partialState)) =>
+      partialState.copy(causedByEventType = Some("create"))
+  }
+
+  (
+    remainingEvents.union(unpairedEvents),
+    newPartialStates,
+    newFinalStates.union(finalStates)
+  )
+}
+
+// Number of partitions used for the intermediate RDDs and for the output.
+val partitionNumber = 8
+
+/**
+ * Applies `executeOneStep` repeatedly until fewer than 2 partial states
+ * remain, and returns the RDDs of the last step.
+ *
+ * The results of every step are marked for caching and the inputs of the
+ * step are unpersisted once the new partial states have been counted.
+ */
+def fixedPoint(
+  events: RDD[UserEvent],
+  partialStates: RDD[UserState],
+  finalStates: RDD[UserState]
+): (
+  RDD[UserEvent],
+  RDD[UserState],
+  RDD[UserState]
+) = {
+  val (
+    updatedEvents,
+    updatedPartialStates,
+    updatedFinalStates
+  ) = executeOneStep(
+    events.repartition(partitionNumber),
+    partialStates.repartition(partitionNumber),
+    finalStates.repartition(partitionNumber)
+  )
+
+  updatedEvents.cache()
+  updatedPartialStates.cache()
+  updatedFinalStates.cache()
+
+  // The count is the action that materializes this step.
+  val updatedPartialStatesCount = updatedPartialStates.count
+
+  events.unpersist()
+  partialStates.unpersist()
+  finalStates.unpersist()
+
+  // TODO: Find a better stop condition that is generic and cuts off the long tail.
+  if (updatedPartialStatesCount < 2) {
+    (updatedEvents, updatedPartialStates, updatedFinalStates)
+  } else {
+    fixedPoint(
+      updatedEvents, updatedPartialStates, updatedFinalStates
+    )
+  }
+}
+
+/**
+ * Carries user groups forward through one user's states, which must be
+ * sorted by ascending start timestamp: a state caused by "altergroups"
+ * keeps its own groups, every other state inherits the groups of the
+ * closest "altergroups" state before it (none if there is no such state).
+ *
+ * All states are treated alike, so an "altergroups" state in first position
+ * seeds the groups of the states after it, and an empty list yields an
+ * empty list.
+ */
+def propagateUserGroups(states: List[UserState]): List[UserState] = {
+  val initial = (Vector.empty[UserState], Seq.empty[String])
+  val (processed, _) = states.foldLeft(initial) {
+    case ((done, currentGroups), state) =>
+      if (state.causedByEventType == Some("altergroups")) {
+        (done :+ state, state.userGroups)
+      } else {
+        (done :+ state.copy(userGroups = currentGroups), currentGroups)
+      }
+  }
+  processed.toList
+}
+
+/**
+ * Carries user blocks forward through one user's states, which must be
+ * sorted by ascending start timestamp, and ends timed blocks when they
+ * expire.
+ *
+ * A state caused by "alterblocks" sets the blocks in effect and their
+ * expiration; every other state inherits them. For the blocks in effect
+ * during a state:
+ *  - if they expire strictly inside the state's span, the state is split at
+ *    the expiration: the first part keeps the blocks, the second part has no
+ *    blocks and is marked as caused by an "alterblocks" event without author;
+ *  - if they expired at or before the state's start, the state has no blocks;
+ *  - otherwise (no expiration, "indefinite", or expiring at or after the
+ *    state's end) the state gets the blocks and they stay in effect.
+ *
+ * The split is also applied to the "alterblocks" state itself. Otherwise a
+ * block expiring before the user's next event would only be noticed in the
+ * following state, splitting that state at a point before its own start.
+ */
+def propagateUserBlocks(states: List[UserState]): List[UserState] = {
+  val initial = (Vector.empty[UserState], Seq.empty[String], Option.empty[String])
+  val (processed, _, _) = states.foldLeft(initial) {
+    case ((done, carriedBlocks, carriedExpiration), state) =>
+      val (blocks, expiration) =
+        if (state.causedByEventType == Some("alterblocks")) {
+          (state.userBlocks, state.causedByBlockExpiration)
+        } else {
+          (carriedBlocks, carriedExpiration)
+        }
+
+      // Timestamps are yyyyMMddHHmmss strings, so string order is time order.
+      val lapseTime = expiration.filter(_ != "indefinite")
+      val lapsedBeforeStart = lapseTime.exists { lapse =>
+        state.startTimestamp.exists(_ >= lapse)
+      }
+      val lapsesInside = lapseTime.exists { lapse =>
+        state.startTimestamp.forall(_ < lapse) && state.endTimestamp.forall(lapse < _)
+      }
+
+      if (lapsesInside) {
+        val blocked = state.copy(
+          endTimestamp = lapseTime,
+          userBlocks = blocks
+        )
+        val unblocked = state.copy(
+          startTimestamp = lapseTime,
+          userBlocks = Seq.empty,
+          causedByEventType = Some("alterblocks"),
+          causedByUserId = None,
+          causedByUserName = None,
+          causedByBlockExpiration = None
+        )
+        (done :+ blocked :+ unblocked, Seq.empty[String], Option.empty[String])
+      } else if (lapsedBeforeStart) {
+        (done :+ state.copy(userBlocks = Seq.empty), Seq.empty[String], Option.empty[String])
+      } else {
+        (done :+ state.copy(userBlocks = blocks), blocks, expiration)
+      }
+  }
+  processed.toList
+}
+
+val swlogdf = sqlContext.read.avro("/user/milimetric/mediawiki/simplewiki/logging")
+swlogdf.registerTempTable("swlog")
+sqlContext.table("swlog").cache()
+
+val swuserdf = sqlContext.read.avro("/user/milimetric/mediawiki/simplewiki/user")
+swuserdf.registerTempTable("swuser")
+sqlContext.table("swuser").cache()
+
+// Matches any text that contains something shaped like an IPv4 address.
+val ipPattern = """^.*[0-9]{1,3}(\.[0-9]{1,3}){3}.*$""".r
+
+// Rename, rights and block log entries, minus those whose title looks like
+// an IP address.
+val events = sqlContext.sql("""
+  SELECT
+    cast(log_type as string),
+    cast(log_timestamp as string),
+    log_user,
+    cast(log_title as string),
+    cast(log_comment as string),
+    cast(log_params as string),
+    cast(log_user_text as string)
+  FROM swlog
+  WHERE
+    cast(log_type as string) IN (
+      'renameuser',
+      'rights',
+      'block'
+    )
+""").rdd.filter(
+  log => log.getString(3) match {
+    case ipPattern(_) => false
+    case _ => true
+  }
+).map(parseUserLog(_))
+
+// One initial partial state per row of the user table.
+// Option(...) maps NULL columns to None instead of Some(null).
+val partialStates = sqlContext.sql("""
+  SELECT
+    user_id,
+    cast(user_name as string),
+    cast(user_registration as string)
+  FROM swuser
+""").rdd.map(user =>
+  UserState(
+    userId = Some(user.getLong(0)),
+    userName = Option(user.getString(1)),
+    userRegistration = Option(user.getString(2))
+  )
+)
+
+val (
+  unchainedEvents,
+  interruptedPartialStates,
+  finalStates
+) = fixedPoint(
+  events,
+  partialStates,
+  sc.emptyRDD[UserState]
+)
+
+// Group states by userId, sort them and apply finishing transformations:
+val userStatesRdd = finalStates.keyBy(_.userId).groupByKey.flatMap {
+  case (userId, states) =>
+
+    // Sort states by startTimestamp ascending,
+    // and sort states with startTimestamp=None first
+    // they were caused by creation events.
+    // (The Option ordering places None before any Some.)
+    val sortedStates = states.toList.sortBy(_.startTimestamp)
+
+    // Propagate user groups and user blocks from left to right.
+    propagateUserGroups(propagateUserBlocks(sortedStates))
+}.map(_.toRow)
+
+// Output the result into the table.
+val schema = sqlContext.table("mforns.simplewiki_user_history").schema
+val userStatesDataframe = sqlContext.createDataFrame(userStatesRdd, schema)
+userStatesDataframe.registerTempTable("userHistory")
+userStatesDataframe.repartition(partitionNumber).write.mode(SaveMode.Overwrite).insertInto(
+  "mforns.simplewiki_user_history"
+)

--
To view, visit https://gerrit.wikimedia.org/r/297268
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Idd5cb2600636486356ba9f23e909b32fad677e1a
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits