Hi Cody,

I just added zookeeper.connect to kafkaParams. The job still does not complete within the batch window, and the other batches are queuing up behind it. I can see foreach(println) on the DataFrame printing data from one partition but not the other, and there are no errors in the log.
val brokers = "localhost:9092,localhost:9093"
val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") //spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "localhost:9093,localhost:9092",
  "auto.offset.reset" -> "smallest",
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "xyz")
val topics = "test"
val topicsSet = topics.split(",").toSet
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlContext.implicits._
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
    println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
    System.exit(-1)
  }
  val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
  dataframe.foreach(println)
  println("$$$$$$$$$$$", dataframe.count())
})
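To confirm whether both partitions are actually being consumed each batch, one option is to log the offset ranges before the json conversion. A minimal sketch, assuming the spark-streaming-kafka 1.6 direct-stream API (the cast has to be applied to the RDD the stream hands to foreachRDD, before any transformation):

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD(rdd => {
  // Print the Kafka offset range read for each topic-partition in this batch.
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: offsets ${o.fromOffset} -> ${o.untilOffset}")
  }
})

If one partition's untilOffset never advances while the other's does, that would narrow the problem down to a single partition's leader broker.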
Logs:

16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface eth1)
16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on port 45031.
16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
16/08/10 18:16:26 INFO Remoting: Starting remoting
16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
16/08/10 18:16:26 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 56638.
16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5 MB
16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/08/10 18:16:27 INFO SparkUI: Started SparkUI at http://192.168.126.131:4040
16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file server' on port 59491.
16/08/10 18:16:28 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470833188094
16/08/10 18:16:29 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470833189531
16/08/10 18:16:29 INFO SparkContext: Added JAR file:/home/cloudera/Downloads/boa/pain.jar at http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host localhost
16/08/10 18:16:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55361 with 511.5 MB RAM, BlockManagerId(driver, localhost, 55361)
16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is overridden to xyz
16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration = 1000 ms
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff
16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms
16/08/10 18:16:31 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null
16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms
16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@d8e872
16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at time 1470833192000
16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at 1470833192000 ms
16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler
16/08/10 18:16:31 INFO StreamingContext: StreamingContext started
16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
16/08/10 18:16:32 INFO VerifiableProperties: Property group.id is overridden to xyz
16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000 ms
16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job 1470833192000 ms.0 from job set of time 1470833192000 ms
16/08/10 18:16:32 INFO SparkContext: Starting job: json at todelete.scala:42
16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at todelete.scala:42) with 2 output partitions
16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at todelete.scala:42)
16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List()
16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List()
16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing parents
16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.7 KB, free 4.7 KB)
16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.3 KB)
16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:55361 (size: 2.6 KB, free: 511.5 MB)
16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42)
16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
16/08/10 18:16:32 INFO Executor: Fetching http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470833189531
16/08/10 18:16:32 INFO Utils: Fetching http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp
16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000 ms
16/08/10 18:16:33 INFO Executor: Adding file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader
16/08/10 18:16:33 INFO Executor: Fetching http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
16/08/10 18:16:33 INFO Utils: Fetching http://192.168.126.131:59491/jars/pain.jar to /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp
16/08/10 18:16:33 INFO Executor: Adding file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar to class loader
16/08/10 18:16:33 INFO Executor: Fetching http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470833188094
16/08/10 18:16:33 INFO Utils: Fetching http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp
16/08/10 18:16:33 INFO Executor: Adding file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class loader
16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 offsets 0 -> 20
16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is overridden to xyz
16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms
16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 13366 bytes result sent to driver
16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 0) in 2423 ms on localhost (1/2)
16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms
16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms
16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms
16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms
16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms
16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000 ms
16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms
16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms
16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms
16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms
16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms
16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms
16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms
16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms
16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms
16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000 ms
16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms
16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms
16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms
16/08/10 18:16:54 INFO JobSch

On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Those logs you're posting are from right after your failure, they don't
> include what actually went wrong when attempting to read json. Look at
> your logs more carefully.
>
> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> wrote:
>
>> Hi Siva,
>>
>> With the below code, it is stuck at *sqlContext.read.json(rdd.map(_._2)).toDF()*.
>> There are two partitions in the topic.
>> I am running Spark 1.6.2.
>>
>> val topics = "topic.name"
>> val brokers = "localhost:9092"
>> val topicsSet = topics.split(",").toSet
>> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") //spark://localhost:7077
>> val sc = new SparkContext(sparkConf)
>> val ssc = new StreamingContext(sc, Seconds(60))
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> messages.foreachRDD(rdd => {
>>   if (rdd.isEmpty()) {
>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>     System.exit(-1)
>>   }
>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>   *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
>>   dataframe.foreach(println)
>> })
>>
>> Below are the logs:
>>
>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at todelete.scala:34) failed in 110.776 s
>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6d8ff688)
>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
>> 16/08/10 12:27:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1b6-01db-45b5-9302-d2f67f7c490e
>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class com.boa.poc.todelete --jars /home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar > log.txt
>> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface eth1)
>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
>> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver' on port 42140.
>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
>> 16/08/10 12:28:01 INFO Remoting: Starting remoting
>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
>> 16/08/10 12:28:01 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 53328.
>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity 511.5 MB
>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at http://192.168.126.131:4040
>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file server' on port 58957.
>> 16/08/10 12:28:02 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470812282187
>> 16/08/10 12:28:02 INFO SparkContext: Added JAR file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398
>> 16/08/10 12:28:02 INFO SparkContext: Added JAR file:/home/cloudera/Downloads/boa/pain.jar at http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host localhost
>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56716.
>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on 56716
>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register BlockManager
>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver, localhost, 56716)
>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
>> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is overridden to xyz
>> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval = null
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator at time 1470812340000
>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at 1470812340000 ms
>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
>> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is overridden to xyz
>> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000 ms
>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job 1470812340000 ms.0 from job set of time 1470812340000 ms
>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at todelete.scala:34
>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at todelete.scala:34) with 2 output partitions
>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json at todelete.scala:34)
>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34), which has no missing parents
>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.7 KB, free 4.7 KB)
>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.2 KB)
>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470812282187
>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
>> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class loader
>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398
>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
>> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader
>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/pain.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp
>> 16/08/10 12:29:02 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar to class loader
>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0 offsets 0 -> 8
>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
>> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
>> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is overridden to xyz
>> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 13366 bytes result sent to driver
>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 0) in 2380 ms on localhost (1/2)
>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000 ms
>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000 ms
>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000 ms
>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000 ms
>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000 ms
>>
>> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>>
>>> Hi Siva,
>>>
>>> Does the topic have partitions? Which version of Spark are you using?
>>>
>>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Here is a working example I did.
>>>>
>>>> HTH
>>>>
>>>> Regards,
>>>>
>>>> Sivakumaran S
>>>>
>>>> val topics = "test"
>>>> val brokers = "localhost:9092"
>>>> val topicsSet = topics.split(",").toSet
>>>> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") //spark://localhost:7077
>>>> val sc = new SparkContext(sparkConf)
>>>> val ssc = new StreamingContext(sc, Seconds(60))
>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>> messages.foreachRDD(rdd => {
>>>>   if (rdd.isEmpty()) {
>>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>>     System.exit(-1)
>>>>   }
>>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>>   // Process your DF as required here on
>>>> })
>>>>
>>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am reading JSON messages from Kafka. The topic has 2 partitions. When running the streaming job using spark-submit, I can see that *val dataFrame = sqlContext.read.json(rdd.map(_._2))* executes indefinitely. Am I doing something wrong here? Below is the code. This environment is the Cloudera sandbox. The same issue occurs in Hadoop production cluster mode, but access there is restricted, which is why I tried to reproduce the issue in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>>>>
>>>> val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9093,localhost:9092", "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
>>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>>
>>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>>>
>>>> val topics = Set("gpp.minf")
>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>>>
>>>> kafkaStream.foreachRDD(rdd => {
>>>>   if (rdd.count > 0) {
>>>>     *val dataFrame = sqlContext.read.json(rdd.map(_._2))*
>>>>     dataFrame.printSchema()
>>>>     //dataFrame.foreach(println)
>>>>   }
>>>> })