I am pasting the code here. Please let me know if any of the steps are in
the wrong sequence.
/**
 * Factory for a brand-new streaming context: builds the Spark configuration,
 * sets the checkpoint directory and, when `validate` accepts the
 * configuration, wires up the job via `runJob`.
 *
 * @param checkpointDirectory directory passed to `ssc.checkpoint`
 * @param config              application configuration; the batch interval
 *                            (in seconds) is read from `batchIntervalParam`
 * @return the newly created, not yet started, streaming context
 */
def createContext(checkpointDirectory: String, config: Config): StreamingContext = {
  println("Creating new context")
  // The job reads from a receiver-based source. Without the receiver
  // write-ahead log, blocks that were received but not yet fully processed
  // are lost when the driver stops, so a context recovered from the
  // checkpoint fails while recomputing those batches (the failure surfaces in
  // BlockRDD.compute). Enabling the log stores received data under the
  // checkpoint directory so it can be replayed after a restart.
  // NOTE(review): the setting only applies to contexts created here; a
  // checkpoint written before this change presumably has to be cleared once.
  val conf = new SparkConf(true)
    .setAppName(appName)
    .set("spark.streaming.unpersist", "true")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(config.getInt(batchIntervalParam)))
  ssc.checkpoint(checkpointDirectory)
  val isValid = validate(ssc, config)
  if (isValid) {
    val result = runJob(ssc, config)
    println("result is " + result)
  } else {
    // No job is wired up in this branch; only the validation result is printed.
    println(isValid.toString)
  }
  ssc
}
/**
 * Program entry point.
 *
 * Expects the path to the configuration file as the first argument. The file
 * is read completely, parsed with `ConfigFactory.parseString`, and the
 * checkpoint directory named by `checkPointParam` is handed to
 * `StreamingContext.getOrCreate` together with `createContext` as the
 * factory. The resulting context is started and the call blocks until it
 * terminates.
 *
 * @param args command-line arguments; `args(0)` is the config file path
 */
def main(args: Array[String]): Unit = {
  if (args.length < 1) {
    // Nothing to run without a configuration file: print usage and fall through.
    println("Must specify the path to config file ")
    println("Usage progname ")
  } else {
    val configPath = args(0)
    logger.info("Starting " + appName)
    println("Got the path as %s".format(configPath))

    // Read the whole file, closing the handle even if reading fails.
    val configFile = scala.io.Source.fromFile(configPath)
    val rawConfig =
      try {
        configFile.mkString
      } finally {
        configFile.close()
      }

    val config = ConfigFactory.parseString(rawConfig)
    val checkpointDir = config.getString(checkPointParam)
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir, config))
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Creates the RabbitMQ input stream described by the configuration.
 *
 * Host, port and queue name are read from `rabbitmqHostParam`,
 * `rabbitmqPortParam` and `rabbitmqQueueNameParam`; the stream is created
 * with storage level `MEMORY_AND_DISK_SER`.
 *
 * The stream is intentionally NOT started here: `start()` on an input stream
 * is a lifecycle hook that the streaming framework invokes when the owning
 * `StreamingContext` is started, so calling it by hand while the graph is
 * still being built is out of sequence.
 *
 * @param config application configuration
 * @param ssc    streaming context the stream is attached to
 * @return the receiver-based input stream of raw messages
 */
def getRabbitMQStream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  val rabbitMQHost = config.getString(rabbitmqHostParam)
  val rabbitMQPort = config.getInt(rabbitmqPortParam)
  val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
  println("changing the memory lvel")
  RabbitMQUtils.createStreamFromAQueue(
    ssc, rabbitMQHost, rabbitMQPort, rabbitMQQueue, StorageLevel.MEMORY_AND_DISK_SER)
}
/**
 * Selects the input stream implementation named by `receiverTypeParam`.
 *
 * Only `"rabbitmq"` is supported at the moment.
 *
 * @param config application configuration
 * @param ssc    streaming context the stream is attached to
 * @return the input stream for the configured receiver type
 * @throws IllegalArgumentException if the configured receiver type is not
 *                                  supported (previously an opaque
 *                                  `MatchError`)
 */
def getBaseDstream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  config.getString(receiverTypeParam) match {
    case "rabbitmq" => getRabbitMQStream(config, ssc)
    case other =>
      throw new IllegalArgumentException(
        "Unsupported receiver type '" + other + "'; supported types: rabbitmq")
  }
}
/**
 * Wires up the streaming job on the given context.
 *
 * Lines containing "client_stats" are taken from the base stream, the second
 * comma-separated field of each is parsed into `((String, String), Double)`
 * records, and the values are summed per key over two sliding windows (the
 * configured window interval and the configured hourly interval, both
 * sliding by the batch interval). For each window the records are regrouped
 * by the second key component, sorted by descending summed value, and cut to
 * the first `limit` entries. Both results are written to Cassandra and
 * printed.
 *
 * @param ssc    streaming context the job is attached to
 * @param config application configuration (keyspace, table names, intervals
 *               in seconds, and the per-group limit)
 * @return the result of the final `print()` registration, i.e. `Unit`
 */
def runJob(ssc: StreamingContext, config: Config): Any = {
  val keyspace = config.getString(keyspaceParam)
  val clientStatsTable = config.getString(clientStatsTableParam)
  val hourlyStatsTable = config.getString(hourlyStatsTableParam)
  val batchInterval = config.getInt(batchIntervalParam)
  val windowInterval = config.getInt(windowIntervalParam)
  val hourlyInterval = config.getInt(hourlyParam)
  val limit = config.getInt(limitParam)

  val lines = getBaseDstream(config, ssc)
  // Keep only client-stats messages and take the second comma-separated field.
  // NOTE(review): a matching line without a comma would fail on index 1.
  val statsRDD = lines.filter(_.contains("client_stats")).map(_.split(",")(1))
  val parserFunc = getProtobufParserFunction()
  val clientUsageRDD: DStream[((String, String), Double)] =
    statsRDD.flatMap(x => parserFunc(x))
  val formatterFunc = getJsonFormatterFunc()

  // Shared pipeline for both windows: sum per key over the window, regroup by
  // the second key component, then keep the `limit` largest sums per group.
  def topUsagePerGroup(windowSeconds: Int): DStream[(String, List[(String, Double)])] =
    clientUsageRDD
      .reduceByKeyAndWindow(
        (x: Double, y: Double) => x + y, Seconds(windowSeconds), Seconds(batchInterval))
      .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => (x ++ y))
      .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

  val oneMinuteWindowResult = topUsagePerGroup(windowInterval)
  println("Client Usage from rabbitmq ")
  // DateTime.now sits inside the map function, so it is evaluated per record
  // when the batch is processed, not when the job is defined.
  oneMinuteWindowResult
    .map(x => (x._1, DateTime.now, formatterFunc(x._2)))
    .saveToCassandra(keyspace, clientStatsTable)
  oneMinuteWindowResult.print()

  val hourlyResult = topUsagePerGroup(hourlyInterval)
  hourlyResult
    .map(x => (x._1, DateTime.now, formatterFunc(x._2)))
    .saveToCassandra(keyspace, hourlyStatsTable)
  hourlyResult.map(x => (x, "hourly")).print()
}
}
On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran
wrote:
> I have a simple Spark Streaming application which reads data from
> RabbitMQ
> and does some aggregation over window intervals of 1 min and 1 hour, with
> a batch interval of 30s.
>
> I have a three-node setup. To enable checkpointing,
> I have mounted the same directory on all worker nodes using sshfs, to be
> used as the checkpoint directory.
>
> When I run the Spark Streaming app for the first time, it works fine.
> I can see the results being printed on the console and some checkpoints
> being written to the network directory.
>
> But when I run the job for the second time, it fails with the following
> exception:
>
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.