Thanks, Feynman, for your guidance. I was able to solve this problem by calling the Spark API from Java.
Here is a code snippet that may help other people who face the same challenge.

    // Declared final so the filter closure below can capture it.
    final int earliestEventDate;
    if (args.length > 2) {
      earliestEventDate = Integer.parseInt(args[2]);
    } else {
      Date now = Calendar.getInstance().getTime();
      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
      earliestEventDate = Integer.parseInt(dateFormat.format(
          new Date(now.getTime() - 30L * AnalyticsConstants.ONE_DAY_IN_MILLISECONDS)));
    }
    System.out.println("Filtering out dates earlier than: " + earliestEventDate);

    JavaRDD<String> logLines = sc.textFile(inputFile);

    // Convert the text log lines to DailyDeviceAggregates objects, keep only
    // dates on or after the cutoff, and cache the result.
    JavaRDD<DailyDeviceAggregates> accessLogs = logLines
        .map(Functions.PARSE_DEVICE_AGGREGATE_LINE)
        .filter(new Function<DailyDeviceAggregates, Boolean>() {
          @Override
          public Boolean call(DailyDeviceAggregates value) {
            return (value.getEventdate() >= earliestEventDate);
          }
        }).cache();
    // accessLogs.saveAsTextFile("accessLogs.saved");

    // Group the daily aggregates by device id.
    JavaPairRDD<Object, Iterable<DailyDeviceAggregates>> groupMap =
        accessLogs.groupBy(new Function<DailyDeviceAggregates, Object>() {
          @Override
          public Object call(DailyDeviceAggregates agg) throws Exception {
            return agg.getDevice_id();
          }
        });
    // groupMap.saveAsTextFile("groupedAccessLogs.saved");

    // Reduce each device's month of data to a single summary row.
    JavaPairRDD<Object, DailyDeviceSummary> deviceCharacteristics =
        groupMap.mapValues(new Function<Iterable<DailyDeviceAggregates>, DailyDeviceSummary>() {
          @Override
          public DailyDeviceSummary call(Iterable<DailyDeviceAggregates> allDeviceDataForMonth) throws Exception {
            // First task is to sort the input values by eventdate.
            ArrayList<DailyDeviceAggregates> arr = new ArrayList<DailyDeviceAggregates>();
            for (DailyDeviceAggregates agg : allDeviceDataForMonth) {
              arr.add(agg);
            }
            Collections.sort(arr); // Done sorting

            double bytesTransferred[] = new double[arr.size()];
            double bytesIn[] = new double[arr.size()];
            double bytesOut[] = new double[arr.size()];
            DailyDeviceAggregates lastAggregate = null;
            int index = 0;
            for (DailyDeviceAggregates aggregate : arr) {
              // System.out.println(aggregate);
              bytesIn[index] = aggregate.getBytes_in();
              bytesOut[index] = aggregate.getBytes_out();
              bytesTransferred[index] = aggregate.getBytes_transferred();
              index++;
              lastAggregate = aggregate;
            }

            BollingerBands bollingerBytesTransferred = new BollingerBands(bytesTransferred, 30, 2.0);
            BollingerBands bollingerBytesIn = new BollingerBands(bytesIn, 30, 2.0);
            BollingerBands bollingerBytesOut = new BollingerBands(bytesOut, 30, 2.0);

            return new DailyDeviceSummary(
                lastAggregate.getAccount_id(), lastAggregate.getDevice_id(), index,
                bollingerBytesIn.getLastMean(),
                bollingerBytesOut.getLastMean(),
                bollingerBytesTransferred.getLastMean(),
                bollingerBytesIn.getLastStandardDeviation(),
                bollingerBytesOut.getLastStandardDeviation(),
                bollingerBytesTransferred.getLastStandardDeviation(),
                (long) bollingerBytesIn.getLastUpperThreshold(),
                (long) bollingerBytesOut.getLastUpperThreshold(),
                (long) bollingerBytesTransferred.getLastUpperThreshold(),
                (long) bollingerBytesIn.getLastLowerThreshold(),
                (long) bollingerBytesOut.getLastLowerThreshold(),
                (long) bollingerBytesTransferred.getLastLowerThreshold());
          }
        });

    deviceCharacteristics.values().saveAsTextFile(outputFile);

Anupam Bagchi

> On Jul 14, 2015, at 10:21 AM, Feynman Liang <fli...@databricks.com> wrote:
>
> If your rows may have NAs in them, I would process each column individually
> by first projecting the column ( map(x => x.nameOfColumn) ), filtering out
> the NAs, then running a summarizer over each column.
>
> Even if you have many rows, after summarizing you will only have a vector of
> length #columns.
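For reference, a minimal Scala sketch of that per-column recipe might look like the following. The Row case class and its fields are hypothetical (only there to make the example self-contained), and MultivariateOnlineSummarizer stands in for "a summarizer":

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

    object PerColumnSummaryExample {
      // Hypothetical row type; None stands for an NA in that column.
      case class Row(bytesIn: Option[Double], bytesOut: Option[Double])

      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("Per-Column Summary"))
        val rows = sc.parallelize(Seq(
          Row(Some(246620.0), None),
          Row(Some(1907.0), Some(1906.0)),
          Row(None, Some(2349.0))))

        // Project one column, drop the NAs, then summarize just that column.
        val bytesInSummary = rows
          .flatMap(_.bytesIn)         // project the column and filter out NAs
          .map(x => Vectors.dense(x)) // one-element vectors: dimensions always match
          .aggregate(new MultivariateOnlineSummarizer())(
            (summ, v) => summ.add(v),
            (s1, s2) => s1.merge(s2))

        println("bytes_in: mean=" + bytesInSummary.mean + ", variance=" + bytesInSummary.variance)
        sc.stop()
      }
    }

Repeating the projection for each remaining column yields one small summary per column, regardless of the number of rows.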
> On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi
> <anupam_bag...@rocketmail.com> wrote:
> Hello Feynman,
>
> Actually, in my case the vectors I am summarizing over will not have the
> same dimension, since many devices are inactive on some days. This is at
> best a sparse matrix, where we take only the active days and attempt to fit
> a moving average over them.
>
> The reason I would like to save to HDFS is that this data has to be written
> for several million (almost a billion) devices. I am writing only a few
> columns, but the number of rows is quite large.
>
> Given these two points, is using MultivariateOnlineSummarizer not a good
> idea then?
>
> Anupam Bagchi
>
>> On Jul 13, 2015, at 7:06 PM, Feynman Liang <fli...@databricks.com> wrote:
>>
>> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>>
>> Make sure all the vectors you are summarizing over have the same dimension.
>>
>> Why would you want to write a MultivariateOnlineSummarizer (which can be
>> represented with a couple of Doubles) into a distributed filesystem like
>> HDFS?
>>
>> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi
>> <anupam_bag...@rocketmail.com> wrote:
>> Thank you Feynman for the lead.
>>
>> I was able to modify the code using clues from the RegressionMetrics
>> example. Here is what I have now:
>>
>>     val deviceAggregateLogs =
>>       sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>>     // Calculate statistics based on bytes transferred
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>     println(deviceIdsMap.collect().deep.mkString("\n"))
>>
>>     val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>>       case (deviceId, allaggregates) => Vectors.dense({
>>         val sortedAggregates = allaggregates.toArray
>>         Sorting.quickSort(sortedAggregates)
>>         sortedAggregates.map(dda => dda.bytes.toDouble)
>>       })
>>     }.aggregate(new MultivariateOnlineSummarizer())(
>>       (summary, v) => summary.add(v), // Not sure if this is really what I want; it just came from the example
>>       (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>>     )
>>
>> It compiles fine, but I now get the following exception at runtime:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>> to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure:
>> Lost task 1.0 in stage 3.0 (TID 5, localhost):
>> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
>> when adding new sample. Expecting 8 but got 14.
>>     at scala.Predef$.require(Predef.scala:233)
>>     at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>>     at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>     at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>>     at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>>     at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>>     at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>>     at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>     at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>     at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>     at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:722)
>>
>> I can't tell where exactly I went wrong. Also, how do I take the
>> MultivariateOnlineSummarizer object and write it to HDFS? I have the
>> summarizer object with me, but I really need an RDD so that I can call
>> saveAsTextFile() on it.
>>
>> Anupam Bagchi
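On the HDFS question: once the aggregate finishes, the summarizer is a small driver-local object, so one option is to unpack its statistics and wrap them in a one-element RDD before writing. A sketch, assuming the summary value from the code above (the output path and the line format here are made up):

    // The aggregated statistics live on the driver; a one-element RDD is
    // enough for saveAsTextFile to write them out to HDFS.
    val statsLine = summary.mean + "|" + summary.variance + "|" + summary.count
    sc.parallelize(Seq(statsLine), 1).saveAsTextFile("hdfs:///tmp/summaryStats")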
>>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fli...@databricks.com> wrote:
>>>
>>> A good example is RegressionMetrics
>>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
>>> use of MultivariateOnlineSummarizer to aggregate statistics across labels
>>> and residuals; take a look at how aggregateByKey is used there.
>>>
>>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi
>>> <anupam_bag...@rocketmail.com> wrote:
>>> Thank you Feynman for your response. Since I am very new to Scala, I may
>>> need a bit more hand-holding at this stage.
>>>
>>> I have been able to incorporate your suggestion about sorting, and it now
>>> works perfectly. Thanks again for that.
>>>
>>> I tried to use your suggestion of using MultivariateOnlineSummarizer, but
>>> could not proceed further. For each device_id (the key), my goal is to get
>>> a vector of doubles on which I can query the mean and standard deviation.
>>> Because RDDs are immutable, I cannot use a foreach loop to iterate through
>>> the groupBy results and add the values to an RDD one at a time; Spark does
>>> not allow that. I need to apply the RDD functions directly on the entire
>>> set to achieve the transformations I need. This is where I am faltering,
>>> since I am not used to the lambda expressions that Scala uses.
>>>
>>>     object DeviceAnalyzer {
>>>       def main(args: Array[String]) {
>>>         val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>         val sc = new SparkContext(sparkConf)
>>>
>>>         val logFile = args(0)
>>>
>>>         val deviceAggregateLogs =
>>>           sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>
>>>         // Calculate statistics based on bytes
>>>         val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>         // Question: Can we not write the line above as
>>>         //   deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)
>>>         // Anything wrong?
>>>         // All I need to do below is collect the vector of bytes for each
>>>         // device and store it in an RDD. The problem with the 'foreach'
>>>         // approach below is that it generates the vector values one at a
>>>         // time, which I cannot add individually to an immutable RDD.
>>>         deviceIdsMap.foreach(a => {
>>>           val device_id = a._1     // This is the device ID
>>>           val allaggregates = a._2 // All device-aggregates for this device
>>>
>>>           val sortedaggregates = allaggregates.toArray
>>>           Sorting.quickSort(sortedaggregates)
>>>
>>>           val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble)
>>>           val count = byteValues.length
>>>           val sum = byteValues.sum
>>>           val xbar = sum / count
>>>           val sum_x_minus_x_bar_square =
>>>             byteValues.map(x => (x - xbar) * (x - xbar)).sum
>>>           val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>
>>>           val vector: Vector = Vectors.dense(byteValues)
>>>           println(vector)
>>>           println(device_id + "," + xbar + "," + stddev)
>>>         })
>>>         //val vector: Vector = Vectors.dense(byteValues)
>>>         //println(vector)
>>>         //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>
>>>         sc.stop()
>>>       }
>>>     }
>>>
>>> Can you show me how to write the 'foreach' loop in a Spark-friendly way?
>>> Thanks a lot for your help.
>>>
>>> Anupam Bagchi
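On the closing question: a Spark-friendly replacement for the foreach keeps everything as RDD transformations, which is essentially what Feynman outlines in the reply below. A sketch reusing deviceAggregateLogs from the code above (the output path is made up):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

    // One summarizer per device: each day's byte count is a one-dimensional
    // sample, so all vectors have the same dimension, and sample order does
    // not affect the mean or the standard deviation.
    val perDeviceStats = deviceAggregateLogs
      .map(dda => (dda.device_id, Vectors.dense(dda.bytes.toDouble)))
      .aggregateByKey(new MultivariateOnlineSummarizer())(
        (summ, v) => summ.add(v),
        (s1, s2) => s1.merge(s2))
      .mapValues(s => (s.mean(0), math.sqrt(s.variance(0)))) // unpack (mean, stddev)

    perDeviceStats
      .map { case (id, (mean, sd)) => id + "," + mean + "," + sd }
      .saveAsTextFile("hdfs:///tmp/deviceStats")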
>>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fli...@databricks.com> wrote:
>>>>
>>>> The call to Sorting.quickSort is not working. Perhaps I am calling it the
>>>> wrong way.
>>>>
>>>> allaggregates.toArray allocates and creates a new array, separate from
>>>> allaggregates, and it is that new array which Sorting.quickSort sorts;
>>>> allaggregates itself is left untouched. Try:
>>>>
>>>>     val sortedAggregates = allaggregates.toArray
>>>>     Sorting.quickSort(sortedAggregates)
>>>>
>>>> I would like to use the Spark mllib class MultivariateStatisticalSummary
>>>> to calculate the statistical values.
>>>>
>>>> MultivariateStatisticalSummary is a trait (similar to a Java interface);
>>>> you probably want to use MultivariateOnlineSummarizer.
>>>>
>>>> For that I would need to keep all my intermediate values as RDD so that I
>>>> can directly use the RDD methods to do the job.
>>>>
>>>> Correct; you would do an aggregate using the add and merge functions
>>>> provided by MultivariateOnlineSummarizer.
>>>>
>>>> At the end I also need to write the results to HDFS, for which there is a
>>>> method provided on the RDD class, which is another reason I would like to
>>>> retain everything as RDD.
>>>>
>>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
>>>> or you could unpack the relevant statistics from
>>>> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
>>>> and then write.
>>>>
>>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi
>>>> <anupam_bag...@rocketmail.com> wrote:
>>>> I have to do the following tasks on a dataset, using Apache Spark with
>>>> Scala as the programming language:
>>>>
>>>> 1. Read the dataset from HDFS. A few sample lines look like this:
>>>>
>>>>     deviceid,bytes,eventdate
>>>>     15590657,246620,20150630
>>>>     14066921,1907,20150621
>>>>     14066921,1906,20150626
>>>>     6522013,2349,20150626
>>>>     6522013,2525,20150613
>>>>
>>>> 2. Group the data by device id. We then have a map of
>>>>    deviceid => (bytes, eventdate).
>>>> 3. For each device, sort the set by eventdate. We now have an ordered set
>>>>    of bytes, based on eventdate, for each device.
>>>> 4. Pick the last 30 days of bytes from this ordered set.
>>>> 5. Find the moving average of bytes for the last date, using a time period
>>>>    of 30.
>>>> 6. Find the standard deviation of the bytes for the final date, using a
>>>>    time period of 30.
>>>> 7. Return two values in the result, (mean - k * stddev) and
>>>>    (mean + k * stddev). [Assume k = 3.]
>>>>
>>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it
>>>> ultimately has to run on a billion rows.
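For steps 4 through 7 above, the per-device window arithmetic is small enough to live in one helper. A self-contained sketch (the function name and the defaults are illustrative only):

    // Given one device's byte counts already sorted by eventdate, return
    // (mean - k * stddev, mean + k * stddev) over the trailing 30-day window.
    def bollingerBounds(sortedBytes: Array[Double],
                        window: Int = 30, k: Double = 3.0): (Double, Double) = {
      val w = sortedBytes.takeRight(window)  // step 4: last 30 days
      val mean = w.sum / w.length            // step 5: moving average at the last date
      val stddev =                           // step 6: population standard deviation
        math.sqrt(w.map(x => (x - mean) * (x - mean)).sum / w.length)
      (mean - k * stddev, mean + k * stddev) // step 7
    }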
>>>> Here is the data structure for the dataset:
>>>>
>>>>     package com.testing
>>>>
>>>>     case class DailyDeviceAggregates (
>>>>       device_id: Integer,
>>>>       bytes: Long,
>>>>       eventdate: Integer
>>>>     ) extends Ordered[DailyDeviceAggregates] {
>>>>       def compare(that: DailyDeviceAggregates): Int = {
>>>>         eventdate - that.eventdate
>>>>       }
>>>>     }
>>>>
>>>>     object DailyDeviceAggregates {
>>>>       def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>>         val c = logline.split(",")
>>>>         DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>>       }
>>>>     }
>>>>
>>>> I have a very crude implementation of the DeviceAnalyzer class that does
>>>> the job, but it is not up to the mark. Sorry, I am very new to
>>>> Scala/Spark, so my questions are quite basic. Here is what I have now:
>>>>
>>>>     import com.testing.DailyDeviceAggregates
>>>>     import org.apache.spark.{SparkContext, SparkConf}
>>>>     import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>>     import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>>>
>>>>     import scala.util.Sorting
>>>>
>>>>     object DeviceAnalyzer {
>>>>       def main(args: Array[String]) {
>>>>         val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>>         val sc = new SparkContext(sparkConf)
>>>>
>>>>         val logFile = args(0)
>>>>
>>>>         val deviceAggregateLogs =
>>>>           sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>>
>>>>         // Calculate statistics based on bytes
>>>>         val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>>
>>>>         deviceIdsMap.foreach(a => {
>>>>           val device_id = a._1     // This is the device ID
>>>>           val allaggregates = a._2 // All device-aggregates for this device
>>>>
>>>>           println(allaggregates)
>>>>           // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>>           Sorting.quickSort(allaggregates.toArray)
>>>>           println(allaggregates) // This does not work - results are not sorted !!
>>>>
>>>>           val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>>>           val count = byteValues.length
>>>>           val sum = byteValues.sum
>>>>           val xbar = sum / count
>>>>           val sum_x_minus_x_bar_square =
>>>>             byteValues.map(x => (x - xbar) * (x - xbar)).sum
>>>>           val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>>
>>>>           val vector: Vector = Vectors.dense(byteValues)
>>>>           println(vector)
>>>>           println(device_id + "," + xbar + "," + stddev)
>>>>
>>>>           //val vector: Vector = Vectors.dense(byteValues)
>>>>           //println(vector)
>>>>           //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>>         })
>>>>
>>>>         sc.stop()
>>>>       }
>>>>     }
>>>>
>>>> I would really appreciate it if someone could suggest improvements on the
>>>> following:
>>>>
>>>> 1. The call to Sorting.quickSort is not working. Perhaps I am calling it
>>>>    the wrong way.
>>>> 2. I would like to use the Spark mllib class MultivariateStatisticalSummary
>>>>    to calculate the statistical values.
>>>> 3. For that I would need to keep all my intermediate values as RDDs, so
>>>>    that I can directly use the RDD methods to do the job.
>>>> 4. At the end I also need to write the results to HDFS, for which the RDD
>>>>    class provides a method, which is another reason I would like to retain
>>>>    everything as RDDs.
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>> Anupam Bagchi