I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
1. Read the dataset from HDFS. A few sample lines look like this:

   ```
   deviceid,bytes,eventdate
   15590657,246620,20150630
   14066921,1907,20150621
   14066921,1906,20150626
   6522013,2349,20150626
   6522013,2525,20150613
   ```

2. Group the data by device id. We then have a map of deviceid => (bytes, eventdate).
3. For each device, sort the set by eventdate. We now have an ordered set of bytes, based on eventdate, for each device.
4. Pick the last 30 days of bytes from this ordered set.
5. Find the moving average of bytes for the last date, using a time period of 30.
6. Find the standard deviation of the bytes for the final date, using a time period of 30.
7. Return two values in the result: (mean - k * stddev) and (mean + k * stddev). [Assume k = 3]

I am using Apache Spark 1.3.0. The actual dataset is wider, and it eventually has to run on a billion rows.

Here is the data structure for the dataset:

```scala
package com.testing

case class DailyDeviceAggregates(
    device_id: Integer,
    bytes: Long,
    eventdate: Integer
) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}

object DailyDeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
```

I have a very crude implementation of the DeviceAnalyzer class that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:

```scala
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1     // This is the device ID
      val allaggregates = a._2 // This is an Iterable of all device-aggregates for this device

      println(allaggregates)
      Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(allaggregates)                   // This does not work - results are not sorted !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      // val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}
```

I would really appreciate it if someone could suggest improvements for the following:

1. The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way.
2. I would like to use the Spark MLlib class MultivariateStatisticalSummary to calculate the statistical values. For that I would need to keep all my intermediate values as RDDs, so that I can use the RDD methods on them directly.
3. At the end I also need to write the results to HDFS. There is a method provided on the RDD class to do so (see my edit at the bottom for what I have in mind), which is another reason I would like to retain everything as RDDs.
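In the meantime, here is the direction I have been experimenting with; please treat it as an untested sketch (the object name DeviceAnalyzerSketch is just for illustration). My understanding is that Sorting.quickSort only sorted the temporary array created by allaggregates.toArray and then threw it away, so here I sort a local copy with .sorted, which should pick up the Ordered implementation on DailyDeviceAggregates. I compute the mean and standard deviation by hand because Statistics.colStats seems to expect an RDD[Vector] rather than a single Vector; a proper MLlib-based replacement for that part is exactly what I am asking for in point 2. Treating "the last 30 days" as the last 30 daily records is also my assumption.

```scala
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}

object DeviceAnalyzerSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Device Analyzer"))

    val deviceAggregateLogs =
      sc.textFile(args(0)).map(DailyDeviceAggregates.parseLogLine).cache()

    // One (mean - 3 * stddev, mean + 3 * stddev) pair per device
    val bounds = deviceAggregateLogs
      .groupBy(_.device_id)
      .mapValues { aggregates =>
        // Sort a local copy by eventdate (via the Ordered implementation)
        // and keep the bytes for the final 30 days.
        val last30 = aggregates.toSeq.sorted.takeRight(30).map(_.bytes.toDouble)
        val n = last30.size
        val mean = last30.sum / n
        val stddev = math.sqrt(last30.map(x => (x - mean) * (x - mean)).sum / n)
        (mean - 3 * stddev, mean + 3 * stddev) // k = 3
      }

    bounds.collect().foreach(println) // only to inspect a small test file
    sc.stop()
  }
}
```

I am also unsure whether groupBy will hold up on a billion rows, since it brings all of a device's records together on one executor; if there is a more scalable idiom, I would like to know that too.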
Thanks in advance for your help.

Anupam Bagchi
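Edit: to make point 3 concrete, this is roughly how I expect to write the final results back to HDFS; saveAsTextFile is the RDD method I was referring to. It continues from the bounds RDD in my sketch above, and using args(1) as the output directory is just my assumption.

```scala
// Continuing from the `bounds` RDD[(Integer, (Double, Double))] in the sketch above.
bounds
  .map { case (deviceId, (lower, upper)) => deviceId + "," + lower + "," + upper }
  .saveAsTextFile(args(1)) // args(1): hypothetical HDFS output directory
```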