Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/212541

Change subject: Add pageview aggregation and parquet merger.
......................................................................

Add pageview aggregation and parquet merger.

Pageview aggregation is done hourly on the dimensions project, access_method,
agent_type, country and country_code.
The parquet merger is a utility tool to merge parquet folders that share the
same schema.
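
For reference, the hourly aggregation boils down to the shape below (a minimal
sketch using the same DataFrame API as the job; the paths, the local master and
the simplified project mapping are illustrative, not the job's exact code):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  val sc = new SparkContext(new SparkConf().setAppName("pageview-hourly-sketch").setMaster("local"))
  val sqlContext = new SQLContext(sc)
  // One refined webrequest hour; the real job unions the text and mobile sources.
  sqlContext.parquetFile("/wmf/data/wmf/webrequest/webrequest_source=text/year=2015/month=5/day=18/hour=5")
    .filter("is_pageview")
    .selectExpr("year", "month", "day", "hour",
      "uri_host as project",  // the job maps uri_host through PageviewDefinition.getProjectFromHost instead
      "access_method", "agent_type",
      "geocoded_data['country'] as country",
      "geocoded_data['country_code'] as country_code")
    .groupBy("year", "month", "day", "hour", "project",
      "access_method", "agent_type", "country", "country_code")
    .count()
    .saveAsParquetFile("/wmf/data/wmf/pageview/hourly/year=2015/month=5/day=18/hour=5")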

Change-Id: I03ea5a5d663e0605225d5d4754beb316f0466104
---
M pom.xml
M refinery-job/pom.xml
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala
12 files changed, 360 insertions(+), 7 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/41/212541/1

diff --git a/pom.xml b/pom.xml
index cb169e9..fbbb0a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,6 +163,13 @@
         <version>2.5.2</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest_2.10</artifactId>
+        <version>2.2.4</version>
+        <scope>test</scope>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 61808ff..e37726c 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -21,13 +21,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
             <version>${spark.version}</version>
@@ -48,10 +41,48 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_2.10</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
         <plugins>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>1.0</version>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>WDF TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.scala-tools</groupId>
                 <artifactId>maven-scala-plugin</artifactId>
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala
new file mode 100644
index 0000000..b1a9387
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala
@@ -0,0 +1,142 @@
+package org.wikimedia.analytics.refinery.job
+
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.wikimedia.analytics.refinery.core.PageviewDefinition
+
+/**
+ * Pageview hourly aggregation job.
+ * Filters pageviews from the text and mobile webrequests
+ * and counts them by project, access_method, agent_type,
+ * country and country_code.
+ */
+object PageviewAggregator {
+
+  /*
+   * Storage object for command line options
+   */
+  class Config(val year: Int = 0,
+               val month: Int = 0,
+               val day: Int = 0,
+               val hour: Int = 0,
+               val numPartitions: Int = 1,
+               val hdfsURI: String = "hdfs://analytics-hadoop",
+               val webrequestBaseFolder: String = "/wmf/data/wmf/webrequest/",
+               val pageviewBaseFolder: String = "/wmf/data/wmf/pageview/hourly",
+               val recompute: Boolean = false
+                ) {
+    def copy(year: Int = this.year,
+             month: Int = this.month,
+             day: Int = this.day,
+             hour: Int = this.hour,
+             numPartitions: Int = this.numPartitions,
+             hdfsURI: String = this.hdfsURI,
+             webrequestBaseFolder: String = this.webrequestBaseFolder,
+             pageviewBaseFolder: String = this.pageviewBaseFolder,
+             recompute: Boolean = this.recompute
+              ): Config = new Config(year, month, day, hour, numPartitions, hdfsURI,
+      webrequestBaseFolder, pageviewBaseFolder, recompute)
+  }
+
+  /*
+   * Command line options parser, storing values in a config object.
+   */
+  val argsParser = new scopt.OptionParser[Config]("PageviewAggregator") {
+    head("PageviewAggregator", "")
+    note("Computes pageview aggregation by dimensions. Dimensions are: " +
+         "project, access_method, agent_type, country and country_code.")
+
+    opt[Int]('y', "year") required() action { (x, c) =>
+      c.copy(year = x)} text ("year as an integer")
+    opt[Int]('m', "month") required() action { (x, c) =>
+      c.copy(month = x)} text ("month as an integer")
+    opt[Int]('d', "day") required() action { (x, c) =>
+      c.copy(day = x)} text ("day as an integer")
+    opt[Int]('h', "hour") required() action { (x, c) =>
+      c.copy(hour = x)} text ("hour as an integer")
+    opt[Int]('p', "numPartitions") optional() valueName ("<numPartitions>") action { (x, c) =>
+      c.copy(numPartitions = x)} text ("Optional - Number of partitions to aggregate and save (defaults to 1).")
+    opt[String]('r', "rootHdfsURI") optional() valueName ("<URI>") action { (x, c) =>
+      c.copy(hdfsURI = x)} text ("Optional - rootHdfsURI to connect to hdfs (defaults to hdfs://analytics-hadoop)")
+    opt[String]('w', "webrequestBaseFolder") optional() valueName ("<path>") action { (x, c) =>
+      c.copy(webrequestBaseFolder = if (x.endsWith("/")) x else x + "/")
+    } text ("Optional - webrequestBaseFolder where refined webrequests are stored with expected partitions: " +
+            "webrequest_source, year, month, day, hour. Defaults to /wmf/data/wmf/webrequest/")
+    opt[String]('o', "pageviewBaseFolder") optional() valueName ("<path>") action { (x, c) =>
+      c.copy(pageviewBaseFolder = if (x.endsWith("/")) x else x + "/")
+    } text ("Optional - pageviewBaseFolder where pageview aggregation will be saved using partition format: " +
+            "/year=YYYY/month=M/day=D/hour=H. Defaults to /wmf/data/wmf/pageview/hourly")
+    opt[Unit]('f', "forceRecompute") optional() action { (_, c) =>
+      c.copy(recompute = true)
+    } text ("Optional - forceRecompute deletes the existing directory and writes newly " +
+            "computed values instead of skipping the computation.")
+    help("help") text ("prints this usage text")
+  }
+
+  /*
+   * Returns a filtered and formatted dataframe from a parquet folder path, ready to be aggregated.
+   */
+  def getSingleWebrequestDataframe(sqlContext: SQLContext, path: String) = sqlContext.parquetFile(path)
+    .filter("is_pageview")
+    .selectExpr(
+      "year", "month", "day", "hour",
+      "get_project(uri_host) as project",
+      "access_method",
+      "agent_type",
+      "geocoded_data['country'] as country",
+      "geocoded_data['country_code'] as country_code"
+    )
+
+  /*
+   * Aggregation function building a union of the prepared datasets and counting rows.
+   */
+  def aggregate(sqlContext: SQLContext, webrequestPathSeq: Seq[String]): DataFrame = {
+    sqlContext.udf.register("get_project", (uri_host: String) =>
+      PageviewDefinition.getInstance().getProjectFromHost(uri_host))
+
+    webrequestPathSeq.tail.foldLeft(getSingleWebrequestDataframe(sqlContext, webrequestPathSeq.head))(
+      (result, path) => result.unionAll(getSingleWebrequestDataframe(sqlContext, path)))
+      .groupBy("year", "month", "day", "hour", "project", "access_method", "agent_type", "country", "country_code")
+      .count()
+  }
+
+  /*
+   * Entry point for aggregation.
+   * Parses command line arguments, launches aggregation and saves result.
+   */
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, new Config()) match {
+      case Some(config) => {
+        // get spark context, SQLContext, hdfs and set config
+        val conf = new SparkConf().setAppName("PageviewAggregator")
+          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+          .set("spark.kryoserializer.buffer.mb", "24")
+        val sc = new SparkContext(conf)
+        val sqlContext = new SQLContext(sc)
+        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+        val hadoopConf = new org.apache.hadoop.conf.Configuration()
+        val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(config.hdfsURI), hadoopConf)
+
+        // Define the needed paths
+        val hourPath = "year=%d/month=%d/day=%d/hour=%d".format(config.year, config.month, config.day, config.hour)
+        val pageviewPath = config.pageviewBaseFolder + hourPath
+        val webrequestsPathSeq = Seq(config.webrequestBaseFolder + "webrequest_source=text/" + hourPath,
+          config.webrequestBaseFolder + "webrequest_source=mobile/" + hourPath)
+
+        // Computes only if needed
+        if (!hdfs.exists(new org.apache.hadoop.fs.Path(pageviewPath)) || config.recompute) {
+          // In case it exists, delete pageview folder
+          hdfs.delete(new org.apache.hadoop.fs.Path(pageviewPath), true)
+
+          aggregate(sqlContext, webrequestsPathSeq)
+            .repartition(config.numPartitions)
+            .saveAsParquetFile(pageviewPath)
+
+        }
+      }
+      case None => System.exit(1)
+    }
+  }
+
+}
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala
new file mode 100644
index 0000000..f232dab
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala
@@ -0,0 +1,109 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, RemoteIterator}
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.annotation.tailrec
+
+/**
+ * Job allowing to merge parquet folders having the same schema.
+ * Useful to reduce the number of small files when working with
+ * hourly aggregations.
+ */
+object ParquetMerger {
+
+  /*
+   * Storage object for command line options
+   */
+  class Config(val outputPath: String = "",
+               val pathsToMerge: List[String] = List.empty[String],
+               val numPartitions: Int = 1,
+               val hdfsURI: String = "hdfs://analytics-hadoop"
+                ) {
+    def copy(outputPath: String = this.outputPath,
+             pathsToMerge: List[String] = this.pathsToMerge,
+             numPartitions: Int = this.numPartitions,
+             hdfsURI: String = this.hdfsURI
+              ): Config = new Config(outputPath, pathsToMerge, numPartitions, hdfsURI)
+  }
+
+  /*
+   * Command line options parser, storing values in a config object.
+   */
+  val argsParser = new scopt.OptionParser[Config]("ParquetMerger") {
+    head("ParquetMerger", "")
+    note(
+      """Merges files from various (sub)folders into one single parquet folder.
+        |Parquet schemas MUST BE THE SAME!""".stripMargin)
+
+    opt[String]('o', "outputPath") required() valueName("<path>") action { (x, c) =>
+      c.copy(outputPath=x) } text("Path where to store the merged files.")
+    opt[String]('m', "pathsToMerge") required() unbounded() valueName("<path> ...") action { (x, c) =>
+     c.copy(pathsToMerge= c.pathsToMerge :+ x) } text("Paths to merge, recursively looking for file level folders.")
+    opt[Int]('p', "numPartitions") optional() valueName("<numPartitions>") action { (x, c) =>
+      c.copy(numPartitions=x) } text("Number of partitions to aggregate and save (defaults to 1).")
+    opt[String]('r', "rootHdfsURI") optional() valueName("<URI>") action { (x, c) =>
+      c.copy(hdfsURI=x) } text("rootHdfsURI to connect to hdfs (defaults to hdfs://analytics-hadoop)")
+    help("help") text("prints this usage text")
+  }
+
+  /*
+   * Function looking for a _SUCCESS file in the deepest subfolders of a given path list.
+   * If the file exists, the subfolder is added to the result.
+   */
+  def getPathsToMerge(hdfs: FileSystem, pathsToScan: List[String]): List[String] =
+    pathsToScan.foldLeft(List.empty[String])(
+      (pathsToMerge: List[String], pathToScan: String) =>
+        pathsToMerge ++ getDeepestFolders(hdfs.listFiles(new org.apache.hadoop.fs.Path(pathToScan), true), List.empty[String]))
+
+  @tailrec private def getDeepestFolders(iterator: RemoteIterator[LocatedFileStatus], foldersList: List[String]): List[String] = {
+    if (iterator.hasNext) {
+      val f = iterator.next()
+      if (f.isFile && f.getPath.getName.equals("_SUCCESS"))
+        getDeepestFolders(iterator, foldersList :+ f.getPath.getParent.toString)
+      else
+        getDeepestFolders(iterator, foldersList)
+    } else
+      foldersList
+  }
+
+  /*
+   * Function merging parquet files read from the path list.
+   */
+  def mergeParquetPaths(sqlContext: SQLContext, pathsToMerge: List[String]): DataFrame = {
+    pathsToMerge.tail.foldLeft(sqlContext.parquetFile(pathsToMerge.head))(
+      (dataframe: DataFrame, folder: String) => dataframe.unionAll(sqlContext.parquetFile(folder)))
+  }
+
+  /*
+   * Entry point for merging parquet files.
+   * Parses command line arguments, merges sub-path and saves result.
+   */
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, new Config()) match {
+      case Some(config) => {
+        // get spark context, SQLContext, hdfs and set config
+        val conf = new SparkConf().setAppName("ParquetMerger")
+          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+          .set("spark.kryoserializer.buffer.mb", "24")
+        val sc = new SparkContext(conf)
+        val sqlContext = new SQLContext(sc)
+        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+        val hadoopConf = new org.apache.hadoop.conf.Configuration()
+        val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(config.hdfsURI), hadoopConf)
+
+        val pathsToMerge = getPathsToMerge(hdfs, config.pathsToMerge)
+
+        val mergedParquet = mergeParquetPaths(sqlContext, pathsToMerge)
+
+        mergedParquet
+          .repartition(config.numPartitions)
+          .saveAsParquetFile(config.outputPath)
+
+      }
+      case None => System.exit(1)
+    }
+  }
+
+}
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata
new file mode 100644
index 0000000..22fc171
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata
new file mode 100644
index 0000000..06db993
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
new file mode 100644
index 0000000..7780357
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata
new file mode 100644
index 0000000..22fc171
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata
new file mode 100644
index 0000000..06db993
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
new file mode 100644
index 0000000..7780357
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
Binary files differ
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala
new file mode 100644
index 0000000..10fa83f
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala
@@ -0,0 +1,33 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+
+abstract class SparkJobSpec extends FlatSpec with BeforeAndAfterAll {
+
+  @transient var sc: SparkContext = _
+  @transient var sqlContext: SQLContext = _
+
+  override def beforeAll = {
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    sqlContext = new SQLContext(sc)
+  }
+
+  override def afterAll = {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+      sqlContext = null
+      System.clearProperty("spark.driver.port")
+      System.clearProperty("spark.hostPort")
+    }
+  }
+
+}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala
new file mode 100644
index 0000000..d0a38ec
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala
@@ -0,0 +1,31 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.scalatest.Matchers
+
+class TestPageviewAggregator extends SparkJobSpec with Matchers {
+
+  "PageviewAggregator" should "Aggregate pageviews" in {
+
+    val year = 2015
+    val month = 5
+    val day = 18
+    val hour = 5
+    val webrequestBaseFolder = getClass.getResource("/webrequest_test_data").getPath + "/"
+
+    val hourPath = "year=%d/month=%d/day=%d/hour=%d".format(year, month, day, hour)
+    val webrequestPathSeq = Seq(webrequestBaseFolder + "webrequest_source=mobile/" + hourPath,
+                                webrequestBaseFolder + "webrequest_source=text/"   + hourPath)
+
+    val result = PageviewAggregator.aggregate(sqlContext, webrequestPathSeq)
+
+    result.count() shouldEqual 137
+
+    val top = result.sort(result("count").desc).limit(1).collect()(0)
+    top(4) shouldEqual "en.wikipedia" // project
+    top(5) shouldEqual "desktop"      // access_method
+    top(6) shouldEqual "user"         // agent_type
+    top(8) shouldEqual "US"           // country_code
+    top(9) shouldEqual 592            // count
+
+  }
+}
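
For completeness, the merge performed by ParquetMerger amounts to the sketch
below (same DataFrame API as the job; the HDFS URI, local master and paths are
illustrative, and the single _SUCCESS check stands in for the recursive scan
done by getPathsToMerge):

  import java.net.URI
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  val sc = new SparkContext(new SparkConf().setAppName("parquet-merge-sketch").setMaster("local"))
  val sqlContext = new SQLContext(sc)
  val hdfs = FileSystem.get(new URI("hdfs://analytics-hadoop"), new Configuration())

  // Keep only folders holding a completed parquet output (marked by _SUCCESS),
  // then union them (identical schemas assumed) and rewrite them as one folder.
  val folders = Seq(
    "/wmf/data/wmf/pageview/hourly/year=2015/month=5/day=18/hour=5",
    "/wmf/data/wmf/pageview/hourly/year=2015/month=5/day=18/hour=6"
  ).filter(p => hdfs.exists(new Path(p + "/_SUCCESS")))

  folders.map(p => sqlContext.parquetFile(p))
    .reduce((a, b) => a.unionAll(b))
    .repartition(1)
    .saveAsParquetFile("/wmf/data/wmf/pageview/merged/year=2015/month=5/day=18")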

-- 
To view, visit https://gerrit.wikimedia.org/r/212541
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I03ea5a5d663e0605225d5d4754beb316f0466104
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>
