I have to do the following tasks on a dataset using Apache Spark with Scala as 
the programming language:

Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
Group the data by device id. Thus we now have a map of deviceid =>
collection of (bytes, eventdate) pairs.

For each device, sort the set by eventdate. We now have an ordered set of bytes 
based on eventdate for each device.

Pick the last 30 days of bytes from this ordered set.

Find the moving average of bytes for the last date using a time period of 30.

Find the standard deviation of the bytes for the final date using a time period 
of 30.

Return two values in the result: (mean - k*stddev) and (mean + k*stddev),
assuming k = 3. (A small sketch of this arithmetic follows below.)
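
To make the arithmetic in the last step concrete, here is what I expect for a
single device, in plain Scala (the byte values are made up for illustration):

// Illustrative only: byte values for one device, already sorted by
// eventdate and restricted to the last 30 days
val last30 = Array(2349.0, 2525.0, 1907.0) // up to 30 values in practice
val k = 3
val mean = last30.sum / last30.length
// Population standard deviation over the same 30-day window
val stddev = math.sqrt(last30.map(x => (x - mean) * (x - mean)).sum / last30.length)
val result = (mean - k * stddev, mean + k * stddev)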

I am using Apache Spark 1.3.0. The actual dataset is wider, and it eventually
has to run on a billion rows.

Here is the data structure for the dataset.

package com.testing

case class DailyDeviceAggregates(
    device_id: Integer,
    bytes: Long,
    eventdate: Integer
) extends Ordered[DailyDeviceAggregates] {
  // Order aggregates chronologically by eventdate (an Int of the form yyyyMMdd)
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}

object DailyDeviceAggregates {
  // Parse one CSV line of the form "deviceid,bytes,eventdate"
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
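
For example, parseLogLine applied to the first sample line above should give:

val rec = DailyDeviceAggregates.parseLogLine("15590657,246620,20150630")
// rec == DailyDeviceAggregates(15590657, 246620, 20150630)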
The DeviceAnalyzer object does the actual work. I have a very crude
implementation that does the job, but it is not up to the mark. Sorry, I am
very new to Scala/Spark, so my questions are quite basic. Here is what I have
now:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an Iterable of all aggregates for this device

      println(allaggregates)
      Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(allaggregates) // This does not work - results are not sorted !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}
I would really appreciate it if someone could suggest improvements for the
following:

1. The call to Sorting.quickSort is not working. Perhaps I am calling it the
   wrong way. (See the sketch after this list for the direction I am
   considering.)
2. I would like to use the Spark MLlib class MultivariateStatisticalSummary
   to calculate the statistical values.
3. For that I would need to keep all my intermediate values as RDDs so that
   I can directly use the RDD methods to do the job.
4. At the end I also need to write the results to HDFS, for which there is a
   method provided on the RDD class to do so, which is another reason I would
   like to retain everything as RDDs.
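
For reference, here is the rough direction I am now considering for points 1,
3 and 4 (just a sketch, untested; the output path in args(1) is my
assumption). My understanding of the sorting problem is that
Sorting.quickSort(allaggregates.toArray) sorts a temporary array created by
toArray and then throws it away, so allaggregates itself is never reordered;
keeping a reference to the sorted array (or using sorted/sortBy, which return
a new sorted collection) should fix it:

// Sketch (untested): per-device statistics computed on the grouped RDD,
// keeping everything as RDDs until the final save to HDFS
val results = deviceIdsMap.mapValues { aggregates =>
  // .sorted uses the Ordered trait on the case class and returns a
  // NEW sorted array, unlike my quickSort call above
  val sorted = aggregates.toArray.sorted
  // Assuming one aggregate per day, the last 30 entries cover the
  // last 30 days
  val last30 = sorted.takeRight(30).map(_.bytes.toDouble)
  val mean = last30.sum / last30.length
  val stddev = math.sqrt(last30.map(x => (x - mean) * (x - mean)).sum / last30.length)
  (mean - 3 * stddev, mean + 3 * stddev) // k = 3
}

// Write one CSV line per device back to HDFS
results.map { case (id, (lower, upper)) => s"$id,$lower,$upper" }
  .saveAsTextFile(args(1)) // assumes an output path is passed as args(1)

On point 2, I notice that Statistics.colStats takes an RDD[Vector] rather than
a single Vector (which is probably why my commented-out attempt did not work),
so I am not sure it can be applied per device without creating one RDD per
device, which does not sound practical at a billion rows.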

Thanks in advance for your help.

Anupam Bagchi
