I am pasting the code here. Please let me know if any part of the sequence is wrong.
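For context, main() parses a HOCON config file whose path is passed as the first argument. The real key names live in the *Param constants, which are not part of this paste, so the sketch below only illustrates the shape of the file with made-up keys:

    # Illustrative sketch only - the actual keys are the *Param
    # constants, which are not shown in the paste below.
    receiver.type = "rabbitmq"
    rabbitmq.host = "localhost"
    rabbitmq.port = 5672
    rabbitmq.queue.name = "client_stats"
    checkpoint.directory = "/mnt/shared/checkpoint"
    batch.interval = 30      # seconds
    window.interval = 60     # 1-minute window
    hourly.interval = 3600   # 1-hour window
    limit = 10               # top-N rows kept per client
    cassandra.keyspace = "stats"
    client.stats.table = "client_stats_by_minute"
    hourly.stats.table = "client_stats_by_hour"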
// Imports reconstructed for readability. The RabbitMQ import assumes the
// Stratio spark-rabbitmq receiver, and DateTime assumes Joda-Time.
import scala.collection.mutable.ArrayBuffer

import com.datastax.spark.connector.streaming._ // adds saveToCassandra to DStreams
import com.stratio.receiver.RabbitMQUtils
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.joda.time.DateTime

// appName, logger, validate, getProtobufParserFunction, getJsonFormatterFunc
// and the *Param key constants are defined elsewhere in the enclosing object.

def createContext(checkpointDirectory: String, config: Config): StreamingContext = {
  println("Creating new context")
  val conf = new SparkConf(true)
    .setAppName(appName)
    .set("spark.streaming.unpersist", "true")
  val ssc = new StreamingContext(conf, Seconds(config.getInt(batchIntervalParam)))
  ssc.checkpoint(checkpointDirectory)
  val isValid = validate(ssc, config)
  if (isValid) {
    val result = runJob(ssc, config)
    println("result is " + result)
  } else {
    println(isValid.toString)
  }
  ssc
}

def main(args: Array[String]): Unit = {
  if (args.length < 1) {
    println("Must specify the path to the config file")
    println("Usage: progname <path to config file>")
    return
  }
  val url = args(0)
  logger.info("Starting " + appName)
  println("Got the path as %s".format(url))
  val source = scala.io.Source.fromFile(url)
  val lines = try source.mkString finally source.close()
  val config = ConfigFactory.parseString(lines)
  val directoryPath = config.getString(checkPointParam)
  // First run: createContext is invoked. Restart: the context and its
  // DStream graph are rebuilt from the checkpoint directory instead.
  val ssc = StreamingContext.getOrCreate(directoryPath, () => createContext(directoryPath, config))
  ssc.start()
  ssc.awaitTermination()
}

def getRabbitMQStream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  val rabbitMQHost = config.getString(rabbitmqHostParam)
  val rabbitMQPort = config.getInt(rabbitmqPortParam)
  val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
  println("changing the memory level")
  // Note: no explicit start() on the stream; receivers are started by
  // ssc.start(), not by user code.
  RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost, rabbitMQPort, rabbitMQQueue,
    StorageLevel.MEMORY_AND_DISK_SER)
}

def getBaseDstream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] =
  config.getString(receiverTypeParam) match {
    case "rabbitmq" => getRabbitMQStream(config, ssc)
    case other      => throw new IllegalArgumentException("Unknown receiver type: " + other)
  }

def runJob(ssc: StreamingContext, config: Config): Any = {
  val keyspace = config.getString(keyspaceParam)
  val clientStatsTable = config.getString(clientStatsTableParam)
  val hourlyStatsTable = config.getString(hourlyStatsTableParam)
  val batchInterval = config.getInt(batchIntervalParam)
  val windowInterval = config.getInt(windowIntervalParam)
  val hourlyInterval = config.getInt(hourlyParam)
  val limit = config.getInt(limitParam)

  val lines = getBaseDstream(config, ssc)
  val statsStream = lines.filter(_.contains("client_stats")).map(_.split(",")(1))
  val parserFunc = getProtobufParserFunction()
  val clientUsageStream: DStream[((String, String), Double)] = statsStream.flatMap(x => parserFunc(x))
  val formatterFunc = getJsonFormatterFunc()

  // One-minute window: sum usage per (metric, client), regroup by client,
  // and keep the top `limit` entries.
  val oneMinuteWindowResult = clientUsageStream
    .reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(windowInterval), Seconds(batchInterval))
    .map(x => (x._1._2, ArrayBuffer((x._1._1, x._2))))
    .reduceByKey((x, y) => x ++ y)
    .mapValues(v => v.toList.sortBy(p => -p._2).take(limit))
  println("Client Usage from rabbitmq")
  oneMinuteWindowResult.map(x => (x._1, DateTime.now, formatterFunc(x._2))).saveToCassandra(keyspace, clientStatsTable)
  oneMinuteWindowResult.print()

  // Same aggregation over the hourly window.
  val hourlyResult = clientUsageStream
    .reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
    .map(x => (x._1._2, ArrayBuffer((x._1._1, x._2))))
    .reduceByKey((x, y) => x ++ y)
    .mapValues(v => v.toList.sortBy(p => -p._2).take(limit))
  hourlyResult.map(x => (x._1, DateTime.now, formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
  hourlyResult.map(x => (x, "hourly")).print()
}
} // closes the enclosing object (its opening line is not in this paste)
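For reference, the driver sequence I am trying to follow is the standard checkpoint-recovery pattern from the Spark Streaming programming guide: everything that builds the DStream graph has to happen inside the factory passed to getOrCreate, because on restart the graph is reconstructed from the checkpoint and the factory is skipped. A minimal sketch of that pattern (names and the path are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointPattern {
      def createContext(checkpointDir: String): StreamingContext = {
        val ssc = new StreamingContext(new SparkConf().setAppName("sketch"), Seconds(30))
        ssc.checkpoint(checkpointDir)
        // define all input streams and transformations here, so that a
        // restarted driver can rebuild the same graph from the checkpoint
        ssc
      }

      def main(args: Array[String]): Unit = {
        val checkpointDir = "/mnt/shared/checkpoint" // illustrative path
        // first run: the factory is invoked; subsequent runs: the context
        // is restored from the checkpoint and the factory is ignored
        val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
        ssc.start()
        ssc.awaitTermination()
      }
    }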
On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran <vimal3...@gmail.com> wrote:
> I have a simple Spark Streaming application which reads data from RabbitMQ
> and does some aggregation on window intervals of 1 min and 1 hour, for a
> batch interval of 30s.
>
> I have a three-node setup, and to enable checkpointing I have mounted the
> same directory on all worker nodes using sshfs for creating checkpoints.
>
> When I run the Spark Streaming app for the first time, it works fine.
> I can see the results being printed on the console and some checkpoints
> happening in the network directory.
>
> But when I run the job a second time, it fails with the following
> exception:
>
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> And the exception is repeated.
>
> I am not pumping huge data into RabbitMQ. When I run the job for the first
> time, I am dumping only < 100 events. And when I run it a second time, I
> have stopped the messages being sent to RabbitMQ from the producer process.
>
> I have tried setting "spark.streaming.unpersist" to "true".
> My setup has 3 nodes, each with one core allocated to Spark, and the
> executor memory per node is 512 MB.
>
> Please help me in solving this issue.