zookeeper.connect is irrelevant here; the direct stream doesn't read offsets from ZooKeeper.

Did you look at your executor logs?
Did you look at the UI for the (probably failed) stages?
Are you actually producing data into all of the kafka partitions?
If you use kafka-simple-consumer-shell.sh to read that partition, do
you get any data?
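
For example, with the broker and topic from your code below, something along these lines should dump whatever is sitting in partition 1 from the earliest offset (flag names can vary a bit between Kafka versions, so check the script's --help on your install):

  kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 1 --offset -2 --max-messages 10

If that prints nothing, the problem is on the producer side, not in Spark.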

On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
<diwakar.dhanusk...@gmail.com> wrote:
> Hi Cody,
>
> Just added zookeeper.connect to kafkaParams. It doesn't come out of the batch
> window, and the other batches are queued behind it. I can see foreach(println) on
> the dataFrame printing one partition's data but not the other's.
> I couldn't see any errors in the log.
>
> val brokers = "localhost:9092,localhost:9093"
> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") // spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(1))
> val kafkaParams = Map[String, String](
>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>   "auto.offset.reset" -> "smallest",
>   "zookeeper.connect" -> "localhost:2181",
>   "group.id" -> "xyz")
> val topics = "test"
> val topicsSet = topics.split(",").toSet
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> import sqlContext.implicits._
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>     System.exit(-1)
>   }
>
>   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>   dataframe.foreach(println)
>   println("$$$$$$$$$$$", dataframe.count())
> })
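
An easy way to see what each batch actually pulled from Kafka is to log the offset ranges on the direct stream's RDD. A minimal diagnostic sketch against the 1.6 direct API, using the same messages stream as in the job above:

  import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

  messages.foreachRDD { rdd =>
    // The RDD produced by createDirectStream carries one OffsetRange per Kafka partition
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach { r =>
      println(s"topic=${r.topic} partition=${r.partition} " +
        s"from=${r.fromOffset} until=${r.untilOffset} records=${r.untilOffset - r.fromOffset}")
    }
  }

If one partition always shows from == until, nothing is being produced into it; if both show records but the batch never completes, the task reading one of them is hanging, and its executor log and thread dump in the UI are the place to look.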
> Logs:
>
> 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves to
> a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface
> eth1)
> 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> users with modify permissions: Set(cloudera)
> 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on
> port 45031.
> 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> 16/08/10 18:16:26 INFO Remoting: Starting remoting
> 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> 16/08/10 18:16:26 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 56638.
> 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5
> MB
> 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on port
> 4040.
> 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> http://192.168.126.131:4040
> 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
> 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
> 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
> server' on port 59491.
> 16/08/10 18:16:28 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> with timestamp 1470833188094
> 16/08/10 18:16:29 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
> timestamp 1470833189531
> 16/08/10 18:16:29 INFO SparkContext: Added JAR
> file:/home/cloudera/Downloads/boa/pain.jar at
> http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
> 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
> localhost
> 16/08/10 18:16:29 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
> 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
> 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
> 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block manager
> localhost:55361 with 511.5 MB RAM, BlockManagerId(driver, localhost, 55361)
> 16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
> 16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
> 16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset is
> overridden to smallest
> 16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is overridden
> to xyz
> 16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to localhost:2181
> 16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms
> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration = 1000 ms
> 16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and validated
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff
> 16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms
> 16/08/10 18:16:31 INFO ForEachDStream: Storage level = StorageLevel(false,
> false, false, false, 1)
> 16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null
> 16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms
> 16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@d8e872
> 16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at
> time 1470833192000
> 16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at 1470833192000
> ms
> 16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler
> 16/08/10 18:16:31 INFO StreamingContext: StreamingContext started
> 16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties
> 16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset is
> overridden to smallest
> 16/08/10 18:16:32 INFO VerifiableProperties: Property group.id is overridden
> to xyz
> 16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to localhost:2181
> 16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000 ms
> 16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job
> 1470833192000 ms.0 from job set of time 1470833192000 ms
> 16/08/10 18:16:32 INFO SparkContext: Starting job: json at todelete.scala:42
> 16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at todelete.scala:42)
> with 2 output partitions
> 16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at
> todelete.scala:42)
> 16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List()
> 16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List()
> 16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0
> (MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing
> parents
> 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 4.7 KB, free 4.7 KB)
> 16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes
> in memory (estimated size 2.6 KB, free 7.3 KB)
> 16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
> on localhost:55361 (size: 2.6 KB, free: 511.5 MB)
> 16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast at
> DAGScheduler.scala:1006
> 16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from
> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42)
> 16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
> 16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
> 16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
> 16/08/10 18:16:32 INFO Executor: Fetching
> http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
> timestamp 1470833189531
> 16/08/10 18:16:32 INFO Utils: Fetching
> http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to
> /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp
> 16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000 ms
> 16/08/10 18:16:33 INFO Executor: Adding
> file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-assembly-1.6.2-hadoop2.6.0.jar
> to class loader
> 16/08/10 18:16:33 INFO Executor: Fetching
> http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
> 16/08/10 18:16:33 INFO Utils: Fetching
> http://192.168.126.131:59491/jars/pain.jar to
> /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp
> 16/08/10 18:16:33 INFO Executor: Adding
> file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar
> to class loader
> 16/08/10 18:16:33 INFO Executor: Fetching
> http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> with timestamp 1470833188094
> 16/08/10 18:16:33 INFO Utils: Fetching
> http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> to
> /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp
> 16/08/10 18:16:33 INFO Executor: Adding
> file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> to class loader
> 16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 offsets 0
> -> 20
> 16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
> 16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset is
> overridden to smallest
> 16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is overridden
> to xyz
> 16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to localhost:2181
> 16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms
> 16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
> 13366 bytes result sent to driver
> 16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 0) in 2423 ms on localhost (1/2)
> 16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms
> 16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms
> 16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms
> 16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms
> 16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms
> 16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000 ms
> 16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms
> 16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms
> 16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms
> 16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms
> 16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms
> 16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms
> 16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms
> 16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms
> 16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms
> 16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000 ms
> 16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms
> 16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms
> 16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms
> 16/08/10 18:16:54 INFO JobSch
>
> On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Those logs you're posting are from right after your failure; they don't
>> include what actually went wrong when attempting to read the json. Look at your
>> logs more carefully.
>>
>> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi"
>> <diwakar.dhanusk...@gmail.com> wrote:
>>>
>>> Hi Siva,
>>>
>>> With the code below, it gets stuck at sqlContext.read.json(rdd.map(_._2)).toDF().
>>> There are two partitions in the topic.
>>> I am running Spark 1.6.2.
>>>
>>> val topics = "topic.name"
>>> val brokers = "localhost:9092"
>>> val topicsSet = topics.split(",").toSet
>>> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") // spark://localhost:7077
>>> val sc = new SparkContext(sparkConf)
>>> val ssc = new StreamingContext(sc, Seconds(60))
>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>> messages.foreachRDD(rdd => {
>>>   if (rdd.isEmpty()) {
>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>     System.exit(-1)
>>>   }
>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>   dataframe.foreach(println)
>>> })
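
Another cheap check before the json call is to count records per partition of the batch RDD (with the direct stream, Spark partitions map 1:1 to Kafka partitions). A minimal sketch, assuming the same messages stream as above:

  messages.foreachRDD { rdd =>
    // Returns (spark partition index, record count) for each partition of this batch
    val counts = rdd.mapPartitionsWithIndex { (i, iter) => Iterator((i, iter.size)) }.collect()
    counts.foreach { case (i, n) => println(s"partition $i -> $n records") }
  }

If the count for one partition never comes back, that task is blocked talking to its broker, and read.json will appear to hang because its schema-inference job is waiting on the same partition.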
>>>
>>>
>>> Below are the logs:
>>>
>>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
>>> todelete.scala:34) failed in 110.776 s
>>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>>> stopped! Dropping event
>>> SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6d8ff688)
>>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>>> stopped! Dropping event
>>> SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException:
>>> Job 0 cancelled because SparkContext was shut down))
>>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
>>> MapOutputTrackerMasterEndpoint stopped!
>>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
>>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
>>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
>>> 16/08/10 12:27:51 INFO
>>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
>>> OutputCommitCoordinator stopped!
>>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
>>> Shutting down remote daemon.
>>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
>>> daemon shut down; proceeding with flushing remote transports.
>>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
>>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
>>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
>>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1b6-01db-45b5-9302-d2f67f7c490e
>>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
>>> Remoting shut down.
>>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
>>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
>>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class
>>> com.boa.poc.todelete --jars
>>> /home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar
>>> /home/cloudera/Downloads/boa/pain.jar > log.txt
>>> Using Spark's default log4j profile:
>>> org/apache/spark/log4j-defaults.properties
>>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
>>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
>>> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
>>> interface eth1)
>>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>>> another address
>>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
>>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
>>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
>>> disabled; ui acls disabled; users with view permissions: Set(cloudera);
>>> users with modify permissions: Set(cloudera)
>>> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver'
>>> on port 42140.
>>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
>>> 16/08/10 12:28:01 INFO Remoting: Starting remoting
>>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
>>> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
>>> 16/08/10 12:28:01 INFO Utils: Successfully started service
>>> 'sparkDriverActorSystem' on port 53328.
>>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
>>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
>>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
>>> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
>>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity
>>> 511.5 MB
>>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
>>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on
>>> port 4040.
>>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
>>> http://192.168.126.131:4040
>>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
>>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
>>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
>>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file
>>> server' on port 58957.
>>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>>> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
>>> http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>>> with timestamp 1470812282187
>>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>>> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
>>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
>>> timestamp 1470812282398
>>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>>> file:/home/cloudera/Downloads/boa/pain.jar at
>>> http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host
>>> localhost
>>> 16/08/10 12:28:02 INFO Utils: Successfully started service
>>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56716.
>>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on 56716
>>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block
>>> manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver, localhost,
>>> 56716)
>>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
>>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
>>> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset
>>> is overridden to smallest
>>> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is
>>> overridden to xyz
>>> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect
>>> is overridden to
>>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval =
>>> null
>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration = 60000
>>> ms
>>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and validated
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
>>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
>>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
>>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
>>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated
>>> org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
>>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator at
>>> time 1470812340000
>>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at
>>> 1470812340000 ms
>>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
>>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
>>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
>>> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset
>>> is overridden to smallest
>>> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is
>>> overridden to xyz
>>> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect
>>> is overridden to
>>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000 ms
>>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job
>>> 1470812340000 ms.0 from job set of time 1470812340000 ms
>>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at
>>> todelete.scala:34
>>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at
>>> todelete.scala:34) with 2 output partitions
>>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json at
>>> todelete.scala:34)
>>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
>>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
>>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0
>>> (MapPartitionsRDD[3] at json at todelete.scala:34), which has no missing
>>> parents
>>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values in
>>> memory (estimated size 4.7 KB, free 4.7 KB)
>>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as
>>> bytes in memory (estimated size 2.6 KB, free 7.2 KB)
>>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>> memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
>>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast
>>> at DAGScheduler.scala:1006
>>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from
>>> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
>>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2
>>> tasks
>>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0
>>> (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>>> 16/08/10 12:29:01 INFO Executor: Fetching
>>> http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>>> with timestamp 1470812282187
>>> 16/08/10 12:29:01 INFO Utils: Fetching
>>> http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>>> to
>>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
>>> 16/08/10 12:29:01 INFO Executor: Adding
>>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>>> to class loader
>>> 16/08/10 12:29:01 INFO Executor: Fetching
>>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
>>> timestamp 1470812282398
>>> 16/08/10 12:29:01 INFO Utils: Fetching
>>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to
>>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
>>> 16/08/10 12:29:01 INFO Executor: Adding
>>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar
>>> to class loader
>>> 16/08/10 12:29:01 INFO Executor: Fetching
>>> http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>>> 16/08/10 12:29:01 INFO Utils: Fetching
>>> http://192.168.126.131:58957/jars/pain.jar to
>>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp
>>> 16/08/10 12:29:02 INFO Executor: Adding
>>> file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar
>>> to class loader
>>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0
>>> offsets 0 -> 8
>>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
>>> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset
>>> is overridden to smallest
>>> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is
>>> overridden to xyz
>>> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect
>>> is overridden to
>>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
>>> 13366 bytes result sent to driver
>>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0
>>> (TID 0) in 2380 ms on localhost (1/2)
>>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000 ms
>>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000 ms
>>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000 ms
>>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000 ms
>>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000 ms
>>>
>>>
>>> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi
>>> <diwakar.dhanusk...@gmail.com> wrote:
>>>>
>>>> Hi Siva,
>>>>
>>>> Does the topic have partitions? Which version of Spark are you using?
>>>>
>>>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com>
>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Here is a working example I did.
>>>>>
>>>>> HTH
>>>>>
>>>>> Regards,
>>>>>
>>>>> Sivakumaran S
>>>>>
>>>>> val topics = "test"
>>>>> val brokers = "localhost:9092"
>>>>> val topicsSet = topics.split(",").toSet
>>>>> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") // spark://localhost:7077
>>>>> val sc = new SparkContext(sparkConf)
>>>>> val ssc = new StreamingContext(sc, Seconds(60))
>>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>> messages.foreachRDD(rdd => {
>>>>>   if (rdd.isEmpty()) {
>>>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>>>     System.exit(-1)
>>>>>   }
>>>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>>>   // Process your DF as required here on
>>>>> })
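
For a first test, anything that forces evaluation works where that placeholder comment sits, e.g. (names taken from this example):

  weatherDF.printSchema()
  weatherDF.show(10)   // prints the first rows of the batch to the driver console
  println(s"rows in this batch: ${weatherDF.count()}")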
>>>>>
>>>>>
>>>>>
>>>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi
>>>>> <diwakar.dhanusk...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am reading JSON messages from Kafka. The topic has 2 partitions. When
>>>>> running the streaming job via spark-submit, I can see that val dataFrame =
>>>>> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
>>>>> something wrong here? The code is below. This environment is the Cloudera
>>>>> sandbox; the same issue occurs in Hadoop production cluster mode, but that
>>>>> environment is restricted, which is why I tried to reproduce the issue in the
>>>>> Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>>>>>
>>>>> val kafkaParams = Map[String, String](
>>>>>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>>>>>   "group.id" -> "xyz",
>>>>>   "auto.offset.reset" -> "smallest")
>>>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>>>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>>>
>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>>>>
>>>>> val topics = Set("gpp.minf")
>>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>>>>
>>>>> kafkaStream.foreachRDD(rdd => {
>>>>>   if (rdd.count > 0) {
>>>>>     val dataFrame = sqlContext.read.json(rdd.map(_._2))
>>>>>     dataFrame.printSchema()
>>>>>     // dataFrame.foreach(println)
>>>>>   }
>>>>> })
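
One thing worth knowing about read.json on an RDD: without an explicit schema it runs a whole extra Spark job just to infer the schema (that is the "json at ..." job in the logs), so a partition that can't be read makes it look like json itself hangs. A minimal sketch that skips the inference pass by supplying a schema up front; the field names here are made up, so substitute the real fields of your messages:

  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  // Hypothetical schema; replace with the actual fields of the JSON messages.
  val msgSchema = StructType(Seq(
    StructField("id", StringType),
    StructField("payload", StringType)
  ))

  kafkaStream.foreachRDD { rdd =>
    if (rdd.count > 0) {
      val dataFrame = sqlContext.read.schema(msgSchema).json(rdd.map(_._2))
      dataFrame.printSchema()
    }
  }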
>>>>>
>>>>>
>>>>
>>>
>
