Joal has uploaded a new change for review. https://gerrit.wikimedia.org/r/212541
Change subject: Add pageview aggregation and parquet merger.
......................................................................

Add pageview aggregation and parquet merger.

Pageview aggregation is done hourly on dimensions project,
access_method, agent_type and country. Parquet merger is a utility
tool to merge parquet folders having the same schema.

Change-Id: I03ea5a5d663e0605225d5d4754beb316f0466104
---
M pom.xml
M refinery-job/pom.xml
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata
A refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala
12 files changed, 360 insertions(+), 7 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/41/212541/1

diff --git a/pom.xml b/pom.xml
index cb169e9..fbbb0a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,6 +163,13 @@
             <version>2.5.2</version>
         </dependency>

+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <version>2.2.4</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 </dependencyManagement>

diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 61808ff..e37726c 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -21,13 +21,6 @@
         </dependency>

         <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
             <version>${spark.version}</version>
@@ -48,10 +41,48 @@
             <scope>provided</scope>
         </dependency>

+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_2.10</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>

     <build>
         <plugins>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>1.0</version>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>WDF TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.scala-tools</groupId>
                 <artifactId>maven-scala-plugin</artifactId>
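The scopt_2.10 dependency added above drives the command-line handling in both new jobs below. For context, this is the scopt 3.x pattern of an immutable config rebuilt by each option's action; a minimal sketch, where "DemoConfig" and its fields are hypothetical and not part of this change:

// Minimal sketch of the scopt 3.x pattern used by both new jobs.
// DemoConfig is a hypothetical illustration only.
case class DemoConfig(year: Int = 0, numPartitions: Int = 1)

val demoParser = new scopt.OptionParser[DemoConfig]("Demo") {
  // Each action returns a fresh config with one field updated.
  opt[Int]('y', "year") required() action { (x, c) => c.copy(year = x) }
  opt[Int]('p', "numPartitions") optional() action { (x, c) => c.copy(numPartitions = x) }
}

// parse returns Some(config) on success, or None after printing usage on error.
demoParser.parse(Array("-y", "2015"), DemoConfig()) match {
  case Some(config) => println(s"year=${config.year}, partitions=${config.numPartitions}")
  case None         => sys.exit(1)
}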
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala
new file mode 100644
index 0000000..b1a9387
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewAggregator.scala
@@ -0,0 +1,142 @@
+package org.wikimedia.analytics.refinery.job
+
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.wikimedia.analytics.refinery.core.PageviewDefinition
+
+/**
+ * Pageview hourly aggregation job.
+ * Using text and mobile webrequests, filters pageviews
+ * and counts by project, access_method, agent_type,
+ * country and country_code.
+ */
+object PageviewAggregator {
+
+  /*
+   * Storage object for command line options
+   */
+  class Config(val year: Int = 0,
+               val month: Int = 0,
+               val day: Int = 0,
+               val hour: Int = 0,
+               val numPartitions: Int = 1,
+               val hdfsURI: String = "hdfs://analytics-hadoop",
+               val webrequestBaseFolder: String = "/wmf/data/wmf/webrequest/",
+               val pageviewBaseFolder: String = "/wmf/data/wmf/pageview/hourly",
+               val recompute: Boolean = false
+              ) {
+    def copy(year: Int = this.year,
+             month: Int = this.month,
+             day: Int = this.day,
+             hour: Int = this.hour,
+             numPartitions: Int = this.numPartitions,
+             hdfsURI: String = this.hdfsURI,
+             webrequestBaseFolder: String = this.webrequestBaseFolder,
+             pageviewBaseFolder: String = this.pageviewBaseFolder,
+             recompute: Boolean = this.recompute
+            ): Config = new Config(year, month, day, hour, numPartitions, hdfsURI,
+      webrequestBaseFolder, pageviewBaseFolder, recompute)
+  }
+
+  /*
+   * Command line options parser, storing values in a config object.
+   */
+  val argsParser = new scopt.OptionParser[Config]("PageviewAggregator") {
+    head("PageviewAggregator", "")
+    note("Computes pageview aggregation by dimensions. Dimensions are: " +
+      "project, access_method, agent_type, country and country_code.")
+
+    opt[Int]('y', "year") required() action { (x, c) =>
+      c.copy(year = x)} text ("year as an integer")
+    opt[Int]('m', "month") required() action { (x, c) =>
+      c.copy(month = x)} text ("month as an integer")
+    opt[Int]('d', "day") required() action { (x, c) =>
+      c.copy(day = x)} text ("day as an integer")
+    opt[Int]('h', "hour") required() action { (x, c) =>
+      c.copy(hour = x)} text ("hour as an integer")
+    opt[Int]('p', "numPartitions") optional() valueName ("<numPartitions>") action { (x, c) =>
+      c.copy(numPartitions = x)} text ("Optional - Number of partitions to aggregate and save (defaults to 1).")
+    opt[String]('r', "rootHdfsURI") optional() valueName ("<URI>") action { (x, c) =>
+      c.copy(hdfsURI = x)} text ("Optional - rootHdfsURI to connect to hdfs (defaults to hdfs://analytics-hadoop)")
+    opt[String]('w', "webrequestBaseFolder") optional() valueName ("<path>") action { (x, c) =>
+      c.copy(webrequestBaseFolder = if (x.endsWith("/")) x else x + "/")
+    } text ("Optional - webrequestBaseFolder where refined webrequests are stored with expected partitions: " +
+      "webrequest_source, year, month, day, hour. Defaults to /wmf/data/wmf/webrequest/")
+    opt[String]('o', "pageviewBaseFolder") optional() valueName ("<path>") action { (x, c) =>
+      c.copy(pageviewBaseFolder = if (x.endsWith("/")) x else x + "/")
+    } text ("Optional - pageviewBaseFolder where pageview aggregation will be saved using partition format: " +
+      "/year=YYYY/month=M/day=D/hour=H. Defaults to /wmf/data/wmf/pageview/hourly")
+    opt[Unit]('f', "forceRecompute") optional() action { (_, c) =>
+      c.copy(recompute = true)
+    } text ("Optional - forceRecompute deletes the existing directory and writes newly " +
+      "computed values instead of skipping the computation.")
+    help("help") text ("prints this usage text")
+  }
+
+  /*
+   * Returns a filtered and formatted dataframe from a parquet folder path, ready to be aggregated.
+   */
+  def getSingleWebrequestDataframe(sqlContext: SQLContext, path: String) = sqlContext.parquetFile(path)
+    .filter("is_pageview")
+    .selectExpr(
+      "year", "month", "day", "hour",
+      "get_project(uri_host) as project",
+      "access_method",
+      "agent_type",
+      "geocoded_data['country'] as country",
+      "geocoded_data['country_code'] as country_code"
+    )
+
+  /*
+   * Aggregation function building a union of the prepared datasets and counting rows.
+   */
+  def aggregate(sqlContext: SQLContext, webrequestPathSeq: Seq[String]): DataFrame = {
+    sqlContext.udf.register("get_project", (uri_host: String) =>
+      PageviewDefinition.getInstance().getProjectFromHost(uri_host))
+
+    webrequestPathSeq.tail.foldLeft(getSingleWebrequestDataframe(sqlContext, webrequestPathSeq.head))(
+      (result, path) => result.unionAll(getSingleWebrequestDataframe(sqlContext, path)))
+      .groupBy("year", "month", "day", "hour", "project", "access_method", "agent_type", "country", "country_code")
+      .count()
+  }
+
+  /*
+   * Entry point for aggregation.
+   * Parses command line arguments, launches aggregation and saves the result.
+   */
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, new Config()) match {
+      case Some(config) => {
+        // get spark context, SQLContext, hdfs and set config
+        val conf = new SparkConf().setAppName("PageviewAggregator")
+          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+          .set("spark.kryoserializer.buffer.mb", "24")
+        val sc = new SparkContext(conf)
+        val sqlContext = new SQLContext(sc)
+        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+        val hadoopConf = new org.apache.hadoop.conf.Configuration()
+        val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(config.hdfsURI), hadoopConf)
+
+        // Define needed paths
+        val hourPath = "year=%d/month=%d/day=%d/hour=%d".format(config.year, config.month, config.day, config.hour)
+        val pageviewPath = config.pageviewBaseFolder + hourPath
+        val webrequestsPathSeq = Seq(config.webrequestBaseFolder + "webrequest_source=text/" + hourPath,
+          config.webrequestBaseFolder + "webrequest_source=mobile/" + hourPath)
+
+        // Compute only if needed
+        if (!hdfs.exists(new org.apache.hadoop.fs.Path(pageviewPath)) || config.recompute) {
+          // In case it exists, delete the pageview folder
+          hdfs.delete(new org.apache.hadoop.fs.Path(pageviewPath), true)
+
+          aggregate(sqlContext, webrequestsPathSeq)
+            .repartition(config.numPartitions)
+            .saveAsParquetFile(pageviewPath)
+
+        }
+      }
+      case None => System.exit(1)
+    }
+  }
+
+}
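To exercise aggregate() against the checked-in fixtures from a spark-shell, something like the following should work. This is a sketch only: the base path assumes a local checkout of this change, and the refinery-job jar is assumed to be on the shell's classpath (Spark 1.3-era API, as used in the change):

// Sketch: running the aggregation on the test fixtures from a spark-shell.
// The fixture path is an assumption; adapt it to a local checkout.
import org.apache.spark.sql.SQLContext
import org.wikimedia.analytics.refinery.job.PageviewAggregator

val sqlContext = new SQLContext(sc)  // sc is provided by spark-shell
val base = "refinery-job/src/test/resources/webrequest_test_data/"
val hourPath = "year=2015/month=5/day=18/hour=5"

val result = PageviewAggregator.aggregate(sqlContext, Seq(
  base + "webrequest_source=text/" + hourPath,
  base + "webrequest_source=mobile/" + hourPath))

// Per the unit test below, 137 grouped rows are expected on these fixtures.
result.sort(result("count").desc).show(5)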
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala
new file mode 100644
index 0000000..f232dab
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/ParquetMerger.scala
@@ -0,0 +1,109 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, RemoteIterator}
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.annotation.tailrec
+
+/**
+ * Job allowing to merge parquet folders having the same schema.
+ * Useful to reduce the number of small files when working with
+ * hourly aggregations.
+ */
+object ParquetMerger {
+
+  /*
+   * Storage object for command line options
+   */
+  class Config(val outputPath: String = "",
+               val pathsToMerge: List[String] = List.empty[String],
+               val numPartitions: Int = 1,
+               val hdfsURI: String = "hdfs://analytics-hadoop"
+              ) {
+    def copy(outputPath: String = this.outputPath,
+             pathsToMerge: List[String] = this.pathsToMerge,
+             numPartitions: Int = this.numPartitions,
+             hdfsURI: String = this.hdfsURI
+            ): Config = new Config(outputPath, pathsToMerge, numPartitions, hdfsURI)
+  }
+
+  /*
+   * Command line options parser, storing values in a config object.
+   */
+  val argsParser = new scopt.OptionParser[Config]("ParquetMerger") {
+    head("ParquetMerger", "")
+    note(
+      """Merges files from various (sub)folders into one single parquet folder.
+        |Parquet schemas MUST BE THE SAME!""".stripMargin)
+
+    opt[String]('o', "outputPath") required() valueName("<path>") action { (x, c) =>
+      c.copy(outputPath=x) } text("Path where to store the merged files.")
+    opt[String]('m', "pathsToMerge") required() unbounded() valueName("<path> ...") action { (x, c) =>
+      c.copy(pathsToMerge= c.pathsToMerge :+ x) } text("Paths to merge, recursively looking for file-level folders.")
+    opt[Int]('p', "numPartitions") optional() valueName("<numPartitions>") action { (x, c) =>
+      c.copy(numPartitions=x) } text("Number of partitions to aggregate and save (defaults to 1).")
+    opt[String]('r', "rootHdfsURI") optional() valueName("<URI>") action { (x, c) =>
+      c.copy(hdfsURI=x) } text("rootHdfsURI to connect to hdfs (defaults to hdfs://analytics-hadoop)")
+    help("help") text("prints this usage text")
+  }
+
+  /*
+   * Function looking for a _SUCCESS file in the deepest subfolders of a given path list.
+   * If the file exists, the subfolder is added to the result.
+   */
+  def getPathsToMerge(hdfs: FileSystem, pathsToScan: List[String]): List[String] =
+    pathsToScan.foldLeft(List.empty[String])(
+      (pathsToMerge: List[String], pathToScan: String) =>
+        pathsToMerge ++ getDeepestFolders(hdfs.listFiles(new org.apache.hadoop.fs.Path(pathToScan), true), List.empty[String]))
+
+  @tailrec private def getDeepestFolders(iterator: RemoteIterator[LocatedFileStatus], foldersList: List[String]): List[String] = {
+    if (iterator.hasNext) {
+      val f = iterator.next()
+      if (f.isFile && f.getPath.getName.equals("_SUCCESS"))
+        getDeepestFolders(iterator, foldersList :+ f.getPath.getParent.toString)
+      else
+        getDeepestFolders(iterator, foldersList)
+    } else
+      foldersList
+  }
+
+  /*
+   * Function merging parquet files read from the path list.
+   */
+  def mergeParquetPaths(sqlContext: SQLContext, pathsToMerge: List[String]): DataFrame = {
+    pathsToMerge.tail.foldLeft(sqlContext.parquetFile(pathsToMerge.head))(
+      (dataframe: DataFrame, folder: String) => dataframe.unionAll(sqlContext.parquetFile(folder)))
+  }
+
+  /*
+   * Entry point for merging parquet files.
+   * Parses command line arguments, merges sub-paths and saves the result.
+   */
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, new Config()) match {
+      case Some(config) => {
+        // get spark context, SQLContext, hdfs and set config
+        val conf = new SparkConf().setAppName("ParquetMerger")
+          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+          .set("spark.kryoserializer.buffer.mb", "24")
+        val sc = new SparkContext(conf)
+        val sqlContext = new SQLContext(sc)
+        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+        val hadoopConf = new org.apache.hadoop.conf.Configuration()
+        val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(config.hdfsURI), hadoopConf)
+
+        val pathsToMerge = getPathsToMerge(hdfs, config.pathsToMerge)
+
+        val mergedParquet = mergeParquetPaths(sqlContext, pathsToMerge)
+
+        mergedParquet
+          .repartition(config.numPartitions)
+          .saveAsParquetFile(config.outputPath)
+
+      }
+      case None => System.exit(1)
+    }
+  }
+
+}
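The intended use seems to be collapsing several hourly folders into one output; a hedged sketch of the equivalent programmatic call from a spark-shell, with hypothetical paths (note this bypasses the _SUCCESS scan done by getPathsToMerge in main, passing leaf folders directly):

// Sketch: merging two hourly pageview folders into one (paths hypothetical).
import org.apache.spark.sql.SQLContext
import org.wikimedia.analytics.refinery.job.ParquetMerger

val sqlContext = new SQLContext(sc)  // sc from spark-shell

val merged = ParquetMerger.mergeParquetPaths(sqlContext, List(
  "/wmf/data/wmf/pageview/hourly/year=2015/month=5/day=18/hour=5",
  "/wmf/data/wmf/pageview/hourly/year=2015/month=5/day=18/hour=6"))

// One output folder, fewer files: repartition(1) mirrors the job's default.
merged.repartition(1).saveAsParquetFile("/tmp/pageview_merged_sketch")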
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata
new file mode 100644
index 0000000..22fc171
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_common_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata
new file mode 100644
index 0000000..06db993
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
new file mode 100644
index 0000000..7780357
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=mobile/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata
new file mode 100644
index 0000000..22fc171
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_common_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata
new file mode 100644
index 0000000..06db993
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/_metadata
Binary files differ
diff --git a/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
new file mode 100644
index 0000000..7780357
--- /dev/null
+++ b/refinery-job/src/test/resources/webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5/part-r-00001.parquet
Binary files differ
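The fixtures above are checked-in parquet outputs; the change does not include a generator for them. If one were ever needed, a sketch along these lines could rebuild a fixture from a real refined hour (entirely hypothetical: source path, sample size and output layout are all assumptions):

// Hypothetical fixture generator -- NOT part of this change.
// Writes a small parquet sample shaped like refined webrequest data.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc from spark-shell
val src = "/wmf/data/wmf/webrequest/webrequest_source=text/year=2015/month=5/day=18/hour=5"

sqlContext.parquetFile(src)
  .limit(500)       // keep the fixture small enough to commit
  .repartition(1)   // a single part file per fixture folder
  .saveAsParquetFile("webrequest_test_data/webrequest_source=text/year=2015/month=5/day=18/hour=5")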
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala
new file mode 100644
index 0000000..10fa83f
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/SparkJobSpec.scala
@@ -0,0 +1,33 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+
+abstract class SparkJobSpec extends FlatSpec with BeforeAndAfterAll {
+
+  @transient var sc: SparkContext = _
+  @transient var sqlContext: SQLContext = _
+
+  override def beforeAll = {
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    sqlContext = new SQLContext(sc)
+  }
+
+  override def afterAll = {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+      sqlContext = null
+      System.clearProperty("spark.driver.port")
+      System.clearProperty("spark.hostPort")
+    }
+  }
+
+}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala
new file mode 100644
index 0000000..d0a38ec
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestPageviewAggregator.scala
@@ -0,0 +1,31 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.scalatest.Matchers
+
+class TestPageviewAggregator extends SparkJobSpec with Matchers {
+
+  "PageviewAggregator" should "Aggregate pageviews" in {
+
+    val year = 2015
+    val month = 5
+    val day = 18
+    val hour = 5
+    val webrequestBaseFolder = getClass.getResource("/webrequest_test_data").getPath + "/"
+
+    val hourPath = "year=%d/month=%d/day=%d/hour=%d".format(year, month, day, hour)
+    val webrequestPathSeq = Seq(webrequestBaseFolder + "webrequest_source=mobile/" + hourPath,
+      webrequestBaseFolder + "webrequest_source=text/" + hourPath)
+
+    val result = PageviewAggregator.aggregate(sqlContext, webrequestPathSeq)
+
+    result.count() shouldEqual 137
+
+    val top = result.sort(result("count").desc).limit(1).collect()(0)
+    top(4) shouldEqual "en.wikipedia"  // project
+    top(5) shouldEqual "desktop"       // access_method
+    top(6) shouldEqual "user"          // agent_type
+    top(8) shouldEqual "US"            // country_code
+    top(9) shouldEqual 592             // count
+
+  }
+}

--
To view, visit https://gerrit.wikimedia.org/r/212541
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I03ea5a5d663e0605225d5d4754beb316f0466104
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>