I am pasting the code here . Please let me know if there is any sequence
that is wrong.


    def createContext(checkpointDirectory: String, config: Config):
StreamingContext = {
        println("Creating new context")

        val conf = new
SparkConf(true).setAppName(appName).set("spark.streaming.unpersist","true")

        val ssc = new StreamingContext(conf,
Seconds(config.getInt(batchIntervalParam)))
        ssc.checkpoint(checkpointDirectory)
        val isValid = validate(ssc, config)

        if (isValid) {
          val result = runJob(ssc, config)
          println("result is " + result)
        } else {
          println(isValid.toString)
        }

        ssc
     }

      def main(args: Array[String]): Unit = {

        if (args.length < 1) {
          println("Must specify the path to config file ")
          println("Usage progname <path to config file> ")
          return
        }
        val url = args(0)
        logger.info("Starting " + appName)
        println("Got the path as %s".format(url))
        val source = scala.io.Source.fromFile(url)
        val lines = try source.mkString finally source.close()
        val config = ConfigFactory.parseString(lines)
        val directoryPath = config.getString(checkPointParam)

        val ssc = StreamingContext.getOrCreate(directoryPath, () => {
          createContext(directoryPath,config)
        })

        ssc.start()
        ssc.awaitTermination()
      }


      def getRabbitMQStream(config: Config, ssc: StreamingContext):
ReceiverInputDStream[String] = {
        val rabbitMQHost = config.getString(rabbitmqHostParam)
        val rabbitMQPort = config.getInt(rabbitmqPortParam)
        val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
        println("changing the memory lvel")
        val receiverStream: ReceiverInputDStream[String] = {
          RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost,
rabbitMQPort, rabbitMQQueue,StorageLevel.MEMORY_AND_DISK_SER)
        }
        receiverStream.start()
        receiverStream
      }

      def getBaseDstream(config: Config, ssc: StreamingContext):
ReceiverInputDStream[String] = {
        val baseDstream = config.getString(receiverTypeParam) match {
          case "rabbitmq" => getRabbitMQStream(config, ssc)
        }
        baseDstream
      }

      def runJob(ssc: StreamingContext, config: Config): Any = {

        val keyspace = config.getString(keyspaceParam)
        val clientStatsTable = config.getString(clientStatsTableParam)
        val hourlyStatsTable = config.getString(hourlyStatsTableParam)
        val batchInterval = config.getInt(batchIntervalParam)
        val windowInterval = config.getInt(windowIntervalParam)
        val hourlyInterval = config.getInt(hourlyParam)
        val limit = config.getInt(limitParam)

        val lines = getBaseDstream(config, ssc)
        val statsRDD =
lines.filter(_.contains("client_stats")).map(_.split(",")(1))

        val parserFunc = getProtobufParserFunction()
        val clientUsageRDD: DStream[((String, String), Double)] =
statsRDD.flatMap(x => parserFunc(x))
        val formatterFunc = getJsonFormatterFunc()
        val oneMinuteWindowResult = clientUsageRDD.reduceByKeyAndWindow((x:
Double, y: Double) => x + y, Seconds(windowInterval),
Seconds(batchInterval))
          .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
          .reduceByKey((x, y) => (x ++ y))
          .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

        println("Client Usage from rabbitmq ")
        oneMinuteWindowResult.map(x => (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, clientStatsTable)
        oneMinuteWindowResult.print()

        val HourlyResult = clientUsageRDD.reduceByKeyAndWindow((x: Double,
y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
          .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
          .reduceByKey((x, y) => (x ++ y))
          .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

        HourlyResult.map(x => (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
        HourlyResult.map(x => (x, "hourly")).print()

      }
    }


On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran <vimal3...@gmail.com>
wrote:

> I have a simple spark streaming application which reads the data from the
> rabbitMQ
>  and does some aggregation on window interval of  1 min and 1 hour for
> batch interval of 30s.
>
>  I have a three node setup. And to enable checkpoint,
>  I have mounted the same directory using sshfs to all worker node for
> creating checkpoint.
>
>  When I run the spark streaming App for the first time it works fine .
>  I could see the results being printed on console and some checkpoints
> happening in the network directory.
>
>  But when I run the job for the second time , it fails with the following
> exception
>
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>                 at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>                 at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>                 at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>                 at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>                 at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>                 at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>                 at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>                 at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>                 at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>                 at org.apache.spark.scheduler.Task.run(Task.scala:70)
>                 at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>                 at java.lang.Thread.run(Thread.java:745)
>
>
> And the exception is repeated.
>
>
> I am not pumping huge data to the rabbitMQ. When I run the job for the
> first time I am dumping only < 100 events .
> And when I run for the second time,  I have stopped the messages being
> sent to RabbitMQ from the producer process.
>
> I have tried setting "spark.streaming.unpersist","true" .
> And My Set up has 3 node each having one core allocated for spark and
> executor memory per node is 512MB.
>
> Please help me in solving this issue.
>

Reply via email to