Hi All,

I have Akka remote actors running on two nodes. I submitted the Spark application from node1. In the Spark code, inside one of the RDDs, I am sending a message to the actor running on node1. My Spark code is as follows:

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}
class ActorClient extends Actor with Serializable {

  import context._

  // Remote master actor on node1, resolved via actorSelection.
  val currentActor: ActorSelection =
    context.system.actorSelection("akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")

  implicit val timeout = Timeout(10.seconds)

  def receive = {
    case msg: String =>
      if (msg.contains("Spark")) {
        currentActor ! msg
        sender() ! "Local"
      } else {
        println("Received.." + msg)
        val future = currentActor ? msg
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        if (result.contains("ACK"))
          sender() ! "OK"
      }
    // Note: Akka handles PoisonPill itself before user code sees it,
    // so this case is never actually reached.
    case PoisonPill => context.stop(self)
  }
}

object SparkExec extends Serializable {

  implicit val timeout = Timeout(10.seconds)
  val actorSystem = ActorSystem("ClientActorSystem")
  val actor = actorSystem.actorOf(Props(classOf[ActorClient]), name = "ClientActor")

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("DeepLearningSpark")
    val sc = new SparkContext(conf)

    val textrdd = sc.textFile("hdfs://IMPETUS-DSRV02:9000/deeplearning/sample24k.csv")

    val rdd1 = textrdd.map { line =>
      println("In Map...")
      val future = actor ? "Hello..Spark"
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("Local")) {
        println("Received in map...." + result)
        //actorSystem.shutdown
      }
      (10)
    }

    val rdd2 = rdd1.map { x =>
      val future = actor ? "Done"
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("OK")) {
        actorSystem.stop(actor)
        actorSystem.shutdown()
      }
      (2)
    }

    rdd2.saveAsTextFile("/home/padma/SparkAkkaOut")
  }
}

In my ActorClient actor, I identify the remote actor through actorSelection and send it the messages. Once the messages are sent, in *rdd2*, after receiving the ack from the remote actor, I kill the ActorClient actor and shut down the ActorSystem.

The above code is throwing the following exception:

14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.ExceptionInInitializerError:
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)

14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in): java.lang.NoClassDefFoundError: Could not initialize class com.impetus.spark.SparkExec$
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)

Could anyone please help me with this? My concern is that I want to send messages to an actor from within a Spark RDD, and once the messages have been sent, the ActorSystem needs to be shut down.

Thanks,
Padma Ch
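
P.S. Re-reading the trace, the "NoClassDefFoundError: Could not initialize class com.impetus.spark.SparkExec$" makes me suspect that the map closures capture `actor`, so each executor tries to initialize the SparkExec object, and constructing the ActorSystem in that static initializer fails on the workers. Below is a minimal sketch of the alternative I am considering, under that assumption: build a short-lived ActorSystem per partition on the executor and talk to the remote master actor directly. The name "ExecutorActorSystem" is my own placeholder, not from the real code.

    import akka.actor.ActorSystem
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val rdd1 = textrdd.mapPartitions { lines =>
      // Executor-local system: the closure captures only plain strings,
      // nothing from the driver-side SparkExec object.
      val system = ActorSystem("ExecutorActorSystem")
      implicit val timeout = Timeout(10.seconds)
      val remote = system.actorSelection(
        "akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")

      // Force the iterator before shutting the local system down.
      val results = lines.map { line =>
        val future = remote ? "Hello..Spark"
        Await.result(future, timeout.duration).asInstanceOf[String]
        10
      }.toList

      system.shutdown()
      results.iterator
    }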
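
P.P.S. One more doubt about my own code: rdd1 and rdd2 are built with map, which is lazy and runs on the executors, so the actorSystem.stop / actorSystem.shutdown calls inside rdd2 would execute once per record on the workers rather than once at the end of the job. Since the system belongs to the driver, it seems safer to shut it down only after the action has returned, something like:

    // saveAsTextFile is the action that actually triggers the two maps.
    rdd2.saveAsTextFile("/home/padma/SparkAkkaOut")

    // All tasks have finished once the action returns, so the
    // driver-side actor and its system can be torn down here.
    actorSystem.stop(actor)
    actorSystem.shutdown()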