Graceful shutdown in Spark Streaming

2014-12-22 Thread Jesper Lundgren
Hello all,

I have a Spark Streaming application running in a standalone cluster
(deployed with spark-submit --deploy-mode cluster). I am trying to add
graceful shutdown functionality to this application, but I am not sure what
the best practice for this is.

Currently I am using this code:

sys.addShutdownHook {
  log.info("Shutdown requested. Graceful shutdown started.")
  // Stop the StreamingContext and the underlying SparkContext, waiting
  // for in-flight batches to finish before returning.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  log.info("Shutdown Complete. Bye")
}

ssc.start()
ssc.awaitTermination()

It seems to be working, but I still get some errors in the driver log, and
the master UI shows the driver's status as FAILED after it is stopped.

Driver log:

14/12/22 16:42:30 INFO Main: Shutdown requested. Graceful shutdown started.
.
.
.
14/12/22 16:42:40 INFO JobGenerator: Stopping JobGenerator gracefully.
.
.
.
14/12/22 16:42:50 INFO DAGScheduler: Job 1 failed: start at Main.scala:114, took 93.444222 s
Exception in thread "Thread-34" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
  at org.apache.spark.scheduler.DAGScheduler$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
  at org.apache.spark.scheduler.DAGScheduler$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
  at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
  at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:210)
  at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
  at akka.actor.ActorCell.terminate(ActorCell.scala:369)
  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/12/22 16:42:51 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/12/22 16:42:51 INFO MemoryStore: MemoryStore cleared
14/12/22 16:42:51 INFO BlockManager: BlockManager stopped
14/12/22 16:42:51 INFO BlockManagerMaster: BlockManagerMaster stopped
14/12/22 16:42:51 INFO SparkContext: Successfully stopped SparkContext
14/12/22 16:42:51 INFO Main: Shutdown Complete. Bye
(end of driver log)
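
One alternative I have been considering is to skip the JVM shutdown hook
entirely and have the driver poll an external stop marker, triggering the
graceful stop itself. A minimal sketch, assuming ssc and log are in scope as
above (the marker path and poll interval are arbitrary placeholders, and the
loop does not handle the case where the context stops on its own):

import java.nio.file.{Files, Paths}

ssc.start()

// Hypothetical stop marker; an operator creates this file to request shutdown.
val stopMarker = Paths.get("/tmp/stop-streaming-app")

var stopped = false
while (!stopped) {
  Thread.sleep(10000)  // poll every 10 seconds
  if (Files.exists(stopMarker)) {
    log.info("Stop marker found. Graceful shutdown started.")
    // Let in-flight batches finish, then tear down the SparkContext.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
    log.info("Shutdown Complete. Bye")
  }
}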

Does anyone have experience to share regarding graceful shutdown in
production for Spark Streaming?

Thanks!

Best Regards,
Jesper Lundgren


How to kill/upgrade/restart driver launched in Spark standalone cluster+supervised mode?

2014-11-16 Thread Jesper Lundgren
Hello,

I have a Spark Standalone cluster running in HA mode. I launched an
application using spark-submit with cluster and supervised mode enabled, and
it launched successfully on one of the worker nodes.

How can I stop, restart, kill, or otherwise manage such a task running in a
standalone cluster? There seem to be no options for this in the web
interface. I also wonder how I can upgrade my driver in the future.
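
The only handle I am aware of is the deploy client mentioned in the
standalone docs, which should be able to kill a cluster-mode driver by its ID
(the master URL and driver ID below are placeholders; the ID is shown in the
master UI), but I am not sure how it interacts with supervised mode:

./bin/spark-class org.apache.spark.deploy.Client kill spark://<master-host>:7077 <driver-id>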

Also, does supervised mode work across worker nodes? I.e., will it relaunch
the driver on another node if the current one dies, or does it only handle
restarts on the same node after a driver crash?

I would love to hear others' experience with this :)

Thanks!

(PS: I am launching a Spark Streaming application.)

// Jesper Lundgren


Spark Streaming: foreachRDD network output

2014-09-25 Thread Jesper Lundgren
Hello all,

I have some questions regarding the foreachRDD output function in Spark
Streaming.

The programming guide (
http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html)
describes how to output data over a network connection from the worker nodes.

Are there some working examples of how to do this properly? (Most of the
guide describes what not to do, rather than what to do.)
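
For reference, this is roughly the connection-per-partition pattern I
understand the guide to be recommending; createConnection and connection.send
are hypothetical placeholders for a real client such as JDBC:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // This closure runs on a worker, so the connection is created there
    // and never has to be serialized and shipped from the driver.
    val connection = createConnection()  // hypothetical factory, e.g. a JDBC connection
    try {
      partitionOfRecords.foreach(record => connection.send(record))  // hypothetical write call
    } finally {
      connection.close()
    }
    // If this block throws, the task fails and Spark retries it
    // (up to spark.task.maxFailures), possibly on another executor.
  }
}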

Any suggestions on the best way to write tests for such code, to make sure
that connection objects are used properly, etc.?
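
A minimal local harness along these lines might work: push known RDDs through
queueStream and run the output code against a stub writer (the batch
interval, sleep, and println stub are arbitrary choices for this sketch):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("foreachRDD-output-test")
val ssc = new StreamingContext(conf, Seconds(1))

// Feed a known RDD into the stream, one batch at a time.
val input = mutable.Queue(ssc.sparkContext.makeRDD(Seq("a", "b", "c")))
val stream = ssc.queueStream(input)

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach(r => println(s"would send: $r"))  // stand-in for the real writer under test
  }
}

ssc.start()
Thread.sleep(3000)  // let a few batches run
ssc.stop(stopSparkContext = true, stopGracefully = false)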

How should I handle network or other problems on a worker node? Can I throw
an exception to force Spark to try again with that data on another node? As
an example: a program writes data to an SQL database using foreachRDD. One
worker node might have connection issues to the database, so another node has
to finish the output operation.

Thanks!

-- Jesper Lundgren