I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:

- Read the dataset from HDFS. A few sample lines look like this:
```
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
```

- Group the data by device id. We then have a map of deviceid => (bytes, eventdate).
- For each device, sort the set by eventdate. We now have an ordered set of bytes, ordered by eventdate, for each device.
- Pick the last 30 days of bytes from this ordered set.
- Find the moving average of the bytes for the last date, using a time period of 30.
- Find the standard deviation of the bytes for the last date, using the same time period of 30.
- Return two values in the result: (mean - k * stddev) and (mean + k * stddev). [Assume k = 3.] I have sketched what I think this computation looks like below.

I am using Apache Spark 1.3.0. The actual dataset is wider, and it ultimately has to run on a billion rows.

Here is the data structure for the dataset:

```scala
package com.testing

case class DailyDeviceAggregates(
    device_id: Integer,
    bytes: Long,
    eventdate: Integer
) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}

object DailyDeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
```

The DeviceAnalyzer class looks like this. I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:

```scala
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1     // This is the device ID
      val allaggregates = a._2 // These are all the device-aggregates for this device

      println(allaggregates)
      Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(allaggregates) // This does not work - results are not sorted!!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}
```

I would really appreciate it if someone could suggest improvements for the following (my own attempts are sketched below this list):

- The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way.
- I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
- For that I would need to keep all my intermediate values as RDDs, so that I can directly use the RDD methods to do the job.
- At the end I also need to write the results to HDFS, for which there is a method provided on the RDD class. This is another reason I would like to retain everything as RDDs.
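To make the task list at the top concrete, here is a minimal sketch of the whole computation as I currently picture it. All the names are my own, I am treating "the last 30 days" as the last 30 records per device (there is one aggregate per device per day), and I do not know whether this approach is idiomatic or will scale to a billion rows:

```scala
// Sketch: group by device, sort each device's records by eventdate,
// keep the last 30 observations, and compute the (mean - 3s, mean + 3s) band.
val bands = deviceAggregateLogs
  .groupBy(_.device_id)
  .mapValues { aggregates =>
    val bytes = aggregates.toSeq
      .sortBy(_.eventdate.intValue) // oldest to newest
      .takeRight(30)                // last 30 days of bytes
      .map(_.bytes.toDouble)
    val n = bytes.size
    val mean = bytes.sum / n
    val stddev = math.sqrt(bytes.map(b => (b - mean) * (b - mean)).sum / n)
    val k = 3
    (mean - k * stddev, mean + k * stddev) // the two values to return
  }
```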
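On the first bullet: my current guess is that Sorting.quickSort sorts, in place, the throwaway array produced by toArray, and returns Unit, so allaggregates itself is never reordered. Is the following the correct calling pattern? (sortedAggregates is my own name.)

```scala
// Materialize the Iterable into an array once, sort that array in place,
// and keep using the sorted array; quickSort picks up the ordering from
// the compare method of Ordered[DailyDeviceAggregates].
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)
val byteValues = sortedAggregates.map(_.bytes.toDouble)
```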
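On the last three bullets, this is the rough direction I had in mind for a single device, using placeholder names (last30Bytes stands for that device's date-sorted Seq[Double] of byte values, and the output path is made up):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

// Statistics.colStats wants an RDD[Vector]: one Vector per observation,
// here with a single column holding the bytes value.
val bytesRdd = sc.parallelize(last30Bytes.map(b => Vectors.dense(b)))
val summary: MultivariateStatisticalSummary = Statistics.colStats(bytesRdd)

val mean = summary.mean(0)                  // mean of the single column
val stddev = math.sqrt(summary.variance(0)) // colStats reports variance, not stddev
val k = 3

// Write one CSV line, deviceid,lower,upper, back to HDFS.
sc.parallelize(Seq(device_id + "," + (mean - k * stddev) + "," + (mean + k * stddev)))
  .saveAsTextFile("hdfs:///tmp/device-bands") // placeholder path
```

What I cannot see is how to do this for all devices at once: sc.parallelize and Statistics.colStats cannot be called from inside foreach on another RDD, which is exactly why I want to keep every intermediate result as an RDD end to end.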
Thanks in advance for your help.

Anupam Bagchi