I checked the executor logs and the Spark UI. There is no error message of any kind; whenever an action runs, it simply waits. There is data in both partitions, and I can read all of it with kafka-simple-consumer-shell.sh and print it to the console. Am I doing anything wrong in foreachRDD? The same code works fine with a single-partition topic.
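For completeness, here is a trimmed, self-contained sketch of what I am trying to get working (Spark 1.6.2, Kafka 0.8 direct stream). The only deliberate changes from the code quoted below follow Sivakumaran's earlier example: obtaining the SQLContext via SQLContext.getOrCreate inside foreachRDD, using metadata.broker.list as in my earlier attempt, and skipping empty batches instead of calling System.exit. The object wrapper and the collect-before-print are just for illustration, not the exact job I submit.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaJsonStream {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Direct stream over both brokers; the topic "test" has two partitions
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "localhost:9092,localhost:9093",
      "auto.offset.reset" -> "smallest",
      "group.id" -> "xyz")
    val topicsSet = "test".split(",").toSet

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD { rdd =>
      // Skip empty batches rather than exiting the whole application
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        val dataframe = sqlContext.read.json(rdd.map(_._2))
        println("record count: " + dataframe.count())
        // Collect to the driver before printing so the rows show up in the driver log
        dataframe.collect().foreach(println)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}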
On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org> wrote: > zookeeper.connect is irrelevant. > > Did you look at your executor logs? > Did you look at the UI for the (probably failed) stages? > Are you actually producing data into all of the kafka partitions? > If you use kafka-simple-consumer-shell.sh to read that partition, do > you get any data? > > On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: > > Hi Cody, > > > > Just added zookeeper.connect to kafkaparams . It couldn't come out of > batch > > window. Other batches are queued. I could see foreach(println) of > dataFrame > > printing one of partition's data and not the other. > > Couldn't see any errors from log. > > > > val brokers = "localhost:9092,localhost:9093" > > val sparkConf = new > > SparkConf().setAppName("KafkaWeather").setMaster(" > local[5]")//spark://localhost:7077 > > val sc = new SparkContext(sparkConf) > > val ssc = new StreamingContext(sc, Seconds(1)) > > val kafkaParams = Map[String, > > String]("bootstrap.servers"->"localhost:9093,localhost:9092" > ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181"," > group.id"->"xyz") > > val topics = "test" > > val topicsSet = topics.split(",").toSet > > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, > > StringDecoder](ssc, kafkaParams, topicsSet) > > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) > > import sqlContext.implicits._ > > messages.foreachRDD(rdd => { > > if (rdd.isEmpty()) { > > println("Failed to get data from Kafka. Please check that the Kafka > > producer is streaming data.") > > System.exit(-1) > > } > > > > val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF() > > dataframe.foreach(println) > > println( "$$$$$$$$$$$", dataframe.count()) > > }) > > Logs: > > > > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2 > > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop > > library for your platform... using builtin-java classes where applicable > > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera > resolves to > > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on > interface > > eth1) > > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to > > another address > > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera > > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera > > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication > > disabled; ui acls disabled; users with view permissions: Set(cloudera); > > users with modify permissions: Set(cloudera) > > 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' > on > > port 45031. > > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started > > 16/08/10 18:16:26 INFO Remoting: Starting remoting > > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses > > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638] > > 16/08/10 18:16:26 INFO Utils: Successfully started service > > 'sparkDriverActorSystem' on port 56638. 
> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker > > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster > > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at > > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee > > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity > 511.5 > > MB > > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator > > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on > port > > 4040. > > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at > > http://192.168.126.131:4040 > > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is > > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd- > 2b2a4e68-2952-41b0-a11b-f07860682749 > > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server > > 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file > > server' on port 59491. > > 16/08/10 18:16:28 INFO SparkContext: Added JAR > > file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at > > http://192.168.126.131:59491/jars/spark-streaming-kafka- > assembly_2.10-1.6.2.jar > > with timestamp 1470833188094 > > 16/08/10 18:16:29 INFO SparkContext: Added JAR > > file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at > > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar > with > > timestamp 1470833189531 > > 16/08/10 18:16:29 INFO SparkContext: Added JAR > > file:/home/cloudera/Downloads/boa/pain.jar at > > http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533 > > 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host > > localhost > > 16/08/10 18:16:29 INFO Utils: Successfully started service > > 'org.apache.spark.network.netty.NettyBlockTransferService' on port > 55361. 
> > 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361 > > 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register > BlockManager > > 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block > manager > > localhost:55361 with 511.5 MB RAM, BlockManagerId(driver, localhost, > 55361) > > 16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager > > 16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties > > 16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset > is > > overridden to smallest > > 16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is > overridden > > to xyz > > 16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect > is > > overridden to localhost:2181 > > 16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1 > > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 > > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms > > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level = > > StorageLevel(false, false, false, false, 1) > > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval = > null > > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration = 1000 > ms > > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and validated > > org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff > > 16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms > > 16/08/10 18:16:31 INFO ForEachDStream: Storage level = > StorageLevel(false, > > false, false, false, 1) > > 16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null > > 16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms > > 16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated > > org.apache.spark.streaming.dstream.ForEachDStream@d8e872 > > 16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at > > time 1470833192000 > > 16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at > 1470833192000 > > ms > > 16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler > > 16/08/10 18:16:31 INFO StreamingContext: StreamingContext started > > 16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties > > 16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset > is > > overridden to smallest > > 16/08/10 18:16:32 INFO VerifiableProperties: Property group.id is > overridden > > to xyz > > 16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect > is > > overridden to localhost:2181 > > 16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000 ms > > 16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job > > 1470833192000 ms.0 from job set of time 1470833192000 ms > > 16/08/10 18:16:32 INFO SparkContext: Starting job: json at > todelete.scala:42 > > 16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at > todelete.scala:42) > > with 2 output partitions > > 16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at > > todelete.scala:42) > > 16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List() > > 16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List() > > 16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0 > > (MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing > > parents > > 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values in > > memory (estimated size 4.7 KB, free 4.7 KB) > > 16/08/10 18:16:32 INFO MemoryStore: Block 
broadcast_0_piece0 stored as > bytes > > in memory (estimated size 2.6 KB, free 7.3 KB) > > 16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in > memory > > on localhost:55361 (size: 2.6 KB, free: 511.5 MB) > > 16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast > at > > DAGScheduler.scala:1006 > > 16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from > > ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42) > > 16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 > tasks > > 16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0 > (TID > > 0, localhost, partition 1,NODE_LOCAL, 2232 bytes) > > 16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0) > > 16/08/10 18:16:32 INFO Executor: Fetching > > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar > with > > timestamp 1470833189531 > > 16/08/10 18:16:32 INFO Utils: Fetching > > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar > to > > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles- > e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp > > 16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000 ms > > 16/08/10 18:16:33 INFO Executor: Adding > > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/ > userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark- > assembly-1.6.2-hadoop2.6.0.jar > > to class loader > > 16/08/10 18:16:33 INFO Executor: Fetching > > http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533 > > 16/08/10 18:16:33 INFO Utils: Fetching > > http://192.168.126.131:59491/jars/pain.jar to > > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles- > e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp > > 16/08/10 18:16:33 INFO Executor: Adding > > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/ > userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar > > to class loader > > 16/08/10 18:16:33 INFO Executor: Fetching > > http://192.168.126.131:59491/jars/spark-streaming-kafka- > assembly_2.10-1.6.2.jar > > with timestamp 1470833188094 > > 16/08/10 18:16:33 INFO Utils: Fetching > > http://192.168.126.131:59491/jars/spark-streaming-kafka- > assembly_2.10-1.6.2.jar > > to > > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles- > e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp > > 16/08/10 18:16:33 INFO Executor: Adding > > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/ > userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark- > streaming-kafka-assembly_2.10-1.6.2.jar > > to class loader > > 16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 > offsets 0 > > -> 20 > > 16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties > > 16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset > is > > overridden to smallest > > 16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is > overridden > > to xyz > > 16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect > is > > overridden to localhost:2181 > > 16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms > > 16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 
> > 13366 bytes result sent to driver > > 16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 > (TID > > 0) in 2423 ms on localhost (1/2) > > 16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms > > 16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms > > 16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms > > 16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms > > 16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms > > 16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000 ms > > 16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms > > 16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms > > 16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms > > 16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms > > 16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms > > 16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms > > 16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms > > 16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms > > 16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms > > 16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000 ms > > 16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms > > 16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms > > 16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms > > 16/08/10 18:16:54 INFO JobSch > > > > On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> Those logs you're posting are from right after your failure, they don't > >> include what actually went wrong when attempting to read json. Look at > your > >> logs more carefully. > >> > >> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" > >> <diwakar.dhanusk...@gmail.com> wrote: > >>> > >>> Hi Siva, > >>> > >>> With below code, it is stuck up at > >>> sqlContext.read.json(rdd.map(_._2)).toDF() > >>> There are two partitions in topic. > >>> I am running spark 1.6.2 > >>> > >>> val topics = "topic.name" > >>> val brokers = "localhost:9092" > >>> val topicsSet = topics.split(",").toSet > >>> val sparkConf = new > >>> SparkConf().setAppName("KafkaWeather").setMaster(" > local[5]")//spark://localhost:7077 > >>> val sc = new SparkContext(sparkConf) > >>> val ssc = new StreamingContext(sc, Seconds(60)) > >>> val kafkaParams = Map[String, String]("metadata.broker.list" -> > brokers, > >>> "group.id" -> "xyz","auto.offset.reset"->"smallest") > >>> val messages = KafkaUtils.createDirectStream[String, String, > >>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) > >>> messages.foreachRDD(rdd => { > >>> if (rdd.isEmpty()) { > >>> println("Failed to get data from Kafka. Please check that the Kafka > >>> producer is streaming data.") > >>> System.exit(-1) > >>> } > >>> val sqlContext = > >>> org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext) > >>> val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF() > >>> dataframe.foreach(println) > >>> > >>> }) > >>> > >>> > >>> Below are logs, > >>> > >>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at > >>> todelete.scala:34) failed in 110.776 s > >>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already > >>> stopped! 
Dropping event > >>> SparkListenerStageCompleted(org.apache.spark.scheduler. > StageInfo@6d8ff688) > >>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already > >>> stopped! Dropping event > >>> SparkListenerJobEnd(0,1470812271971,JobFailed(org. > apache.spark.SparkException: > >>> Job 0 cancelled because SparkContext was shut down)) > >>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint: > >>> MapOutputTrackerMasterEndpoint stopped! > >>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared > >>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped > >>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped > >>> 16/08/10 12:27:51 INFO > >>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: > >>> OutputCommitCoordinator stopped! > >>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator: > >>> Shutting down remote daemon. > >>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: > Remote > >>> daemon shut down; proceeding with flushing remote transports. > >>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext > >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called > >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory > >>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd- > 07b9c1b6-01db-45b5-9302-d2f67f7c490e > >>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: > >>> Remoting shut down. > >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory > >>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2 > >>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class > >>> com.boa.poc.todelete --jars > >>> /home/cloudera/lib/spark-streaming-kafka-assembly_2.10- > 1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar > >>> /home/cloudera/Downloads/boa/pain.jar > log.txt > >>> Using Spark's default log4j profile: > >>> org/apache/spark/log4j-defaults.properties > >>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2 > >>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop > >>> library for your platform... using builtin-java classes where > applicable > >>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera > resolves > >>> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on > >>> interface eth1) > >>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to > >>> another address > >>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera > >>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: > cloudera > >>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication > >>> disabled; ui acls disabled; users with view permissions: Set(cloudera); > >>> users with modify permissions: Set(cloudera) > >>> 16/08/10 12:28:00 INFO Utils: Successfully started service > 'sparkDriver' > >>> on port 42140. > >>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started > >>> 16/08/10 12:28:01 INFO Remoting: Starting remoting > >>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on > addresses > >>> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328] > >>> 16/08/10 12:28:01 INFO Utils: Successfully started service > >>> 'sparkDriverActorSystem' on port 53328. 
> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker > >>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster > >>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at > >>> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2 > >>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity > >>> 511.5 MB > >>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator > >>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on > >>> port 4040. > >>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at > >>> http://192.168.126.131:4040 > >>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is > >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd- > 70563ad1-3d30-4a9c-ab11-82ecbb2e71b0 > >>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server > >>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file > >>> server' on port 58957. > >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR > >>> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar > at > >>> http://192.168.126.131:58957/jars/spark-streaming-kafka- > assembly_2.10-1.6.2.jar > >>> with timestamp 1470812282187 > >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR > >>> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at > >>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar > with > >>> timestamp 1470812282398 > >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR > >>> file:/home/cloudera/Downloads/boa/pain.jar at > >>> http://192.168.126.131:58957/jars/pain.jar with timestamp > 1470812282402 > >>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host > >>> localhost > >>> 16/08/10 12:28:02 INFO Utils: Successfully started service > >>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port > 56716. 
> >>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on > 56716 > >>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register > >>> BlockManager > >>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block > >>> manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver, > localhost, > >>> 56716) > >>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager > >>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties > >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset > >>> is overridden to smallest > >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is > >>> overridden to xyz > >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect > >>> is overridden to > >>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1 > >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay = > -1 > >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms > >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level = > >>> StorageLevel(false, false, false, false, 1) > >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval = > >>> null > >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration = > 60000 > >>> ms > >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and > validated > >>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5 > >>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms > >>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level = > >>> StorageLevel(false, false, false, false, 1) > >>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null > >>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms > >>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated > >>> org.apache.spark.streaming.dstream.ForEachDStream@a0ec143 > >>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator > at > >>> time 1470812340000 > >>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at > >>> 1470812340000 ms > >>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler > >>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started > >>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties > >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset > >>> is overridden to smallest > >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is > >>> overridden to xyz > >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect > >>> is overridden to > >>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000 > ms > >>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job > >>> 1470812340000 ms.0 from job set of time 1470812340000 ms > >>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at > >>> todelete.scala:34 > >>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at > >>> todelete.scala:34) with 2 output partitions > >>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json > at > >>> todelete.scala:34) > >>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List() > >>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List() > >>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0 > >>> (MapPartitionsRDD[3] at json at todelete.scala:34), which has no > missing > >>> parents > >>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values > in > 
>>> memory (estimated size 4.7 KB, free 4.7 KB) > >>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as > >>> bytes in memory (estimated size 2.6 KB, free 7.2 KB) > >>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in > >>> memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB) > >>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast > >>> at DAGScheduler.scala:1006 > >>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from > >>> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34) > >>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 > >>> tasks > >>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 > >>> (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes) > >>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0) > >>> 16/08/10 12:29:01 INFO Executor: Fetching > >>> http://192.168.126.131:58957/jars/spark-streaming-kafka- > assembly_2.10-1.6.2.jar > >>> with timestamp 1470812282187 > >>> 16/08/10 12:29:01 INFO Utils: Fetching > >>> http://192.168.126.131:58957/jars/spark-streaming-kafka- > assembly_2.10-1.6.2.jar > >>> to > >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles- > 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp > >>> 16/08/10 12:29:01 INFO Executor: Adding > >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/ > userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark- > streaming-kafka-assembly_2.10-1.6.2.jar > >>> to class loader > >>> 16/08/10 12:29:01 INFO Executor: Fetching > >>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar > with > >>> timestamp 1470812282398 > >>> 16/08/10 12:29:01 INFO Utils: Fetching > >>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar > to > >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles- > 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp > >>> 16/08/10 12:29:01 INFO Executor: Adding > >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/ > userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark- > assembly-1.6.2-hadoop2.6.0.jar > >>> to class loader > >>> 16/08/10 12:29:01 INFO Executor: Fetching > >>> http://192.168.126.131:58957/jars/pain.jar with timestamp > 1470812282402 > >>> 16/08/10 12:29:01 INFO Utils: Fetching > >>> http://192.168.126.131:58957/jars/pain.jar to > >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles- > 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp > >>> 16/08/10 12:29:02 INFO Executor: Adding > >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/ > userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar > >>> to class loader > >>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, > partition 0 > >>> offsets 0 -> 8 > >>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties > >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset > >>> is overridden to smallest > >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is > >>> overridden to xyz > >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect > >>> is overridden to > >>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID > 0). 
> >>> 13366 bytes result sent to driver > >>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 > >>> (TID 0) in 2380 ms on localhost (1/2) > >>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000 > ms > >>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000 > ms > >>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000 > ms > >>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000 > ms > >>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000 > ms > >>> > >>> > >>> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi > >>> <diwakar.dhanusk...@gmail.com> wrote: > >>>> > >>>> Hi Siva, > >>>> > >>>> Does topic has partitions? which version of Spark you are using? > >>>> > >>>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> > >>>> wrote: > >>>>> > >>>>> Hi, > >>>>> > >>>>> Here is a working example I did. > >>>>> > >>>>> HTH > >>>>> > >>>>> Regards, > >>>>> > >>>>> Sivakumaran S > >>>>> > >>>>> val topics = "test" > >>>>> val brokers = "localhost:9092" > >>>>> val topicsSet = topics.split(",").toSet > >>>>> val sparkConf = new > >>>>> SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") > >>>>> //spark://localhost:7077 > >>>>> val sc = new SparkContext(sparkConf) > >>>>> val ssc = new StreamingContext(sc, Seconds(60)) > >>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> > >>>>> brokers) > >>>>> val messages = KafkaUtils.createDirectStream[String, String, > >>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) > >>>>> messages.foreachRDD(rdd => { > >>>>> if (rdd.isEmpty()) { > >>>>> println("Failed to get data from Kafka. Please check that the > Kafka > >>>>> producer is streaming data.") > >>>>> System.exit(-1) > >>>>> } > >>>>> val sqlContext = > >>>>> org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext) > >>>>> val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF() > >>>>> //Process your DF as required here on > >>>>> } > >>>>> > >>>>> > >>>>> > >>>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi > >>>>> <diwakar.dhanusk...@gmail.com> wrote: > >>>>> > >>>>> Hi, > >>>>> > >>>>> I am reading json messages from kafka . Topics has 2 partitions. When > >>>>> running streaming job using spark-submit, I could see that val > dataFrame = > >>>>> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I > doing > >>>>> something wrong here. Below is code .This environment is cloudera > sandbox > >>>>> env. Same issue in hadoop production cluster mode except that it is > >>>>> restricted thats why tried to reproduce issue in Cloudera sandbox. > Kafka > >>>>> 0.10 and Spark 1.4. > >>>>> > >>>>> val kafkaParams = > >>>>> Map[String,String]("bootstrap.servers"->"localhost:9093, > localhost:9092", > >>>>> "group.id" -> "xyz","auto.offset.reset"->"smallest") > >>>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic") > >>>>> val ssc = new StreamingContext(conf, Seconds(1)) > >>>>> > >>>>> val sqlContext = new org.apache.spark.sql. 
> SQLContext(ssc.sparkContext) > >>>>> > >>>>> val topics = Set("gpp.minf") > >>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String, > >>>>> StringDecoder,StringDecoder](ssc, kafkaParams, topics) > >>>>> > >>>>> kafkaStream.foreachRDD( > >>>>> rdd => { > >>>>> if (rdd.count > 0){ > >>>>> val dataFrame = sqlContext.read.json(rdd.map(_._2)) > >>>>> dataFrame.printSchema() > >>>>> //dataFrame.foreach(println) > >>>>> } > >>>>> } > >>>>> > >>>>> > >>>> > >>> > > >