>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
> wrong way.

allaggregates.toArray allocates a new array, separate from allaggregates;
Sorting.quickSort sorts that new array in place, so allaggregates itself is
left unsorted. Try:
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)
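
Then use sortedAggregates (not allaggregates) everywhere downstream; for
example, a small sketch based on the loop body in your code:

println(sortedAggregates.mkString(", "))  // now printed in eventdate order
val byteValues = sortedAggregates.map(_.bytes.toDouble)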

> I would like to use the Spark mllib class MultivariateStatisticalSummary
> to calculate the statistical values.

MultivariateStatisticalSummary is a trait (similar to a Java interface);
you probably want to use MultivariateOnlineSummarizer.
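
For example, a minimal local sketch of how the summarizer accumulates
values (the two sample numbers are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val summarizer = new MultivariateOnlineSummarizer()
summarizer.add(Vectors.dense(246620.0))  // one observation (bytes) per call
summarizer.add(Vectors.dense(1907.0))
// mean, variance, count etc. come from the MultivariateStatisticalSummary trait
println(s"mean=${summarizer.mean(0)} variance=${summarizer.variance(0)} n=${summarizer.count}")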

> For that I would need to keep all my intermediate values as RDD so that I
> can directly use the RDD methods to do the job.

Correct; you would do an aggregate using the add and merge functions
provided by MultivariateOnlineSummarizer.
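
Roughly, a sketch assuming the deviceAggregateLogs RDD from your code, with
bytes as the single dimension:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val summaries = deviceAggregateLogs
  .map(d => (d.device_id, Vectors.dense(d.bytes.toDouble)))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summ, v) => summ.add(v),   // fold one observation into a summarizer
    (s1, s2) => s1.merge(s2))   // combine per-partition summarizers
// summaries: RDD[(device_id, MultivariateOnlineSummarizer)]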

> At the end I also need to write the results to HDFS for which there is a
> method provided on the RDD class to do so, which is another reason I would
> like to retain everything as RDD.

You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
or you could unpack the relevant statistics from
MultivariateOnlineSummarizer into an array/tuple using a mapValues first
and then write.
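
For example (a sketch: summaries is the keyed RDD from the previous step,
k = 3 as in your question, and the output path is only a placeholder):

val k = 3
val results = summaries.mapValues { s =>
  val mean = s.mean(0)
  val stddev = math.sqrt(s.variance(0))
  (mean - k * stddev, mean + k * stddev)
}
results.saveAsTextFile("hdfs:///placeholder/output/path")

Note that MultivariateOnlineSummarizer reports the unbiased sample variance
(dividing by n - 1), so the standard deviation will differ slightly from
your divide-by-n version.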

On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
anupam_bag...@rocketmail.com> wrote:

> I have to do the following tasks on a dataset using Apache Spark with
> Scala as the programming language:
>
>    1. Read the dataset from HDFS. A few sample lines look like this:
>
>  deviceid,bytes,eventdate
>  15590657,246620,20150630
>  14066921,1907,20150621
>  14066921,1906,20150626
>  6522013,2349,20150626
>  6522013,2525,20150613
>
>
>    2. Group the data by device id. Thus we now have a map of deviceid =>
>    (bytes,eventdate)
>    3. For each device, sort the set by eventdate. We now have an ordered
>    set of bytes based on eventdate for each device.
>    4. Pick the last 30 days of bytes from this ordered set.
>    5. Find the moving average of bytes for the last date using a time
>    period of 30.
>    6. Find the standard deviation of the bytes for the final date using a
>    time period of 30.
>    7. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>    [Assume k = 3]
>
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it
> ultimately has to run on a billion rows.
> Here is the data structure for the dataset.
>
> package com.testing
>
> case class DeviceAggregates (
>                         device_id: Integer,
>                         bytes: Long,
>                         eventdate: Integer
>                    ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
>
> object DeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
>
> I have a very crude implementation that does the job, but it is not up to
> the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
> basic. The DeviceAnalyzer class looks like this:
>
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.linalg.Vector
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import scala.util.Sorting
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>
>       println(allaggregates)
>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       println(allaggregates) // This does not work - results are not sorted !!
>
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>     })
>
>     sc.stop()
>   }
> }
>
> I would really appreciate it if someone could suggest improvements for the
> following:
>
>    1. The call to Sorting.quicksort is not working. Perhaps I am calling
>    it the wrong way.
>    2. I would like to use the Spark mllib class
>    MultivariateStatisticalSummary to calculate the statistical values.
>    3. For that I would need to keep all my intermediate values as RDD so
>    that I can directly use the RDD methods to do the job.
>    4. At the end I also need to write the results to HDFS for which there
>    is a method provided on the RDD class to do so, which is another reason I
>    would like to retain everything as RDD.
>
>
> Thanks in advance for your help.
>
> Anupam Bagchi
>
>
