> The call to Sorting.quickSort is not working. Perhaps I am calling it
> the wrong way.
allaggregates.toArray allocates and creates a new array, separate from
allaggregates; it is that new copy which Sorting.quickSort sorts, while
allaggregates itself is left untouched. Try:

  val sortedAggregates = allaggregates.toArray
  Sorting.quickSort(sortedAggregates)

> I would like to use the Spark mllib class MultivariateStatisticalSummary
> to calculate the statistical values.

MultivariateStatisticalSummary is a trait (similar to a Java interface);
you probably want to use MultivariateOnlineSummarizer.

> For that I would need to keep all my intermediate values as RDD so that I
> can directly use the RDD methods to do the job.

Correct; you would do an aggregate using the add and merge functions
provided by MultivariateOnlineSummarizer.

> At the end I also need to write the results to HDFS for which there is a
> method provided on the RDD class to do so, which is another reason I would
> like to retain everything as RDD.

You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
or you could first unpack the relevant statistics from
MultivariateOnlineSummarizer into an array/tuple using mapValues and then
write.

On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
anupam_bag...@rocketmail.com> wrote:

> I have to do the following tasks on a dataset using Apache Spark with
> Scala as the programming language:
>
> 1. Read the dataset from HDFS. A few sample lines look like this:
>
>    deviceid,bytes,eventdate
>    15590657,246620,20150630
>    14066921,1907,20150621
>    14066921,1906,20150626
>    6522013,2349,20150626
>    6522013,2525,20150613
>
> 2. Group the data by device id. Thus we now have a map of deviceid =>
>    (bytes, eventdate).
> 3. For each device, sort the set by eventdate. We now have an ordered
>    set of bytes based on eventdate for each device.
> 4. Pick the last 30 days of bytes from this ordered set.
> 5. Find the moving average of bytes for the last date using a time
>    period of 30.
> 6. Find the standard deviation of the bytes for the final date using a
>    time period of 30.
> 7. Return two values in the result: (mean - k*stddev) and
>    (mean + k*stddev). [Assume k = 3]
>
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it
> ultimately has to run on a billion rows.
>
> Here is the data structure for the dataset:
>
> package com.testing
>
> case class DailyDeviceAggregates (
>   device_id: Integer,
>   bytes: Long,
>   eventdate: Integer
> ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
>
> object DailyDeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
>
> The DeviceAnalyzer class looks like this. I have a very crude
> implementation that does the job, but it is not up to the mark. Sorry, I
> am very new to Scala/Spark, so my questions are quite basic. Here is what
> I have now:
>
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>
> import scala.util.Sorting
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs =
>       sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1     // This is the device ID
>       val allaggregates = a._2 // All device-aggregates for this device
>
>       println(allaggregates)
>       // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       Sorting.quickSort(allaggregates.toArray)
>       println(allaggregates) // This does not work - results are not sorted !!
>
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square =
>         byteValues.map(x => (x - xbar) * (x - xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>
>       // val vector: Vector = Vectors.dense(byteValues)
>       // println(vector)
>       // val summary: MultivariateStatisticalSummary =
>       //   Statistics.colStats(vector)
>     })
>
>     sc.stop()
>   }
> }
>
> I would really appreciate it if someone could suggest improvements for the
> following:
>
> 1. The call to Sorting.quickSort is not working. Perhaps I am calling
>    it the wrong way.
> 2. I would like to use the Spark mllib class
>    MultivariateStatisticalSummary to calculate the statistical values.
> 3. For that I would need to keep all my intermediate values as RDD so
>    that I can directly use the RDD methods to do the job.
> 4. At the end I also need to write the results to HDFS, for which there
>    is a method provided on the RDD class, which is another reason I
>    would like to retain everything as RDD.
>
> Thanks in advance for your help.
>
> Anupam Bagchi
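P.S. To see exactly why the original quickSort call appears to do nothing,
here is a minimal, self-contained sketch. Agg is a made-up stand-in for
DailyDeviceAggregates, reduced to the two fields that matter for the sort:

```scala
import scala.util.Sorting

// Hypothetical stand-in for DailyDeviceAggregates (the name Agg is made up).
case class Agg(bytes: Long, eventdate: Int) extends Ordered[Agg] {
  def compare(that: Agg): Int = eventdate - that.eventdate
}

val allaggregates = Seq(Agg(2349, 20150626), Agg(2525, 20150613), Agg(246620, 20150630))

// What the original code does: toArray builds a fresh copy, quickSort
// sorts that copy in place, and the copy is immediately discarded.
Sorting.quickSort(allaggregates.toArray)
println(allaggregates.map(_.eventdate))
// still List(20150626, 20150613, 20150630) - unsorted

// The fix: keep a reference to the array, sort it, and use *it* afterwards.
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)
println(sortedAggregates.map(_.eventdate).mkString(","))
// 20150613,20150626,20150630
```

Sorting.quickSort finds the Ordering[Agg] implicitly because Ordered[Agg]
extends Comparable[Agg]; the same applies to DailyDeviceAggregates.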
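P.P.S. For points 2-4, the add/merge contract that makes an RDD aggregate
work can be illustrated without Spark. Below is a single-column, plain-Scala
analogue of what MultivariateOnlineSummarizer tracks per device; it is a
sketch, not Spark's actual class, and the names ByteSummarizer and band are
made up:

```scala
// Tracks count, running mean, and M2 (sum of squared deviations), updated
// with Welford's online algorithm so that partial summaries computed on
// different partitions can be merged exactly.
class ByteSummarizer {
  private var n = 0L
  private var mean = 0.0
  private var m2 = 0.0

  // Fold one observation in (the role of the aggregate's seqOp).
  def add(x: Double): this.type = {
    n += 1
    val delta = x - mean
    mean += delta / n
    m2 += delta * (x - mean)
    this
  }

  // Combine two partial summaries (the role of the aggregate's combOp).
  def merge(other: ByteSummarizer): this.type = {
    if (other.n > 0) {
      val delta = other.mean - mean
      val total = n + other.n
      mean = (n * mean + other.n * other.mean) / total
      m2 += other.m2 + delta * delta * n * other.n / total
      n = total
    }
    this
  }

  def count: Long = n
  def avg: Double = mean
  // Population stddev, matching the sum/count formula in the original code.
  def stddev: Double = if (n > 0) math.sqrt(m2 / n) else 0.0

  // The two requested outputs: (mean - k*stddev, mean + k*stddev).
  def band(k: Double): (Double, Double) = (mean - k * stddev, mean + k * stddev)
}
```

In Spark this shape maps onto something like
deviceAggregateLogs.map(d => (d.device_id, d.bytes.toDouble))
.aggregateByKey(zero)(_ add _, _ merge _), where MultivariateOnlineSummarizer's
add takes a Vectors.dense(...) rather than a raw Double; the resulting pair
RDD can then be mapValues'd down to the (lower, upper) tuple and written out
with saveAsTextFile. Note that MultivariateOnlineSummarizer reports the
unbiased (n-1) variance, whereas the sketch above divides by n.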