Hi All,
I am new to both Scala & Spark, so please expect some mistakes. Setup : Scala : 2.10.2 Spark : Apache 1.1.0 Hadoop : Apache 2.4 Intent of the code : To read from kafka topic & do some processing. Below are the code details and the error I am getting. : import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.SparkContext._ import scala.collection.IndexedSeq._ import org.apache.spark.streaming.dstream import java.io.File import java.util.Properties import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by samyamaiti on 12/25/14. */ object Driver { def main(args: Array[String]) { //CheckPoint dir in HDFS val checkpointDirectory = "hdfs://localhost:8020/user/samyamaiti/SparkCheckpoint1" //functionToCreateContext def functionToCreateContext(): StreamingContext = { //Setting conf object val conf = new SparkConf() conf.setMaster("spark://SamyaMac.local:7077") conf.setAppName("SparkStreamingFileProcessor") val ssc = new StreamingContext(conf, Seconds(1)) //Create checkpointing ssc.checkpoint(checkpointDirectory) ssc } // Get StreamingContext from checkpoint data or create a new one val sscContext = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) //Accumulator to keep track of number of messages val numInputMessages = sscContext.sparkContext.accumulator(0L, "Kafka messages consumed") //Number of consumer threads Input DStream val consumerThreadsPerInputDStream = 1 //Setting the topic val topics = Map("testTopic" -> consumerThreadsPerInputDStream) //Zookeeper Quorum address val zkQurom = "http://localhost:2181" //Setting up the DStream val kafkaDStreams = { val numPartitionsOfInputTopic = 1 
val streams = (1 to numPartitionsOfInputTopic) map { _ => KafkaUtils.createStream(sscContext, zkQurom, kafkaParams, topics).map(_._2) } val unifiedStream = sscContext.union(streams) val sparkProcessingParallelism = 1 unifiedStream.repartition(sparkProcessingParallelism) } //Setting the stream processing pipeline //Printing the file name in HDFS as received from Kafka & saving the same to HDFS kafkaDStreams.map { case bytes => numInputMessages += 1 }.foreachRDD(rdd => { println("2") }) // Run the streaming job sscContext.start() sscContext.awaitTermination() } } Build.sbt --------- name := "SparkFileProcessor" version := "1.0" scalaVersion := "2.10.2" libraryDependencies ++= Seq( "org.apache.spark" % "spark-streaming_2.10" % "1.1.0", "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0", "org.apache.hadoop" % "hadoop-client" % "2.4.0" ) Error ----- 14/12/25 23:55:06 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 14/12/25 23:55:06 INFO NettyBlockTransferService: Server created on 56078 14/12/25 23:55:06 INFO BlockManagerMaster: Trying to register BlockManager 14/12/25 23:55:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@***.***.***.***:56065] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 
14/12/25 23:55:36 WARN AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ReliableDeliverySupervisor-Association-with-remote-system-tp20859.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org