A good example is RegressionMetrics <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s use of MultivariateOnlineSummarizer to aggregate statistics across labels and residuals; take a look at how aggregate is used there (aggregateByKey is the keyed analogue you would want for per-device statistics).
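For concreteness, here is a rough, untested sketch of that pattern keyed by device, using the deviceAggregateLogs RDD and DailyDeviceAggregates fields from the code further down this thread (note: the summarizer's variance divides by n - 1, while the hand-rolled code below divides by n):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// One summarizer per device_id: `add` folds a sample into a
// partition-local summarizer, `merge` combines summarizers
// computed on different partitions.
val summaries = deviceAggregateLogs
  .map(dda => (dda.device_id, Vectors.dense(dda.bytes.toDouble)))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summ, v) => summ.add(v),
    (s1, s2) => s1.merge(s2))

// Per-device (mean, stddev) of bytes, still as an RDD.
val deviceStats = summaries.mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))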
On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bag...@rocketmail.com> wrote:

> Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.
>
> I have been able to incorporate your suggestion about sorting - and it now works perfectly. Thanks again for that.
>
> I tried to use your suggestion of using MultivariateOnlineSummarizer, but could not proceed further. For each device_id (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Because RDDs are immutable, I cannot use a foreach loop to iterate through the groupBy results and add the values to an RDD one at a time - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering, since I am not used to the lambda expressions that Scala uses.
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>     // Question: Can we not write the line above as
>     //   deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)  // Anything wrong?
>
>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD.
>     // The problem with the 'foreach' approach below is that it generates the vector values one at
>     // a time, which I cannot add individually to an immutable RDD.
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1     // This is the device ID
>       val allaggregates = a._2 // These are all the device-aggregates for this device
>
>       val sortedaggregates = allaggregates.toArray
>       Sorting.quickSort(sortedaggregates)
>
>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble)
>       val count = byteValues.length
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>     })
>
>     //val vector: Vector = Vectors.dense(byteValues)
>     //println(vector)
>     //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>
>     sc.stop()
>   }
> }
>
> Can you show me how to write the 'foreach' loop in a Spark-friendly way? Thanks a lot for your help.
>
> Anupam Bagchi
>
>
> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fli...@databricks.com> wrote:
>
>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>
> allaggregates.toArray allocates a new array, separate from allaggregates; it is this copy that Sorting.quickSort sorts, while allaggregates itself remains unsorted. Try:
>
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
>
>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>
> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer.
>
>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>
> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer.
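> For instance, a rough, untested sketch of that aggregate over a whole RDD of samples (`vectorRDD` here is a hypothetical RDD[Vector]):
>
> import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
>
> val summary = vectorRDD.aggregate(new MultivariateOnlineSummarizer())(
>   (summ, v) => summ.add(v),   // fold one sample into a partition-local summarizer
>   (s1, s2) => s1.merge(s2))   // combine partial summarizers across partitions
> // summary.mean, summary.variance, and summary.count are then available.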
>> At the end I also need to write the results to HDFS, for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
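> For example (a sketch, assuming the per-device `summaries` RDD[(device_id, MultivariateOnlineSummarizer)] built earlier in this thread; the output path is hypothetical):
>
> // Unpack (mean, stddev, sample count) per device, then write to HDFS.
> val stats = summaries.mapValues(s => (s.mean(0), math.sqrt(s.variance(0)), s.count))
> stats.saveAsTextFile("hdfs:///tmp/device-stats")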
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bag...@rocketmail.com> wrote:
>
>> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
>>
>> 1. Read the dataset from HDFS. A few sample lines look like this:
>>
>>    deviceid,bytes,eventdate
>>    15590657,246620,20150630
>>    14066921,1907,20150621
>>    14066921,1906,20150626
>>    6522013,2349,20150626
>>    6522013,2525,20150613
>>
>> 2. Group the data by device id. Thus we now have a map of deviceid => (bytes, eventdate).
>> 3. For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
>> 4. Pick the last 30 days of bytes from this ordered set.
>> 5. Find the moving average of bytes for the last date using a time period of 30.
>> 6. Find the standard deviation of the bytes for the final date using a time period of 30.
>> 7. Return two values in the result, (mean - k*stddev) and (mean + k*stddev). [Assume k = 3]
>>
>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it ultimately has to run on a billion rows.
>>
>> Here is the data structure for the dataset:
>>
>> package com.testing
>>
>> case class DailyDeviceAggregates (
>>     device_id: Integer,
>>     bytes: Long,
>>     eventdate: Integer
>> ) extends Ordered[DailyDeviceAggregates] {
>>   def compare(that: DailyDeviceAggregates): Int = {
>>     eventdate - that.eventdate
>>   }
>> }
>>
>> object DailyDeviceAggregates {
>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>     val c = logline.split(",")
>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>   }
>> }
>>
>> The DeviceAnalyzer class looks like this. I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
>>
>> import com.testing.DailyDeviceAggregates
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>
>> import scala.util.Sorting
>>
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>>
>>     val logFile = args(0)
>>
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1     // This is the device ID
>>       val allaggregates = a._2 // These are all the device-aggregates for this device
>>
>>       println(allaggregates)
>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>       println(allaggregates) // This does not work - results are not sorted!!
>>
>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>       val count = byteValues.length
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>>
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>     })
>>
>>     sc.stop()
>>   }
>> }
>>
>> I would really appreciate it if someone could suggest improvements for the following:
>>
>> 1. The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>> 2. I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>> 3. For that I would need to keep all my intermediate values as RDDs so that I can directly use the RDD methods to do the job.
>> 4. At the end I also need to write the results to HDFS, for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>>
>> Thanks in advance for your help.
>>
>> Anupam Bagchi
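Putting the pieces of this thread together, a minimal, untested sketch of steps 3 through 7 from the list above (per-device sort by eventdate, keep the last 30 days, return (mean - 3*stddev, mean + 3*stddev), write to HDFS); it relies on the Ordered case class above, uses the population stddev as in the asker's code, and args(1) as the output path is an assumption:

val bounds = deviceAggregateLogs
  .groupBy(_.device_id)
  .mapValues { aggs =>
    // Sort by eventdate (via Ordered[DailyDeviceAggregates]) and keep the last 30 entries.
    val last30 = aggs.toArray.sorted.takeRight(30).map(_.bytes.toDouble)
    val n = last30.length
    val mean = last30.sum / n
    // Population standard deviation (divide by n), matching the hand-rolled code above.
    val stddev = math.sqrt(last30.map(x => (x - mean) * (x - mean)).sum / n)
    (mean - 3 * stddev, mean + 3 * stddev)
  }
bounds.saveAsTextFile(args(1)) // hypothetical second argument: the HDFS output path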