Figured it out. All I am doing wrong is testing it out in pseudo node vm
with 1 core. The tasks were hanging out for cpu.
In production cluster this works just fine.

On Thu, Aug 11, 2016 at 12:45 AM, Diwakar Dhanuskodi wrote:

> Checked executor logs and UI . There is no error message or something like
> that.  when there is any action , it is  waiting .
> There are data in partitions. I could use simple-consumer-shell and print
> all data in console.  Am I doing anything wrong in foreachRDD?.
> This just works fine  with single partitioned topic,
On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger wrote:
> wrote:
>> zookeeper.connect is irrelevant.
>> Did you look at your executor logs?
>> Did you look at the UI for the (probably failed) stages?
>> Are you actually producing data into all of the kafka partitions?
>> If you use to read that partition, do
>> you get any data?
On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi wrote:
>> <> wrote:
>> > Hi Cody,
>> >
>> > Just added zookeeper.connect to kafkaparams . It couldn't come out of
>> batch
>> > window. Other batches are queued. I could see foreach(println) of
>> dataFrame
>> > printing one of partition's data and not the other.
>> > Couldn't see any  errors from log.
>> >
>> > val brokers = "localhost:9092,localhost:9093"
>> > val sparkConf = new
>> > SparkConf().setAppName("KafkaWeather").setMaster("local[5]")
>> //spark://localhost:7077
>> > val sc = new SparkContext(sparkConf)
>> > val ssc = new StreamingContext(sc, Seconds(1))
>> > val kafkaParams = Map[String,
>> > String]("bootstrap.servers"->"localhost:9093,localhost:9092"
>> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
>> > val topics = "test"
>> > val topicsSet = topics.split(",").toSet
>> > val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topicsSet)
>> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> > import sqlContext.implicits._
>> > messages.foreachRDD(rdd => {
>> >   if (rdd.isEmpty()) {
>> >     println("Failed to get data from Kafka. Please check that the Kafka
>> > producer is streaming data.")
>> >     System.exit(-1)
>> >   }
>> >
>> >    val dataframe =
>> >   dataframe.foreach(println)
>> >  println( "$$$$$$$$$$$", dataframe.count())
>> >               })
>> > Logs:
>> >
>> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
>> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
>> > library for your platform... using builtin-java classes where applicable
>> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
>> resolves to
>> > a loopback address:; using instead (on
>> interface
>> > eth1)
>> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>> > another address
>> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
>> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to:
>> cloudera
>> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
>> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
>> > users with modify permissions: Set(cloudera)
>> > 16/08/10 18:16:25 INFO Utils: Successfully started service
>> 'sparkDriver' on
>> > port 45031.
>> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
>> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
>> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on
>> addresses
>> > :[akka.tcp://sparkDriverActorSystem@]
>> > 16/08/10 18:16:26 INFO Utils: Successfully started service
>> > 'sparkDriverActorSystem' on port 56638.
>> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
>> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
>> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
>> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
>> > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity
>> 511.5
>> > MB
>> > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
>> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
>> port
>> > 4040.
>> > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
>> >
>> > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
>> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e
>> 68-2952-41b0-a11b-f07860682749
>> > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
>> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
>> > server' on port 59491.
>> > 16/08/10 18:16:28 INFO SparkContext: Added JAR
>> > file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>> at
>> >
>> mbly_2.10-1.6.2.jar
>> > with timestamp 1470833188094
>> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
>> > file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
>> >
>> with
>> > timestamp 1470833189531
>> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
>> > file:/home/cloudera/Downloads/boa/pain.jar at
>> > with timestamp 1470833189533
>> > 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
>> > localhost
>> > 16/08/10 18:16:29 INFO Utils: Successfully started service
>> > '' on port
>> 55361.
>> > 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on
>> 55361
>> > 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register
>> BlockManager
>> > 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block
>> manager
>> > localhost:55361 with 511.5 MB RAM, BlockManagerId(driver, localhost,
>> 55361)
>> > 16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
>> > 16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
>> > 16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset
>> is
>> > overridden to smallest
>> > 16/08/10 18:16:30 INFO VerifiableProperties: Property is
>> overridden
>> > to xyz
>> > 16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect
>> is
>> > overridden to localhost:2181
>> > 16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
>> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay =
>> -1
>> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms
>> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level =
>> > StorageLevel(false, false, false, false, 1)
>> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval =
>> null
>> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration =
>> 1000 ms
>> > 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and
>> validated
>> > org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff
>> > 16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms
>> > 16/08/10 18:16:31 INFO ForEachDStream: Storage level =
>> StorageLevel(false,
>> > false, false, false, 1)
>> > 16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null
>> > 16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms
>> > 16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated
>> > org.apache.spark.streaming.dstream.ForEachDStream@d8e872
>> > 16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at
>> > time 1470833192000
>> > 16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at
>> 1470833192000
>> > ms
>> > 16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler
>> > 16/08/10 18:16:31 INFO StreamingContext: StreamingContext started
>> > 16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties
>> > 16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset
>> is
>> > overridden to smallest
>> > 16/08/10 18:16:32 INFO VerifiableProperties: Property is
>> overridden
>> > to xyz
>> > 16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect
>> is
>> > overridden to localhost:2181
>> > 16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000
>> ms
>> > 16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job
>> > 1470833192000 ms.0 from job set of time 1470833192000 ms
>> > 16/08/10 18:16:32 INFO SparkContext: Starting job: json at
>> todelete.scala:42
>> > 16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at
>> todelete.scala:42)
>> > with 2 output partitions
>> > 16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at
>> > todelete.scala:42)
>> > 16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List()
>> > 16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List()
>> > 16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0
>> > (MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing
>> > parents
>> > 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values
>> in
>> > memory (estimated size 4.7 KB, free 4.7 KB)
>> > 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0_piece0 stored as
>> bytes
>> > in memory (estimated size 2.6 KB, free 7.3 KB)
>> > 16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> memory
>> > on localhost:55361 (size: 2.6 KB, free: 511.5 MB)
>> > 16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast
>> at
>> > DAGScheduler.scala:1006
>> > 16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from
>> > ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42)
>> > 16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2
>> tasks
>> > 16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0
>> (TID
>> > 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>> > 16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>> > 16/08/10 18:16:32 INFO Executor: Fetching
>> >
>> with
>> > timestamp 1470833189531
>> > 16/08/10 18:16:32 INFO Utils: Fetching
>> >
>> to
>> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3
>> b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp
>> > 16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000
>> ms
>> > 16/08/10 18:16:33 INFO Executor: Adding
>> > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFil
>> es-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-assembly-1.
>> 6.2-hadoop2.6.0.jar
>> > to class loader
>> > 16/08/10 18:16:33 INFO Executor: Fetching
>> > with timestamp 1470833189533
>> > 16/08/10 18:16:33 INFO Utils: Fetching
>> > to
>> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3
>> b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp
>> > 16/08/10 18:16:33 INFO Executor: Adding
>> > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFil
>> es-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar
>> > to class loader
>> > 16/08/10 18:16:33 INFO Executor: Fetching
>> >
>> mbly_2.10-1.6.2.jar
>> > with timestamp 1470833188094
>> > 16/08/10 18:16:33 INFO Utils: Fetching
>> >
>> mbly_2.10-1.6.2.jar
>> > to
>> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3
>> b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp
>> > 16/08/10 18:16:33 INFO Executor: Adding
>> > file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFil
>> es-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-
>> kafka-assembly_2.10-1.6.2.jar
>> > to class loader
>> > 16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0
>> offsets 0
>> > -> 20
>> > 16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
>> > 16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset
>> is
>> > overridden to smallest
>> > 16/08/10 18:16:33 INFO VerifiableProperties: Property is
>> overridden
>> > to xyz
>> > 16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect
>> is
>> > overridden to localhost:2181
>> > 16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000
>> ms
>> > 16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
>> > 13366 bytes result sent to driver
>> > 16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0
>> (TID
>> > 0) in 2423 ms on localhost (1/2)
>> > 16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000
>> ms
>> > 16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000
>> ms
>> > 16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000
>> ms
>> > 16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000
>> ms
>> > 16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000
>> ms
>> > 16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000
>> ms
>> > 16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000
>> ms
>> > 16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000
>> ms
>> > 16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000
>> ms
>> > 16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000
>> ms
>> > 16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000
>> ms
>> > 16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000
>> ms
>> > 16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000
>> ms
>> > 16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000
>> ms
>> > 16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000
>> ms
>> > 16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000
>> ms
>> > 16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000
>> ms
>> > 16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000
>> ms
>> > 16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000
>> ms
>> > 16/08/10 18:16:54 INFO JobSch
>> >
On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger wrote:
>> wrote:
>> >>
>> >> Those logs you're posting are from right after your failure, they don't
>> >> include what actually went wrong when attempting to read json. Look at
>> your
>> >> logs more carefully.
>> >>
On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" wrote:
>> >> <> wrote:
>> >>>
>> >>> Hi Siva,
>> >>>
>> >>> With below code, it is stuck up at
>> >>>
>> >>> There are two partitions in  topic.
>> >>> I am running spark 1.6.2
>> >>>
>> >>> val topics = ""
>> >>> val brokers = "localhost:9092"
>> >>> val topicsSet = topics.split(",").toSet
>> >>> val sparkConf = new
>> >>> SparkConf().setAppName("KafkaWeather").setMaster("local[5]")
>> //spark://localhost:7077
>> >>> val sc = new SparkContext(sparkConf)
>> >>> val ssc = new StreamingContext(sc, Seconds(60))
>> >>> val kafkaParams = Map[String, String]("" ->
>> brokers,
>> >>> "" -> "xyz","auto.offset.reset"->"smallest")
>> >>> val messages = KafkaUtils.createDirectStream[String, String,
>> >>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> >>> messages.foreachRDD(rdd => {
>> >>>   if (rdd.isEmpty()) {
>> >>>     println("Failed to get data from Kafka. Please check that the
>> Kafka
>> >>> producer is streaming data.")
>> >>>     System.exit(-1)
>> >>>   }
>> >>>   val sqlContext =
>> >>> org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>> >>>   val dataframe =
>> >>>   dataframe.foreach(println)
>> >>>
>> >>>               })
>> >>>
>> >>>
>> >>> Below are logs,
>> >>>
>> >>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
>> >>> todelete.scala:34) failed in 110.776 s
>> >>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>> >>> stopped! Dropping event
>> >>> SparkListenerStageCompleted(org.apache.spark.scheduler.Stage
>> Info@6d8ff688)
>> >>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>> >>> stopped! Dropping event
>> >>> SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.
>> spark.SparkException:
>> >>> Job 0 cancelled because SparkContext was shut down))
>> >>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
>> >>> MapOutputTrackerMasterEndpoint stopped!
>> >>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
>> >>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
>> >>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
>> >>> 16/08/10 12:27:51 INFO
>> >>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
>> >>> OutputCommitCoordinator stopped!
>> >>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
>> >>> Shutting down remote daemon.
>> >>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
>> Remote
>> >>> daemon shut down; proceeding with flushing remote transports.
>> >>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
>> >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
>> >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
>> >>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1
>> b6-01db-45b5-9302-d2f67f7c490e
>> >>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
>> >>> Remoting shut down.
>> >>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
>> >>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
>> >>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class
>> >>> com.boa.poc.todelete --jars
>> >>> /home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.
>> 2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar
>> >>> /home/cloudera/Downloads/boa/pain.jar > log.txt
>> >>> Using Spark's default log4j profile:
>> >>> org/apache/spark/
>> >>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
>> >>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
>> >>> library for your platform... using builtin-java classes where
>> applicable
>> >>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera
>> resolves
>> >>> to a loopback address:; using instead (on
>> >>> interface eth1)
>> >>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind
>> to
>> >>> another address
>> >>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to:
>> cloudera
>> >>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to:
>> cloudera
>> >>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager:
>> authentication
>> >>> disabled; ui acls disabled; users with view permissions:
>> Set(cloudera);
>> >>> users with modify permissions: Set(cloudera)
>> >>> 16/08/10 12:28:00 INFO Utils: Successfully started service
>> 'sparkDriver'
>> >>> on port 42140.
>> >>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
>> >>> 16/08/10 12:28:01 INFO Remoting: Starting remoting
>> >>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on
>> addresses
>> >>> :[akka.tcp://sparkDriverActorSystem@]
>> >>> 16/08/10 12:28:01 INFO Utils: Successfully started service
>> >>> 'sparkDriverActorSystem' on port 53328.
>> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
>> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
>> >>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
>> >>> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
>> >>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity
>> >>> 511.5 MB
>> >>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
>> >>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI'
>> on
>> >>> port 4040.
>> >>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
>> >>>
>> >>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
>> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563a
>> d1-3d30-4a9c-ab11-82ecbb2e71b0
>> >>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
>> >>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file
>> >>> server' on port 58957.
>> >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>> >>> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>> at
>> >>>
>> mbly_2.10-1.6.2.jar
>> >>> with timestamp 1470812282187
>> >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>> >>> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
>> >>>
>> p2.6.0.jar with
>> >>> timestamp 1470812282398
>> >>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>> >>> file:/home/cloudera/Downloads/boa/pain.jar at
>> >>> with timestamp
>> 1470812282402
>> >>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host
>> >>> localhost
>> >>> 16/08/10 12:28:02 INFO Utils: Successfully started service
>> >>> '' on port
>> 56716.
>> >>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on
>> 56716
>> >>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register
>> >>> BlockManager
>> >>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block
>> >>> manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver,
>> localhost,
>> >>> 56716)
>> >>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
>> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
>> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property
>> auto.offset.reset
>> >>> is overridden to smallest
>> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property is
>> >>> overridden to xyz
>> >>> 16/08/10 12:28:03 INFO VerifiableProperties: Property
>> zookeeper.connect
>> >>> is overridden to
>> >>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
>> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay
>> = -1
>> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
>> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level =
>> >>> StorageLevel(false, false, false, false, 1)
>> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval =
>> >>> null
>> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration =
>> 60000
>> >>> ms
>> >>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and
>> validated
>> >>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
>> >>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
>> >>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level =
>> >>> StorageLevel(false, false, false, false, 1)
>> >>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
>> >>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
>> >>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated
>> >>> org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
>> >>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator
>> at
>> >>> time 1470812340000
>> >>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at
>> >>> 1470812340000 ms
>> >>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
>> >>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
>> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
>> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property
>> auto.offset.reset
>> >>> is overridden to smallest
>> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property is
>> >>> overridden to xyz
>> >>> 16/08/10 12:29:00 INFO VerifiableProperties: Property
>> zookeeper.connect
>> >>> is overridden to
>> >>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time
>> 1470812340000 ms
>> >>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job
>> >>> 1470812340000 ms.0 from job set of time 1470812340000 ms
>> >>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at
>> >>> todelete.scala:34
>> >>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at
>> >>> todelete.scala:34) with 2 output partitions
>> >>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json
>> at
>> >>> todelete.scala:34)
>> >>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
>> >>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
>> >>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0
>> >>> (MapPartitionsRDD[3] at json at todelete.scala:34), which has no
>> missing
>> >>> parents
>> >>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as
>> values in
>> >>> memory (estimated size 4.7 KB, free 4.7 KB)
>> >>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as
>> >>> bytes in memory (estimated size 2.6 KB, free 7.2 KB)
>> >>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> >>> memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
>> >>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from
>> broadcast
>> >>> at DAGScheduler.scala:1006
>> >>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from
>> >>> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
>> >>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2
>> >>> tasks
>> >>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0
>> >>> (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>> >>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>> >>> 16/08/10 12:29:01 INFO Executor: Fetching
>> >>>
>> mbly_2.10-1.6.2.jar
>> >>> with timestamp 1470812282187
>> >>> 16/08/10 12:29:01 INFO Utils: Fetching
>> >>>
>> mbly_2.10-1.6.2.jar
>> >>> to
>> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56
>> 864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
>> >>> 16/08/10 12:29:01 INFO Executor: Adding
>> >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFil
>> es-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-streaming-
>> kafka-assembly_2.10-1.6.2.jar
>> >>> to class loader
>> >>> 16/08/10 12:29:01 INFO Executor: Fetching
>> >>>
>> p2.6.0.jar with
>> >>> timestamp 1470812282398
>> >>> 16/08/10 12:29:01 INFO Utils: Fetching
>> >>>
>> p2.6.0.jar to
>> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56
>> 864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
>> >>> 16/08/10 12:29:01 INFO Executor: Adding
>> >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFil
>> es-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-assembly-1.
>> 6.2-hadoop2.6.0.jar
>> >>> to class loader
>> >>> 16/08/10 12:29:01 INFO Executor: Fetching
>> >>> with timestamp
>> 1470812282402
>> >>> 16/08/10 12:29:01 INFO Utils: Fetching
>> >>> to
>> >>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56
>> 864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp
>> >>> 16/08/10 12:29:02 INFO Executor: Adding
>> >>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFil
>> es-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar
>> >>> to class loader
>> >>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic,
>> partition 0
>> >>> offsets 0 -> 8
>> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
>> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property
>> auto.offset.reset
>> >>> is overridden to smallest
>> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property is
>> >>> overridden to xyz
>> >>> 16/08/10 12:29:02 INFO VerifiableProperties: Property
>> zookeeper.connect
>> >>> is overridden to
>> >>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID
>> 0).
>> >>> 13366 bytes result sent to driver
>> >>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0
>> >>> (TID 0) in 2380 ms on localhost (1/2)
>> >>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time
>> 1470812400000 ms
>> >>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time
>> 1470812460000 ms
>> >>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time
>> 1470812520000 ms
>> >>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time
>> 1470812580000 ms
>> >>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time
>> 1470812640000 ms
>> >>>
>> >>>
On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi wrote:
>> >>> <> wrote:
>> >>>>
>> >>>> Hi Siva,
>> >>>>
>> >>>> Does topic  has partitions? which version of Spark you are using?
>> >>>>
On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S wrote:
>> >>>> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> Here is a working example I did.
>> >>>>>
>> >>>>> HTH
>> >>>>>
>> >>>>> Regards,
>> >>>>>
>> >>>>> Sivakumaran S
>> >>>>>
>> >>>>> val topics = "test"
>> >>>>> val brokers = "localhost:9092"
>> >>>>> val topicsSet = topics.split(",").toSet
>> >>>>> val sparkConf = new
>> >>>>> SparkConf().setAppName("KafkaWeatherCalc").setMaster("local")
>> >>>>> //spark://localhost:7077
>> >>>>> val sc = new SparkContext(sparkConf)
>> >>>>> val ssc = new StreamingContext(sc, Seconds(60))
>> >>>>> val kafkaParams = Map[String, String]("" ->
>> >>>>> brokers)
>> >>>>> val messages = KafkaUtils.createDirectStream[String, String,
>> >>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> >>>>> messages.foreachRDD(rdd => {
>> >>>>>   if (rdd.isEmpty()) {
>> >>>>>     println("Failed to get data from Kafka. Please check that the
>> Kafka
>> >>>>> producer is streaming data.")
>> >>>>>     System.exit(-1)
>> >>>>>   }
>> >>>>>   val sqlContext =
>> >>>>> org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>> >>>>>   val weatherDF =
>> >>>>>   //Process your DF as required here on
>> >>>>> }
>> >>>>>
>> >>>>>
>> >>>>>
On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi wrote:
>> >>>>> <> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I am reading json messages from kafka . Topics has 2 partitions.
>> When
>> >>>>> running streaming job using spark-submit, I could see that  val
>> dataFrame =
>> >>>>> executes indefinitely. Am I
>> doing
>> >>>>> something wrong here. Below is code .This environment is cloudera
>> sandbox
>> >>>>> env. Same issue in hadoop production cluster mode except that it is
>> >>>>> restricted thats why tried to reproduce issue in Cloudera sandbox.
>> Kafka
>> >>>>> 0.10 and  Spark 1.4.
>> >>>>>
>> >>>>> val kafkaParams =
>> >>>>> Map[String,String]("bootstrap.servers"->"localhost:9093,loca
>> lhost:9092",
>> >>>>> "" -> "xyz","auto.offset.reset"->"smallest")
>> >>>>> val conf = new SparkConf().setMaster("local[3
>> ]").setAppName("topic")
>> >>>>> val ssc = new StreamingContext(conf, Seconds(1))
>> >>>>>
>> >>>>> val sqlContext = new org.apache.spark.sql.SQLContex
>> t(ssc.sparkContext)
>> >>>>>
>> >>>>> val topics = Set("gpp.minf")
>> >>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String,
>> >>>>> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>> >>>>>
>> >>>>> kafkaStream.foreachRDD(
>> >>>>>   rdd => {
>> >>>>>     if (rdd.count > 0){
>> >>>>>         val dataFrame =
>> >>>>>        dataFrame.printSchema()
>> >>>>> //dataFrame.foreach(println)
>> >>>>> }
>> >>>>> }
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >

