I have a DStream in Spark (Scala) that I want to sort and then take the top N of. The problem is that whenever I try to run it, I get a NotSerializableException, and the exception message says:
    This is because the DStream object is being referred to from within the closure.

The problem is that I don't know how to solve it. My attempt is included below. I would also be fine with any other way of sorting a DStream and taking its top N instead of my approach.
package com.badrit.realtime

import java.util.Date

import com.badrit.drivers.UnlimitedSpaceTimeDriver
import com.badrit.model.{CellBuilder, DataReader, Trip}
import com.badrit.utility.Printer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}

import scala.collection.mutable

object StreamingDriver {

    val appName: String = "HotSpotRealTime"
    val hostName = "localhost"
    val port = 5050
    val constrains = UnlimitedSpaceTimeDriver.constrains
    var streamingRate = 1
    var windowSize = 8
    var slidingInterval = 2
    val cellBuilder = new CellBuilder(constrains)
    val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv"

    def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = {
        val sparkCtx = sparkStreamCtx.sparkContext
        val textFile: RDD[String] = sparkCtx.textFile(inputFilePath)
        val data: RDD[Trip] = new DataReader().getTrips(textFile)
        val groupedData = data.filter(_.pickup.date.before(new Date(2015, 1, 2, 0, 0, 0)))
            .groupBy(trip => trip.pickup.date.getMinutes)
            .sortBy(_._1)
            .map(_._2)
            .collect()
        printf("Grouped Data Count is " + groupedData.length)

        var dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty
        groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray))
        printf("\n\nTest Queue size is " + dataQueue.size)

        groupedData.zipWithIndex.foreach { case (trips: Iterable[Trip], index: Int) =>
            println("Items List " + index)
            val passengers: Array[Int] = trips.map(_.passengers).toArray
            val cnt = passengers.length
            println("Sum is " + passengers.sum)
            println("Cnt is " + cnt)
            val passengersRdd = sparkCtx.parallelize(passengers)
            println("Mean " + passengersRdd.mean())
            println("Stdv" + passengersRdd.stdev())
        }
        sparkStreamCtx.queueStream(dataQueue, true)
    }

    def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup)

    def main(args: Array[String]) {
        if (args.length < 1) {
            streamingRate = 1
            windowSize = 3       // 2 hours: 60 * 60 * 1000L
            slidingInterval = 2  // 0.5 hour: 60 * 60 * 1000L
        } else {
            streamingRate = args(0).toInt
            windowSize = args(1).toInt
            slidingInterval = args(2).toInt
        }

        val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]")
        val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate))
        sparkStreamCtx.sparkContext.setLogLevel("ERROR")
        sparkStreamCtx.checkpoint("/tmp")

        val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx)
        val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval))

        // My main problem lies in the following line:
        val newDataWindow = dataWindow.transform(rdd =>
            sparkStreamCtx.sparkContext.parallelize(rdd.take(10)))
        newDataWindow.print

        sparkStreamCtx.start()
        sparkStreamCtx.awaitTerminationOrTimeout(1000)
    }
}
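For completeness, here is an untested sketch of one direction I have been considering: inside transform, refer only to the windowed RDD's own SparkContext rather than the captured sparkStreamCtx, and sort before taking the top 10. I am sorting by passenger count purely as a placeholder key, since I haven't settled on the real ordering:

// Untested sketch: the closure references only the RDD itself, so the
// StreamingContext (and the DStream that holds it) should not be captured.
// Ordering by passenger count is just a placeholder.
val newDataWindow = dataWindow.transform { rdd =>
    rdd.sparkContext.parallelize(
        rdd.sortBy(trip => trip.passengers, ascending = false).take(10))
}

rdd.takeOrdered(10) with an explicit Ordering[Trip] might avoid the full sort, but I haven't verified whether either variant actually gets past the serialization problem.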