Figured it out. All I was doing wrong was testing it in a pseudo-node VM with a single core: the tasks were hanging, waiting on CPU. On the production cluster this works just fine.
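For the archives, my read of what happened: the direct stream schedules one task per Kafka partition per batch, and with only one real core behind local[5] on the sandbox VM, the second partition's task never made progress. A quick sanity check along these lines (variable names are mine, purely illustrative, not from the job below) would have flagged it up front:

import org.apache.spark.SparkConf

// The direct Kafka stream creates one task per topic partition per batch,
// so local testing needs real parallelism behind the master string, not
// just a big "local[N]" thread count.
val cores = Runtime.getRuntime.availableProcessors()
println(s"JVM sees $cores core(s)")  // printed 1 on my single-core sandbox VM
val sparkConf = new SparkConf()
  .setAppName("KafkaWeather")
  .setMaster(s"local[${math.max(cores, 2)}]")

The production cluster has plenty of cores per executor, which is presumably why the identical job runs fine there.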
On Thu, Aug 11, 2016 at 12:45 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:

> I checked the executor logs and the UI. There is no error message or
> anything like that; when there is any action, it just waits. There is
> data in the partitions: I could use simple-consumer-shell and print all
> of it to the console. Am I doing anything wrong in foreachRDD? This
> works just fine with a single-partition topic.
>
> On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> zookeeper.connect is irrelevant.
>>
>> Did you look at your executor logs?
>> Did you look at the UI for the (probably failed) stages?
>> Are you actually producing data into all of the kafka partitions?
>> If you use kafka-simple-consumer-shell.sh to read that partition, do
>> you get any data?
>>
>> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
>> <diwakar.dhanusk...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> I just added zookeeper.connect to kafkaParams. It couldn't come out of
>>> the batch window, and the other batches are queued. I could see
>>> foreach(println) on the dataFrame printing one partition's data and
>>> not the other's. I couldn't see any errors in the log.
>>>
>>> val brokers = "localhost:9092,localhost:9093"
>>> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") //spark://localhost:7077
>>> val sc = new SparkContext(sparkConf)
>>> val ssc = new StreamingContext(sc, Seconds(1))
>>> val kafkaParams = Map[String, String]("bootstrap.servers"->"localhost:9093,localhost:9092","auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","group.id"->"xyz")
>>> val topics = "test"
>>> val topicsSet = topics.split(",").toSet
>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>> import sqlContext.implicits._
>>> messages.foreachRDD(rdd => {
>>>   if (rdd.isEmpty()) {
>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>     System.exit(-1)
>>>   }
>>>   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>   dataframe.foreach(println)
>>>   println("$$$$$$$$$$$", dataframe.count())
>>> })
>>>
>>> Logs:
>>>
>>> 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
>>> 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>> 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface eth1)
>>> 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
>>> 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
>>> 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
>>> 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
>>> 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on port 45031.
>>> 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
>>> 16/08/10 18:16:26 INFO Remoting: Starting remoting
>>> 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
>>> 16/08/10 18:16:26 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 56638.
>>> 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
>>> 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
>>> 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
>>> 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5 MB
>>> 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
>>> 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>>> 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at http://192.168.126.131:4040
>>> 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
>>> 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
>>> 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file server' on port 59491.
>>> 16/08/10 18:16:28 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470833188094
>>> 16/08/10 18:16:29 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470833189531
>>> 16/08/10 18:16:29 INFO SparkContext: Added JAR file:/home/cloudera/Downloads/boa/pain.jar at http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
>>> 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host localhost
>>> 16/08/10 18:16:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
>>> 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
>>> 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
>>> 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55361 with 511.5 MB RAM, BlockManagerId(driver, localhost, 55361)
>>> 16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
>>> 16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
>>> 16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>>> 16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is overridden to xyz
>>> 16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
>>> 16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
>>> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
>>> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms
>>> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
>>> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval = null
>>> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration = 1000 ms
>>> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff
>>> 16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms
>>> 16/08/10 18:16:31 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
>>> 16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null
>>> 16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms
>>> 16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@d8e872
>>> 16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at time 1470833192000
>>> 16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at 1470833192000 ms
>>> 16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler
>>> 16/08/10 18:16:31 INFO StreamingContext: StreamingContext started
>>> 16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties
>>> 16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>>> 16/08/10 18:16:32 INFO VerifiableProperties: Property group.id is overridden to xyz
>>> 16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
>>> 16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000 ms
>>> 16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job 1470833192000 ms.0 from job set of time 1470833192000 ms
>>> 16/08/10 18:16:32 INFO SparkContext: Starting job: json at todelete.scala:42
>>> 16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at todelete.scala:42) with 2 output partitions
>>> 16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at todelete.scala:42)
>>> 16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List()
>>> 16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List()
>>> 16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing parents
>>> 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.7 KB, free 4.7 KB)
>>> 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.3 KB)
>>> 16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:55361 (size: 2.6 KB, free: 511.5 MB)
>>> 16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
>>> 16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42)
>>> 16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
>>> 16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>>> 16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>>> 16/08/10 18:16:32 INFO Executor: Fetching http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470833189531
>>> 16/08/10 18:16:32 INFO Utils: Fetching http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp
>>> 16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000 ms
>>> 16/08/10 18:16:33 INFO Executor: Adding file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader
>>> 16/08/10 18:16:33 INFO Executor: Fetching http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
>>> 16/08/10 18:16:33 INFO Utils: Fetching http://192.168.126.131:59491/jars/pain.jar to /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp
>>> 16/08/10 18:16:33 INFO Executor: Adding file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar to class loader
>>> 16/08/10 18:16:33 INFO Executor: Fetching http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470833188094
>>> 16/08/10 18:16:33 INFO Utils: Fetching http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp
>>> 16/08/10 18:16:33 INFO Executor: Adding file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class loader
>>> 16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 offsets 0 -> 20
>>> 16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
>>> 16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>>> 16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is overridden to xyz
>>> 16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
>>> 16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms
>>> 16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 13366 bytes result sent to driver
>>> 16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 0) in 2423 ms on localhost (1/2)
>>> 16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms
>>> 16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms
>>> 16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms
>>> 16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms
>>> 16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms
>>> 16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000 ms
>>> 16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms
>>> 16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms
>>> 16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms
>>> 16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms
>>> 16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms
>>> 16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms
>>> 16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms
>>> 16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms
>>> 16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms
>>> 16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000 ms
>>> 16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms
>>> 16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms
>>> 16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms
>>> 16/08/10 18:16:54 INFO JobSch
>>>
>>> On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Those logs you're posting are from right after your failure; they
>>>> don't include what actually went wrong when attempting to read json.
>>>> Look at your logs more carefully.
>>>>
>>>> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> wrote:
>>>>
>>>>> Hi Siva,
>>>>>
>>>>> With the code below, it gets stuck at sqlContext.read.json(rdd.map(_._2)).toDF().
>>>>> There are two partitions in the topic. I am running Spark 1.6.2.
>>>>>
>>>>> val topics = "topic.name"
>>>>> val brokers = "localhost:9092"
>>>>> val topicsSet = topics.split(",").toSet
>>>>> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") //spark://localhost:7077
>>>>> val sc = new SparkContext(sparkConf)
>>>>> val ssc = new StreamingContext(sc, Seconds(60))
>>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> "xyz","auto.offset.reset"->"smallest")
>>>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>> messages.foreachRDD(rdd => {
>>>>>   if (rdd.isEmpty()) {
>>>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>>>     System.exit(-1)
>>>>>   }
>>>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>>>   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>>>   dataframe.foreach(println)
>>>>> })
>>>>>
>>>>> Below are the logs:
>>>>>
>>>>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at todelete.scala:34) failed in 110.776 s
>>>>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6d8ff688)
>>>>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
>>>>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
>>>>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
>>>>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
>>>>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
>>>>> 16/08/10 12:27:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
>>>>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
>>>>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
>>>>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
>>>>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
>>>>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1b6-01db-45b5-9302-d2f67f7c490e
>>>>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
>>>>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
>>>>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class com.boa.poc.todelete --jars /home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar > log.txt
>>>>> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>>>>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
>>>>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>>>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface eth1)
>>>>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
>>>>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
>>>>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
>>>>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
>>>>> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver' on port 42140.
>>>>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
>>>>> 16/08/10 12:28:01 INFO Remoting: Starting remoting
>>>>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
>>>>> 16/08/10 12:28:01 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 53328.
>>>>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
>>>>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
>>>>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
>>>>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity 511.5 MB
>>>>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
>>>>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>>>>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at http://192.168.126.131:4040
>>>>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
>>>>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
>>>>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file server' on port 58957.
>>>>> 16/08/10 12:28:02 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470812282187
>>>>> 16/08/10 12:28:02 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398
>>>>> 16/08/10 12:28:02 INFO SparkContext: Added JAR file:/home/cloudera/Downloads/boa/pain.jar at http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>>>>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host localhost
>>>>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56716.
>>>>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on 56716
>>>>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register BlockManager
>>>>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver, localhost, 56716)
>>>>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
>>>>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
>>>>> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>>>>> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is overridden to xyz
>>>>> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>>>>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
>>>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
>>>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
>>>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
>>>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval = null
>>>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
>>>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
>>>>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
>>>>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
>>>>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
>>>>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
>>>>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
>>>>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator at time 1470812340000
>>>>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at 1470812340000 ms
>>>>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
>>>>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
>>>>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
>>>>> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>>>>> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is overridden to xyz
>>>>> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>>>>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000 ms
>>>>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job 1470812340000 ms.0 from job set of time 1470812340000 ms
>>>>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at todelete.scala:34
>>>>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at todelete.scala:34) with 2 output partitions
>>>>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json at todelete.scala:34)
>>>>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
>>>>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
>>>>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34), which has no missing parents
>>>>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.7 KB, free 4.7 KB)
>>>>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.2 KB)
>>>>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
>>>>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
>>>>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
>>>>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
>>>>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>>>>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>>>>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470812282187
>>>>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
>>>>> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class loader
>>>>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398
>>>>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
>>>>> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader
>>>>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>>>>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/pain.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp
>>>>> 16/08/10 12:29:02 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar to class loader
>>>>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0 offsets 0 -> 8
>>>>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
>>>>> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>>>>> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is overridden to xyz
>>>>> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>>>>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 13366 bytes result sent to driver
>>>>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 0) in 2380 ms on localhost (1/2)
>>>>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000 ms
>>>>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000 ms
>>>>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000 ms
>>>>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000 ms
>>>>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000 ms
>>>>>
>>>>> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>>>>>
>>>>>> Hi Siva,
>>>>>>
>>>>>> Does the topic have partitions? Which version of Spark are you using?
>>>>>>
>>>>>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Here is a working example I did.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Sivakumaran S
>>>>>>>
>>>>>>> val topics = "test"
>>>>>>> val brokers = "localhost:9092"
>>>>>>> val topicsSet = topics.split(",").toSet
>>>>>>> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") //spark://localhost:7077
>>>>>>> val sc = new SparkContext(sparkConf)
>>>>>>> val ssc = new StreamingContext(sc, Seconds(60))
>>>>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>>>> messages.foreachRDD(rdd => {
>>>>>>>   if (rdd.isEmpty()) {
>>>>>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>>>>>     System.exit(-1)
>>>>>>>   }
>>>>>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>>>>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>>>>>   //Process your DF as required here on
>>>>>>> })
>>>>>>>
>>>>>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am reading JSON messages from Kafka. The topic has 2 partitions. When
>>>>>>> running the streaming job using spark-submit, I could see that val
>>>>>>> dataFrame = sqlContext.read.json(rdd.map(_._2)) executes indefinitely.
>>>>>>> Am I doing something wrong here? The code is below. This environment is
>>>>>>> the Cloudera sandbox; the same issue occurs in hadoop production
>>>>>>> cluster mode, except that that environment is restricted, which is why
>>>>>>> I tried to reproduce the issue in the Cloudera sandbox. Kafka 0.10 and
>>>>>>> Spark 1.4.
>>>>>>> val kafkaParams = Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", "group.id" -> "xyz","auto.offset.reset"->"smallest")
>>>>>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>>>>>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>>>>>
>>>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>>>>>>
>>>>>>> val topics = Set("gpp.minf")
>>>>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>>>>>>>
>>>>>>> kafkaStream.foreachRDD(rdd => {
>>>>>>>   if (rdd.count > 0) {
>>>>>>>     val dataFrame = sqlContext.read.json(rdd.map(_._2))
>>>>>>>     dataFrame.printSchema()
>>>>>>>     //dataFrame.foreach(println)
>>>>>>>   }
>>>>>>> })
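PS, for anyone finding this thread later: Cody's question "are you actually producing data into all of the kafka partitions?" can also be answered from inside the job itself, since the RDDs produced by createDirectStream carry their Kafka offset ranges. A minimal sketch against the Spark 1.6 spark-streaming-kafka direct API ("messages" being the DStream returned by createDirectStream in the code above; the cast has to happen on the raw RDD inside foreachRDD, before any transformation):

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // One RDD partition per Kafka topic-partition; a partition where
  // fromOffset == untilOffset received no data in this batch.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic} partition ${r.partition}: offsets ${r.fromOffset} -> ${r.untilOffset}"))
}

If one partition's range never advances, the problem is on the producer side; if both ranges advance but only one partition's data ever prints, it is the job (or, as it turned out here, the hardware under it).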