I checked the executor logs and the UI. There is no error message or
anything like that. Whenever there is an action, it just waits.
There is data in the partitions: I could use simple-consumer-shell and
print all of the data to the console. Am I doing anything wrong in foreachRDD?
This works fine with a single-partition topic.

On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org> wrote:

> zookeeper.connect is irrelevant.
>
> Did you look at your executor logs?
> Did you look at the UI for the (probably failed) stages?
> Are you actually producing data into all of the kafka partitions?
> If you use kafka-simple-consumer-shell.sh to read that partition, do
> you get any data?
>
> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi Cody,
> >
> > I just added zookeeper.connect to kafkaParams. The job couldn't get out
> > of the batch window, and the other batches are queued. I could see
> > foreach(println) on the DataFrame printing one partition's data and not
> > the other's.
> > I couldn't see any errors in the log.
> >
> > val brokers = "localhost:9092,localhost:9093"
> > val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") // spark://localhost:7077
> > val sc = new SparkContext(sparkConf)
> > val ssc = new StreamingContext(sc, Seconds(1))
> > val kafkaParams = Map[String, String](
> >   "bootstrap.servers" -> "localhost:9093,localhost:9092",
> >   "auto.offset.reset" -> "smallest",
> >   "zookeeper.connect" -> "localhost:2181",
> >   "group.id" -> "xyz")
> > val topics = "test"
> > val topicsSet = topics.split(",").toSet
> > val messages = KafkaUtils.createDirectStream[String, String,
> >   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> > import sqlContext.implicits._
> > messages.foreachRDD(rdd => {
> >   if (rdd.isEmpty()) {
> >     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
> >     System.exit(-1)
> >   }
> >
> >   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
> >   dataframe.foreach(println)
> >   println("$$$$$$$$$$$", dataframe.count())
> > })
> > Logs:
> >
> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> > library for your platform... using builtin-java classes where applicable
> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
> resolves to
> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface
> > eth1)
> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> > another address
> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
> > users with modify permissions: Set(cloudera)
> > 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver'
> on
> > port 45031.
> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> > 16/08/10 18:16:26 INFO Utils: Successfully started service
> > 'sparkDriverActorSystem' on port 56638.
> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity
> 511.5
> > MB
> > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4040.
> > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> > http://192.168.126.131:4040
> > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-
> 2b2a4e68-2952-41b0-a11b-f07860682749
> > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
> > server' on port 59491.
> > 16/08/10 18:16:28 INFO SparkContext: Added JAR
> > file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> > http://192.168.126.131:59491/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> > with timestamp 1470833188094
> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
> > file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with
> > timestamp 1470833189531
> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
> > file:/home/cloudera/Downloads/boa/pain.jar at
> > http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
> > 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
> > localhost
> > 16/08/10 18:16:29 INFO Utils: Successfully started service
> > 'org.apache.spark.network.netty.NettyBlockTransferService' on port
> 55361.
> > 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
> > 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register
> BlockManager
> > 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block
> manager
> > localhost:55361 with 511.5 MB RAM, BlockManagerId(driver, localhost,
> 55361)
> > 16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
> > 16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
> > 16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset
> is
> > overridden to smallest
> > 16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is
> overridden
> > to xyz
> > 16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect
> is
> > overridden to localhost:2181
> > 16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms
> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level =
> > StorageLevel(false, false, false, false, 1)
> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval =
> null
> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration = 1000
> ms
> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and validated
> > org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff
> > 16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms
> > 16/08/10 18:16:31 INFO ForEachDStream: Storage level =
> StorageLevel(false,
> > false, false, false, 1)
> > 16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null
> > 16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms
> > 16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated
> > org.apache.spark.streaming.dstream.ForEachDStream@d8e872
> > 16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at
> > time 1470833192000
> > 16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at
> 1470833192000
> > ms
> > 16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler
> > 16/08/10 18:16:31 INFO StreamingContext: StreamingContext started
> > 16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties
> > 16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset
> is
> > overridden to smallest
> > 16/08/10 18:16:32 INFO VerifiableProperties: Property group.id is
> overridden
> > to xyz
> > 16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect
> is
> > overridden to localhost:2181
> > 16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000 ms
> > 16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job
> > 1470833192000 ms.0 from job set of time 1470833192000 ms
> > 16/08/10 18:16:32 INFO SparkContext: Starting job: json at
> todelete.scala:42
> > 16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at
> todelete.scala:42)
> > with 2 output partitions
> > 16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at
> > todelete.scala:42)
> > 16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List()
> > 16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List()
> > 16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0
> > (MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing
> > parents
> > 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values in
> > memory (estimated size 4.7 KB, free 4.7 KB)
> > 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes
> > in memory (estimated size 2.6 KB, free 7.3 KB)
> > 16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory
> > on localhost:55361 (size: 2.6 KB, free: 511.5 MB)
> > 16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast
> at
> > DAGScheduler.scala:1006
> > 16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from
> > ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42)
> > 16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2
> tasks
> > 16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0
> (TID
> > 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
> > 16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
> > 16/08/10 18:16:32 INFO Executor: Fetching
> > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with
> > timestamp 1470833189531
> > 16/08/10 18:16:32 INFO Utils: Fetching
> > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> to
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-
> e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp
> > 16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000 ms
> > 16/08/10 18:16:33 INFO Executor: Adding
> > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/
> userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-
> assembly-1.6.2-hadoop2.6.0.jar
> > to class loader
> > 16/08/10 18:16:33 INFO Executor: Fetching
> > http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
> > 16/08/10 18:16:33 INFO Utils: Fetching
> > http://192.168.126.131:59491/jars/pain.jar to
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-
> e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp
> > 16/08/10 18:16:33 INFO Executor: Adding
> > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/
> userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar
> > to class loader
> > 16/08/10 18:16:33 INFO Executor: Fetching
> > http://192.168.126.131:59491/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> > with timestamp 1470833188094
> > 16/08/10 18:16:33 INFO Utils: Fetching
> > http://192.168.126.131:59491/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> > to
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-
> e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp
> > 16/08/10 18:16:33 INFO Executor: Adding
> > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/
> userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-
> streaming-kafka-assembly_2.10-1.6.2.jar
> > to class loader
> > 16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0
> offsets 0
> > -> 20
> > 16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
> > 16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset
> is
> > overridden to smallest
> > 16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is
> overridden
> > to xyz
> > 16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect
> is
> > overridden to localhost:2181
> > 16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms
> > 16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
> > 13366 bytes result sent to driver
> > 16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0
> (TID
> > 0) in 2423 ms on localhost (1/2)
> > 16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms
> > 16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms
> > 16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms
> > 16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms
> > 16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms
> > 16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000 ms
> > 16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms
> > 16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms
> > 16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms
> > 16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms
> > 16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms
> > 16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms
> > 16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms
> > 16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms
> > 16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms
> > 16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000 ms
> > 16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms
> > 16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms
> > 16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms
> > 16/08/10 18:16:54 INFO JobSch
> >
> > On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> Those logs you're posting are from right after your failure; they don't
> >> include what actually went wrong when attempting to read json. Look at
> >> your logs more carefully.
> >>
> >> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi"
> >> <diwakar.dhanusk...@gmail.com> wrote:
> >>>
> >>> Hi Siva,
> >>>
> >>> With the code below, it gets stuck at
> >>> sqlContext.read.json(rdd.map(_._2)).toDF()
> >>> There are two partitions in the topic.
> >>> I am running Spark 1.6.2.
> >>>
> >>> val topics = "topic.name"
> >>> val brokers = "localhost:9092"
> >>> val topicsSet = topics.split(",").toSet
> >>> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") // spark://localhost:7077
> >>> val sc = new SparkContext(sparkConf)
> >>> val ssc = new StreamingContext(sc, Seconds(60))
> >>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
> >>>   "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
> >>> val messages = KafkaUtils.createDirectStream[String, String,
> >>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> >>> messages.foreachRDD(rdd => {
> >>>   if (rdd.isEmpty()) {
> >>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
> >>>     System.exit(-1)
> >>>   }
> >>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
> >>>   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
> >>>   dataframe.foreach(println)
> >>> })
> >>>
> >>>
> >>> Below are logs,
> >>>
> >>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
> >>> todelete.scala:34) failed in 110.776 s
> >>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> >>> stopped! Dropping event
> >>> SparkListenerStageCompleted(org.apache.spark.scheduler.
> StageInfo@6d8ff688)
> >>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> >>> stopped! Dropping event
> >>> SparkListenerJobEnd(0,1470812271971,JobFailed(org.
> apache.spark.SparkException:
> >>> Job 0 cancelled because SparkContext was shut down))
> >>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
> >>> MapOutputTrackerMasterEndpoint stopped!
> >>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
> >>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
> >>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
> >>> 16/08/10 12:27:51 INFO
> >>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
> >>> OutputCommitCoordinator stopped!
> >>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
> >>> Shutting down remote daemon.
> >>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
> Remote
> >>> daemon shut down; proceeding with flushing remote transports.
> >>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
> >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
> >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> >>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-
> 07b9c1b6-01db-45b5-9302-d2f67f7c490e
> >>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
> >>> Remoting shut down.
> >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> >>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
> >>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class com.boa.poc.todelete --jars /home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar > log.txt
> >>> Using Spark's default log4j profile:
> >>> org/apache/spark/log4j-defaults.properties
> >>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
> >>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
> >>> library for your platform... using builtin-java classes where
> applicable
> >>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera
> resolves
> >>> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> >>> interface eth1)
> >>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> >>> another address
> >>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
> >>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to:
> cloudera
> >>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
> >>> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> >>> users with modify permissions: Set(cloudera)
> >>> 16/08/10 12:28:00 INFO Utils: Successfully started service
> 'sparkDriver'
> >>> on port 42140.
> >>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
> >>> 16/08/10 12:28:01 INFO Remoting: Starting remoting
> >>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on
> addresses
> >>> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
> >>> 16/08/10 12:28:01 INFO Utils: Successfully started service
> >>> 'sparkDriverActorSystem' on port 53328.
> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
> >>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
> >>> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
> >>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity
> >>> 511.5 MB
> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
> >>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on
> >>> port 4040.
> >>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
> >>> http://192.168.126.131:4040
> >>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-
> 70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
> >>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
> >>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file
> >>> server' on port 58957.
> >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
> >>> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> at
> >>> http://192.168.126.131:58957/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> >>> with timestamp 1470812282187
> >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
> >>> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> >>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with
> >>> timestamp 1470812282398
> >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
> >>> file:/home/cloudera/Downloads/boa/pain.jar at
> >>> http://192.168.126.131:58957/jars/pain.jar with timestamp
> 1470812282402
> >>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host
> >>> localhost
> >>> 16/08/10 12:28:02 INFO Utils: Successfully started service
> >>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port
> 56716.
> >>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on
> 56716
> >>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register
> >>> BlockManager
> >>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block
> >>> manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver,
> localhost,
> >>> 56716)
> >>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset
> >>> is overridden to smallest
> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is
> >>> overridden to xyz
> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect
> >>> is overridden to
> >>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay =
> -1
> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level =
> >>> StorageLevel(false, false, false, false, 1)
> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval =
> >>> null
> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration =
> 60000
> >>> ms
> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and
> validated
> >>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
> >>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
> >>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level =
> >>> StorageLevel(false, false, false, false, 1)
> >>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
> >>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
> >>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated
> >>> org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
> >>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator
> at
> >>> time 1470812340000
> >>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at
> >>> 1470812340000 ms
> >>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
> >>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset
> >>> is overridden to smallest
> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is
> >>> overridden to xyz
> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect
> >>> is overridden to
> >>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000
> ms
> >>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job
> >>> 1470812340000 ms.0 from job set of time 1470812340000 ms
> >>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at
> >>> todelete.scala:34
> >>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at
> >>> todelete.scala:34) with 2 output partitions
> >>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json
> at
> >>> todelete.scala:34)
> >>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
> >>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
> >>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0
> >>> (MapPartitionsRDD[3] at json at todelete.scala:34), which has no
> missing
> >>> parents
> >>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values
> in
> >>> memory (estimated size 4.7 KB, free 4.7 KB)
> >>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as
> >>> bytes in memory (estimated size 2.6 KB, free 7.2 KB)
> >>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> >>> memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
> >>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast
> >>> at DAGScheduler.scala:1006
> >>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from
> >>> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
> >>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2
> >>> tasks
> >>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0
> >>> (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
> >>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
> >>> 16/08/10 12:29:01 INFO Executor: Fetching
> >>> http://192.168.126.131:58957/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> >>> with timestamp 1470812282187
> >>> 16/08/10 12:29:01 INFO Utils: Fetching
> >>> http://192.168.126.131:58957/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> >>> to
> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-
> 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
> >>> 16/08/10 12:29:01 INFO Executor: Adding
> >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/
> userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-
> streaming-kafka-assembly_2.10-1.6.2.jar
> >>> to class loader
> >>> 16/08/10 12:29:01 INFO Executor: Fetching
> >>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with
> >>> timestamp 1470812282398
> >>> 16/08/10 12:29:01 INFO Utils: Fetching
> >>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> to
> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-
> 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
> >>> 16/08/10 12:29:01 INFO Executor: Adding
> >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/
> userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-
> assembly-1.6.2-hadoop2.6.0.jar
> >>> to class loader
> >>> 16/08/10 12:29:01 INFO Executor: Fetching
> >>> http://192.168.126.131:58957/jars/pain.jar with timestamp
> 1470812282402
> >>> 16/08/10 12:29:01 INFO Utils: Fetching
> >>> http://192.168.126.131:58957/jars/pain.jar to
> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-
> 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp
> >>> 16/08/10 12:29:02 INFO Executor: Adding
> >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/
> userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar
> >>> to class loader
> >>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name,
> partition 0
> >>> offsets 0 -> 8
> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset
> >>> is overridden to smallest
> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is
> >>> overridden to xyz
> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect
> >>> is overridden to
> >>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID
> 0).
> >>> 13366 bytes result sent to driver
> >>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0
> >>> (TID 0) in 2380 ms on localhost (1/2)
> >>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000
> ms
> >>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000
> ms
> >>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000
> ms
> >>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000
> ms
> >>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000
> ms
> >>>
> >>>
> >>> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi
> >>> <diwakar.dhanusk...@gmail.com> wrote:
> >>>>
> >>>> Hi Siva,
> >>>>
> >>>> Does the topic have partitions? Which version of Spark are you using?
> >>>>
> >>>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> Here is a working example I did.
> >>>>>
> >>>>> HTH
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Sivakumaran S
> >>>>>
> >>>>> val topics = "test"
> >>>>> val brokers = "localhost:9092"
> >>>>> val topicsSet = topics.split(",").toSet
> >>>>> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") // spark://localhost:7077
> >>>>> val sc = new SparkContext(sparkConf)
> >>>>> val ssc = new StreamingContext(sc, Seconds(60))
> >>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> >>>>> val messages = KafkaUtils.createDirectStream[String, String,
> >>>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> >>>>> messages.foreachRDD(rdd => {
> >>>>>   if (rdd.isEmpty()) {
> >>>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
> >>>>>     System.exit(-1)
> >>>>>   }
> >>>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
> >>>>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
> >>>>>   // Process your DF as required from here on
> >>>>> })
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi
> >>>>> <diwakar.dhanusk...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I am reading JSON messages from Kafka. The topic has 2 partitions.
> >>>>> When running the streaming job using spark-submit, I could see that
> >>>>> val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes
> >>>>> indefinitely. Am I doing something wrong here? The code is below.
> >>>>> This environment is the Cloudera sandbox. The same issue occurs in
> >>>>> Hadoop production cluster mode, but that cluster is restricted, which
> >>>>> is why I tried to reproduce the issue in the Cloudera sandbox.
> >>>>> Kafka 0.10 and Spark 1.4.
> >>>>>
> >>>>> val kafkaParams = Map[String, String](
> >>>>>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
> >>>>>   "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
> >>>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> >>>>> val ssc = new StreamingContext(conf, Seconds(1))
> >>>>>
> >>>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >>>>>
> >>>>> val topics = Set("gpp.minf")
> >>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String,
> >>>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topics)
> >>>>>
> >>>>> kafkaStream.foreachRDD(
> >>>>>   rdd => {
> >>>>>     if (rdd.count > 0) {
> >>>>>       val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >>>>>       dataFrame.printSchema()
> >>>>>       // dataFrame.foreach(println)
> >>>>>     }
> >>>>>   })
> >>>>>
> >>>>>
> >>>>
> >>>
> >
>
