Checkpoint not working after driver restart

2015-11-03 Thread vimal dinakaran
I have a simple Spark Streaming application which reads data from RabbitMQ
and does some aggregation over window intervals of 1 minute and 1 hour,
with a batch interval of 30 seconds.

 I have a three-node setup. To enable checkpointing,
 I have mounted the same directory on all worker nodes using sshfs and use it
as the checkpoint directory.
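
 The checkpoint itself is wired up with the usual StreamingContext.getOrCreate
pattern, roughly like this (the mount path and app name below are placeholders
for my actual values):

// Minimal sketch of the checkpoint wiring (placeholder path and app name).
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf().setAppName("client-stats")
  val ssc = new StreamingContext(conf, Seconds(30))  // 30s batch interval
  ssc.checkpoint(checkpointDir)                      // checkpoints go to the sshfs-mounted directory
  // ... DStream setup (RabbitMQ receiver, windowed aggregation) goes here ...
  ssc
}

val checkpointDir = "/mnt/spark-checkpoint"          // placeholder for the shared mount
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()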

 When I run the Spark Streaming app for the first time it works fine.
 I can see the results being printed on the console and checkpoints being
written to the network directory.

 But when I run the job a second time, it fails with the following
exception:

at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


And the exception is repeated.


I am not pumping huge amounts of data into RabbitMQ. For the first run I
dumped fewer than 100 events, and before the second run I stopped the
producer process from sending messages to RabbitMQ.

I have tried setting "spark.streaming.unpersist" to "true".
My setup has 3 nodes, each with one core allocated to Spark and 512 MB of
executor memory per node.
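
For reference, those settings are applied roughly like this (standard Spark
config keys, assuming they are set through SparkConf; values mirror the setup
described above):

// Sketch of the configuration described above.
val conf = new SparkConf()
  .setAppName(appName)
  .set("spark.streaming.unpersist", "true")  // release processed RDDs from memory
  .set("spark.executor.memory", "512m")      // 512 MB of executor memory per node
  .set("spark.cores.max", "3")               // one core on each of the three nodes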

Please help me in solving this issue.


Re: Checkpoint not working after driver restart

2015-11-07 Thread vimal dinakaran
I am pasting the code here. Please let me know if there is any sequence
that is wrong.


  def createContext(checkpointDirectory: String, config: Config): StreamingContext = {
    println("Creating new context")

    val conf = new SparkConf(true)
      .setAppName(appName)
      .set("spark.streaming.unpersist", "true")

    val ssc = new StreamingContext(conf, Seconds(config.getInt(batchIntervalParam)))
    ssc.checkpoint(checkpointDirectory)
    val isValid = validate(ssc, config)

    if (isValid) {
      val result = runJob(ssc, config)
      println("result is " + result)
    } else {
      println(isValid.toString)
    }

    ssc
  }

  def main(args: Array[String]): Unit = {

    if (args.length < 1) {
      println("Must specify the path to the config file")
      println("Usage: progname <path to config file>")
      return
    }
    val url = args(0)
    logger.info("Starting " + appName)
    println("Got the path as %s".format(url))
    val source = scala.io.Source.fromFile(url)
    val lines = try source.mkString finally source.close()
    val config = ConfigFactory.parseString(lines)
    val directoryPath = config.getString(checkPointParam)

    // Recover the context from the checkpoint directory if one exists,
    // otherwise build a fresh one.
    val ssc = StreamingContext.getOrCreate(directoryPath, () => {
      createContext(directoryPath, config)
    })

    ssc.start()
    ssc.awaitTermination()
  }


  def getRabbitMQStream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
    val rabbitMQHost = config.getString(rabbitmqHostParam)
    val rabbitMQPort = config.getInt(rabbitmqPortParam)
    val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
    println("changing the memory level")
    val receiverStream: ReceiverInputDStream[String] = {
      RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost, rabbitMQPort,
        rabbitMQQueue, StorageLevel.MEMORY_AND_DISK_SER)
    }
    receiverStream.start()
    receiverStream
  }

  def getBaseDstream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
    val baseDstream = config.getString(receiverTypeParam) match {
      case "rabbitmq" => getRabbitMQStream(config, ssc)
    }
    baseDstream
  }

  def runJob(ssc: StreamingContext, config: Config): Any = {

    val keyspace = config.getString(keyspaceParam)
    val clientStatsTable = config.getString(clientStatsTableParam)
    val hourlyStatsTable = config.getString(hourlyStatsTableParam)
    val batchInterval = config.getInt(batchIntervalParam)
    val windowInterval = config.getInt(windowIntervalParam)
    val hourlyInterval = config.getInt(hourlyParam)
    val limit = config.getInt(limitParam)

    val lines = getBaseDstream(config, ssc)
    val statsRDD = lines.filter(_.contains("client_stats")).map(_.split(",")(1))

    val parserFunc = getProtobufParserFunction()
    val clientUsageRDD: DStream[((String, String), Double)] = statsRDD.flatMap(x => parserFunc(x))
    val formatterFunc = getJsonFormatterFunc()

    // Sum usage per key over the one-minute window, then keep the top "limit" entries per group.
    val oneMinuteWindowResult = clientUsageRDD
      .reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(windowInterval), Seconds(batchInterval))
      .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => (x ++ y))
      .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

    println("Client Usage from rabbitmq ")
    oneMinuteWindowResult.map(x => (x._1, DateTime.now, formatterFunc(x._2)))
      .saveToCassandra(keyspace, clientStatsTable)
    oneMinuteWindowResult.print()

    // Same aggregation over the hourly window.
    val HourlyResult = clientUsageRDD
      .reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
      .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => (x ++ y))
      .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

    HourlyResult.map(x => (x._1, DateTime.now, formatterFunc(x._2)))
      .saveToCassandra(keyspace, hourlyStatsTable)
    HourlyResult.map(x => (x, "hourly")).print()

  }
}
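
To be clear about what the windowed aggregation above is supposed to produce,
here is the same reshaping done on a plain Scala collection (clientId and
groupKey are just my labels for the two string fields coming out of the
protobuf parser):

// Hypothetical illustration of the per-window reshaping:
// ((clientId, groupKey), usage) in, groupKey -> top-"limit" (clientId, totalUsage) out.
val perWindow = List(
  (("clientA", "wifi"), 2.0),
  (("clientB", "wifi"), 5.0),
  (("clientA", "wifi"), 1.0),
  (("clientC", "lan"), 4.0)
)
val limit = 2
val topPerKey = perWindow
  .groupBy(_._1)
  .mapValues(_.map(_._2).sum)                                   // like reduceByKeyAndWindow(_ + _)
  .toList
  .map { case ((client, key), total) => (key, List((client, total))) }
  .groupBy(_._1)
  .mapValues(_.flatMap(_._2))                                   // like reduceByKey(_ ++ _)
  .mapValues(_.toList.sortBy(-_._2).take(limit))                // keep the top "limit" per key
  .toMap
// topPerKey: Map(wifi -> List((clientB,5.0), (clientA,3.0)), lan -> List((clientC,4.0)))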

