The logs you posted are from right after the failure; they don't include
what actually went wrong when attempting to read the JSON. Look at your
logs more carefully.
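
One rough way to do that, as a sketch only: scan the whole log for WARN and
ERROR lines and for exception mentions, so that whatever was logged before
the shutdown messages stands out. The object name LogScan and the marker
list are made up for illustration, and log.txt is simply the file name from
your spark-submit redirect; adjust all of them to your setup.

```scala
import scala.io.Source

// Hypothetical helper, not part of Spark: lists the log lines worth reading first.
object LogScan {
  // Substrings that usually mark a problem in log4j-style output (an assumption).
  private val markers = Seq(" WARN ", " ERROR ", "Exception")

  // Returns (1-based line number, line) for every line containing a marker.
  def suspicious(lines: Iterator[String]): List[(Int, String)] =
    lines.zipWithIndex
      .collect { case (line, i) if markers.exists(m => line.contains(m)) => (i + 1, line) }
      .toList

  def main(args: Array[String]): Unit = {
    // Default file name assumed from the "> log.txt" redirect in the quoted command.
    val path = args.headOption.getOrElse("log.txt")
    val src = Source.fromFile(path)
    try suspicious(src.getLines()).foreach { case (n, l) => println(s"$n: $l") }
    finally src.close()
  }
}
```

The earliest hits, before the "SparkContext was shut down" messages, are the
ones to read closely.
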
On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com>
wrote:

> Hi Siva,
>
> With the code below, the job gets stuck at
> * sqlContext.read.json(rdd.map(_._2)).toDF()*
> There are two partitions in the topic.
> I am running Spark 1.6.2.
>
> val topics = "topic.name"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new SparkConf().setAppName("KafkaWeather")
>   .setMaster("local[5]") // spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
>   "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
>     println("Failed to get data from Kafka. Please check that the Kafka " +
>       "producer is streaming data.")
>     System.exit(-1)
>   }
>   val sqlContext =
>     org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>   *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
>   dataframe.foreach(println)
>
>               })
>
>
> Below are logs,
>
> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
> todelete.scala:34) failed in 110.776 s
> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event SparkListenerStageCompleted(
> org.apache.spark.scheduler.StageInfo@6d8ff688)
> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event 
> SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException:
> Job 0 cancelled because SparkContext was shut down))
> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
> 16/08/10 12:27:51 INFO OutputCommitCoordinator$
> OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
> daemon shut down; proceeding with flushing remote transports.
> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-
> 07b9c1b6-01db-45b5-9302-d2f67f7c490e
> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
> [cloudera@quickstart ~]$ spark-submit --master local[3] --class
> com.boa.poc.todelete --jars /home/cloudera/lib/spark-
> streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/
> spark-assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar
> > log.txt
> Using Spark's default log4j profile: org/apache/spark/log4j-
> defaults.properties
> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface eth1)
> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> users with modify permissions: Set(cloudera)
> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver'
> on port 42140.
> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
> 16/08/10 12:28:01 INFO Remoting: Starting remoting
> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
> 16/08/10 12:28:01 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 53328.
> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity
> 511.5 MB
> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
> http://192.168.126.131:4040
> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-
> 70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file
> server' on port 58957.
> 16/08/10 12:28:02 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> http://192.168.126.131:58957/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar with timestamp 1470812282187
> 16/08/10 12:28:02 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with timestamp 1470812282398
> 16/08/10 12:28:02 INFO SparkContext: Added JAR
> file:/home/cloudera/Downloads/boa/pain.jar at
> http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host
> localhost
> 16/08/10 12:28:02 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56716.
> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on 56716
> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register BlockManager
> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver,
> localhost, 56716)
> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset is
> overridden to smallest
> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is
> overridden to xyz
> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to
> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration = 60000
> ms
> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and validated
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
> 16/08/10 12:28:03 INFO ForEachDStream: Storage level = StorageLevel(false,
> false, false, false, 1)
> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator at
> time 1470812340000
> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at 1470812340000
> ms
> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset is
> overridden to smallest
> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is
> overridden to xyz
> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to
> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000 ms
> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job
> 1470812340000 ms.0 from job set of time 1470812340000 ms
> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at
> todelete.scala:34
> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at todelete.scala:34)
> with 2 output partitions
> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json at
> todelete.scala:34)
> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0
> (MapPartitionsRDD[3] at json at todelete.scala:34), which has no missing
> parents
> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 4.7 KB, free 4.7 KB)
> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 2.6 KB, free 7.2 KB)
> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast at
> DAGScheduler.scala:1006
> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from
> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/
> jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp
> 1470812282187
> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/
> jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to
> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-
> 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-
> 475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-
> aa89-9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class
> loader
> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/
> jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398
> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/
> jars/spark-assembly-1.6.2-hadoop2.6.0.jar to
> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-
> 56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-
> 475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-
> aa89-9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader
> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/
> jars/pain.jar with timestamp 1470812282402
> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/
> jars/pain.jar to /tmp/spark-861074da-9bfb-475c-
> a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/
> fetchFileTemp100401525133805542.tmp
> 16/08/10 12:29:02 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-
> 475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar
> to class loader
> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0
> offsets 0 -> 8
> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset is
> overridden to smallest
> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is
> overridden to xyz
> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to
> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
> 13366 bytes result sent to driver
> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 0) in 2380 ms on localhost (1/2)
> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000 ms
> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000 ms
> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000 ms
> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000 ms
> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000 ms
>
>
> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> Hi Siva,
>>
>> Does the topic have partitions? Which version of Spark are you using?
>>
>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Here is a working example I did.
>>>
>>> HTH
>>>
>>> Regards,
>>>
>>> Sivakumaran S
>>>
>>> val topics = "test"
>>> val brokers = "localhost:9092"
>>> val topicsSet = topics.split(",").toSet
>>> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc")
>>>   .setMaster("local") // spark://localhost:7077
>>> val sc = new SparkContext(sparkConf)
>>> val ssc = new StreamingContext(sc, Seconds(60))
>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>> val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>> messages.foreachRDD(rdd => {
>>>   if (rdd.isEmpty()) {
>>>     println("Failed to get data from Kafka. Please check that the Kafka " +
>>>       "producer is streaming data.")
>>>     System.exit(-1)
>>>   }
>>>   val sqlContext =
>>>     org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>   //Process your DF as required here on
>>> })
>>>
>>>
>>>
>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <
>>> diwakar.dhanusk...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am reading JSON messages from Kafka. The topic has 2 partitions. When
>>> running the streaming job using spark-submit, I can see that * val
>>> dataFrame = sqlContext.read.json(rdd.map(_._2)) * executes indefinitely.
>>> Am I doing something wrong here? The code is below. This environment is
>>> the Cloudera sandbox. The same issue occurs on the Hadoop production
>>> cluster, but access there is restricted, which is why I tried to
>>> reproduce the issue in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>>>
>>> val kafkaParams = Map[String, String](
>>>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>>>   "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>
>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>>
>>> val topics = Set("gpp.minf")
>>> val kafkaStream = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>>>
>>> kafkaStream.foreachRDD(
>>>   rdd => {
>>>     if (rdd.count > 0) {
>>>       * val dataFrame = sqlContext.read.json(rdd.map(_._2)) *
>>>       dataFrame.printSchema()
>>>       // dataFrame.foreach(println)
>>>     }
>>>   }
>>> )
>>>
>>>
>>>
>>
>
