Hi Team,

Could you please respond to my request below?

Regards,
Rajesh

On Thu, Sep 25, 2014 at 11:38 PM, Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:

> Hi Team,
>
> Can I use Actors in Spark Streaming based on event type? Could you please
> review the test program below and let me know if there is anything I need
> to change with respect to best practices?
>
> import akka.actor.Actor
> import akka.actor.{ActorRef, Props}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.Seconds
> import akka.actor.ActorSystem
>
> case class one(r: org.apache.spark.rdd.RDD[String])
> case class two(s: org.apache.spark.rdd.RDD[String])
>
> class Events extends Actor
> {
>   def receive = {
>     // Based on event type - Invoke respective methods asynchronously
>     case one(r) => println("ONE COUNT" + r.count) // Invoke respective functions
>     case two(s) => println("TWO COUNT" + s.count) // Invoke respective functions
>   }
> }
>
> object Test {
>
>   def main(args: Array[String]): Unit = {
>     val system = ActorSystem("System")
>     val event: ActorRef = system.actorOf(Props[Events], "events")
>     val sparkConf = new SparkConf()
>       .setAppName("AlertsLinesCount")
>       .setMaster("local")
>     val ssc = new StreamingContext(sparkConf, Seconds(30))
>     val lines = ssc.textFileStream(
>       "hdfs://localhost:9000/user/rajesh/EventsDirectory/")
>     lines.foreachRDD(x => {
>       event ! one(x)
>       event ! two(x)
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Regards,
> Rajesh