Re: Spark streaming not processing messages from partitioned topics

2016-08-11 Thread Diwakar Dhanuskodi
Figured it out. All I am doing wrong is testing it out in pseudo node vm
with 1 core. The tasks were hanging out for cpu.
In production cluster this works just fine.


On Thu, Aug 11, 2016 at 12:45 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Checked executor logs and UI . There is no error message or something like
> that.  when there is any action , it is  waiting .
> There are data in partitions. I could use simple-consumer-shell and print
> all data in console.  Am I doing anything wrong in foreachRDD?.
> This just works fine  with single partitioned topic,
>
> On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger 
> wrote:
>
>> zookeeper.connect is irrelevant.
>>
>> Did you look at your executor logs?
>> Did you look at the UI for the (probably failed) stages?
>> Are you actually producing data into all of the kafka partitions?
>> If you use kafka-simple-consumer-shell.sh to read that partition, do
>> you get any data?
>>
>> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
>>  wrote:
>> > Hi Cody,
>> >
>> > Just added zookeeper.connect to kafkaparams . It couldn't come out of
>> batch
>> > window. Other batches are queued. I could see foreach(println) of
>> dataFrame
>> > printing one of partition's data and not the other.
>> > Couldn't see any  errors from log.
>> >
>> > val brokers = "localhost:9092,localhost:9093"
>> > val sparkConf = new
>> > SparkConf().setAppName("KafkaWeather").setMaster("local[5]")
>> //spark://localhost:7077
>> > val sc = new SparkContext(sparkConf)
>> > val ssc = new StreamingContext(sc, Seconds(1))
>> > val kafkaParams = Map[String,
>> > String]("bootstrap.servers"->"localhost:9093,localhost:9092"
>> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
>> group.id"->"xyz")
>> > val topics = "test"
>> > val topicsSet = topics.split(",").toSet
>> > val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topicsSet)
>> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> > import sqlContext.implicits._
>> > messages.foreachRDD(rdd => {
>> >   if (rdd.isEmpty()) {
>> > println("Failed to get data from Kafka. Please check that the Kafka
>> > producer is streaming data.")
>> > System.exit(-1)
>> >   }
>> >
>> >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>> >   dataframe.foreach(println)
>> >  println( "$$$", dataframe.count())
>> >   })
>> > Logs:
>> >
>> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
>> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
>> > library for your platform... using builtin-java classes where applicable
>> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
>> resolves to
>> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
>> interface
>> > eth1)
>> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>> > another address
>> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
>> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to:
>> cloudera
>> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
>> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
>> > users with modify permissions: Set(cloudera)
>> > 16/08/10 18:16:25 INFO Utils: Successfully started service
>> 'sparkDriver' on
>> > port 45031.
>> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
>> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
>> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on
>> addresses
>> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
>> > 16/08/10 18:16:26 INFO Utils: Successfully started service
>> > 'sparkDriverActorSystem' on port 56638.
>> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
>> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
>> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
>> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
>> > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity
>> 511.5
>> > MB
>> > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
>> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
>> port
>> > 4040.
>> > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
>> > http://192.168.126.131:4040
>> > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
>> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e
>> 68-2952-41b0-a11b-f07860682749
>> > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
>> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
>> > server' on port 59491.
>> > 16/08/10 18:16:28 INFO SparkContext: Added JAR
>> > file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar
>> at
>> > 

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
Checked executor logs and UI . There is no error message or something like
that.  when there is any action , it is  waiting .
There are data in partitions. I could use simple-consumer-shell and print
all data in console.  Am I doing anything wrong in foreachRDD?.
This just works fine  with single partitioned topic,

On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger  wrote:

> zookeeper.connect is irrelevant.
>
> Did you look at your executor logs?
> Did you look at the UI for the (probably failed) stages?
> Are you actually producing data into all of the kafka partitions?
> If you use kafka-simple-consumer-shell.sh to read that partition, do
> you get any data?
>
> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
>  wrote:
> > Hi Cody,
> >
> > Just added zookeeper.connect to kafkaparams . It couldn't come out of
> batch
> > window. Other batches are queued. I could see foreach(println) of
> dataFrame
> > printing one of partition's data and not the other.
> > Couldn't see any  errors from log.
> >
> > val brokers = "localhost:9092,localhost:9093"
> > val sparkConf = new
> > SparkConf().setAppName("KafkaWeather").setMaster("
> local[5]")//spark://localhost:7077
> > val sc = new SparkContext(sparkConf)
> > val ssc = new StreamingContext(sc, Seconds(1))
> > val kafkaParams = Map[String,
> > String]("bootstrap.servers"->"localhost:9093,localhost:9092"
> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
> group.id"->"xyz")
> > val topics = "test"
> > val topicsSet = topics.split(",").toSet
> > val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topicsSet)
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> > import sqlContext.implicits._
> > messages.foreachRDD(rdd => {
> >   if (rdd.isEmpty()) {
> > println("Failed to get data from Kafka. Please check that the Kafka
> > producer is streaming data.")
> > System.exit(-1)
> >   }
> >
> >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
> >   dataframe.foreach(println)
> >  println( "$$$", dataframe.count())
> >   })
> > Logs:
> >
> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> > library for your platform... using builtin-java classes where applicable
> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
> resolves to
> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface
> > eth1)
> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> > another address
> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
> > users with modify permissions: Set(cloudera)
> > 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver'
> on
> > port 45031.
> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> > 16/08/10 18:16:26 INFO Utils: Successfully started service
> > 'sparkDriverActorSystem' on port 56638.
> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity
> 511.5
> > MB
> > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4040.
> > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> > http://192.168.126.131:4040
> > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-
> 2b2a4e68-2952-41b0-a11b-f07860682749
> > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
> > server' on port 59491.
> > 16/08/10 18:16:28 INFO SparkContext: Added JAR
> > file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> > http://192.168.126.131:59491/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> > with timestamp 1470833188094
> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
> > file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with
> > timestamp 1470833189531
> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
> > 

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Cody Koeninger
zookeeper.connect is irrelevant.

Did you look at your executor logs?
Did you look at the UI for the (probably failed) stages?
Are you actually producing data into all of the kafka partitions?
If you use kafka-simple-consumer-shell.sh to read that partition, do
you get any data?

On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
 wrote:
> Hi Cody,
>
> Just added zookeeper.connect to kafkaparams . It couldn't come out of batch
> window. Other batches are queued. I could see foreach(println) of dataFrame
> printing one of partition's data and not the other.
> Couldn't see any  errors from log.
>
> val brokers = "localhost:9092,localhost:9093"
> val sparkConf = new
> SparkConf().setAppName("KafkaWeather").setMaster("local[5]")//spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(1))
> val kafkaParams = Map[String,
> String]("bootstrap.servers"->"localhost:9093,localhost:9092","auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","group.id"->"xyz")
> val topics = "test"
> val topicsSet = topics.split(",").toSet
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> import sqlContext.implicits._
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka
> producer is streaming data.")
> System.exit(-1)
>   }
>
>val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>   dataframe.foreach(println)
>  println( "$$$", dataframe.count())
>   })
> Logs:
>
> 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves to
> a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface
> eth1)
> 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> users with modify permissions: Set(cloudera)
> 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on
> port 45031.
> 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> 16/08/10 18:16:26 INFO Remoting: Starting remoting
> 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> 16/08/10 18:16:26 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 56638.
> 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5
> MB
> 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on port
> 4040.
> 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> http://192.168.126.131:4040
> 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
> 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
> 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
> server' on port 59491.
> 16/08/10 18:16:28 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> with timestamp 1470833188094
> 16/08/10 18:16:29 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
> timestamp 1470833189531
> 16/08/10 18:16:29 INFO SparkContext: Added JAR
> file:/home/cloudera/Downloads/boa/pain.jar at
> http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
> 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
> localhost
> 16/08/10 18:16:29 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
> 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
> 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
> 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block manager
> localhost:55361 with 511.5 MB RAM, 

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
Hi Cody,

Just added zookeeper.connect to kafkaparams . It couldn't come out of batch
window. Other batches are queued. I could see foreach(println) of dataFrame
printing one of partition's data and not the other.
Couldn't see any  errors from log.

val brokers = "localhost:9092,localhost:9093"
val sparkConf = new
SparkConf().setAppName("KafkaWeather").setMaster("local[5]")//spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))
val kafkaParams = Map[String,
String]("bootstrap.servers"->"localhost:9093,localhost:9092","auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
group.id"->"xyz")
val topics = "test"
val topicsSet = topics.split(",").toSet
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)
val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlContext.implicits._
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
println("Failed to get data from Kafka. Please check that the Kafka
producer is streaming data.")
System.exit(-1)
  }

   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
  dataframe.foreach(println)
 println( "$$$", dataframe.count())
  })
Logs:

16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves
to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
interface eth1)
16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(cloudera);
users with modify permissions: Set(cloudera)
16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on
port 45031.
16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
16/08/10 18:16:26 INFO Remoting: Starting remoting
16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
16/08/10 18:16:26 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 56638.
16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5
MB
16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
http://192.168.126.131:4040
16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
server' on port 59491.
16/08/10 18:16:28 INFO SparkContext: Added JAR
file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
with timestamp 1470833188094
16/08/10 18:16:29 INFO SparkContext: Added JAR
file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
timestamp 1470833189531
16/08/10 18:16:29 INFO SparkContext: Added JAR
file:/home/cloudera/Downloads/boa/pain.jar at
http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
localhost
16/08/10 18:16:29 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:55361 with 511.5 MB RAM, BlockManagerId(driver,
localhost, 55361)
16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect is
overridden to localhost:2181
16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
16/08/10 18:16:31 INFO 

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Cody Koeninger
Those logs you're posting are from right after your failure, they don't
include what actually went wrong when attempting to read json. Look at your
logs more carefully.
On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" 
wrote:

> Hi Siva,
>
> With below code, it is stuck up at
> * sqlContext.read.json(rdd.map(_._2)).toDF()*
> There are two partitions in  topic.
> I am running spark 1.6.2
>
> val topics = "topic.name"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("
> local[5]")//spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "
> group.id" -> "xyz","auto.offset.reset"->"smallest")
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka
> producer is streaming data.")
> System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.
> sparkContext)
>   *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
>   dataframe.foreach(println)
>
>   })
>
>
> Below are logs,
>
> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
> todelete.scala:34) failed in 110.776 s
> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event SparkListenerStageCompleted(
> org.apache.spark.scheduler.StageInfo@6d8ff688)
> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event 
> SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException:
> Job 0 cancelled because SparkContext was shut down))
> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
> 16/08/10 12:27:51 INFO OutputCommitCoordinator$
> OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
> daemon shut down; proceeding with flushing remote transports.
> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-
> 07b9c1b6-01db-45b5-9302-d2f67f7c490e
> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
> [cloudera@quickstart ~]$ spark-submit --master local[3] --class
> com.boa.poc.todelete --jars /home/cloudera/lib/spark-
> streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/
> spark-assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar
> > log.txt
> Using Spark's default log4j profile: org/apache/spark/log4j-
> defaults.properties
> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface eth1)
> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> users with modify permissions: Set(cloudera)
> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver'
> on port 42140.
> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
> 16/08/10 12:28:01 INFO Remoting: Starting remoting
> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
> 16/08/10 12:28:01 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 53328.
> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Sivakumaran S
I am testing with one partition now. I am using Kafka 0.9 and Spark 1.6.1 
(Scala 2.11). Just start with one topic first and then add more. I am not 
partitioning the topic.

HTH, 

Regards,

Sivakumaran

> On 10-Aug-2016, at 5:56 AM, Diwakar Dhanuskodi  
> wrote:
> 
> Hi Siva,
> 
> Does topic  has partitions? which version of Spark you are using?
> 
> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S  > wrote:
> Hi,
> 
> Here is a working example I did.
> 
> HTH
> 
> Regards,
> 
> Sivakumaran S
> 
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new 
> SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") 
> //spark://localhost:7077 <>
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka 
> producer is streaming data.")
> System.exit(-1)
>   }
>   val sqlContext = 
> org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   //Process your DF as required here on
> }
> 
> 
> 
>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi > > wrote:
>> 
>> Hi,
>> 
>> I am reading json messages from kafka . Topics has 2 partitions. When 
>> running streaming job using spark-submit, I could see that  val dataFrame = 
>> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing 
>> something wrong here. Below is code .This environment is cloudera sandbox 
>> env. Same issue in hadoop production cluster mode except that it is 
>> restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka 
>> 0.10 and  Spark 1.4.
>> 
>> val kafkaParams = 
>> Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", 
>> "group.id " -> "xyz","auto.offset.reset"->"smallest")
>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> val ssc = new StreamingContext(conf, Seconds(1))
>> 
>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> 
>> val topics = Set("gpp.minf")
>> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>> 
>> kafkaStream.foreachRDD(
>>   rdd => {
>> if (rdd.count > 0){
>> val dataFrame = sqlContext.read.json(rdd.map(_._2)) 
>>dataFrame.printSchema()
>> //dataFrame.foreach(println)
>> }
>> }
> 
> 



Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
Hi Siva,

With below code, it is stuck up at
* sqlContext.read.json(rdd.map(_._2)).toDF()*
There are two partitions in  topic.
I am running spark 1.6.2

val topics = "topic.name"
val brokers = "localhost:9092"
val topicsSet = topics.split(",").toSet
val sparkConf = new
SparkConf().setAppName("KafkaWeather").setMaster("local[5]")//spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "
group.id" -> "xyz","auto.offset.reset"->"smallest")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
println("Failed to get data from Kafka. Please check that the Kafka
producer is streaming data.")
System.exit(-1)
  }
  val sqlContext =
org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
  *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
  dataframe.foreach(println)

  })


Below are logs,

16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
todelete.scala:34) failed in 110.776 s
16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6d8ff688)
16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException:
Job 0 cancelled because SparkContext was shut down))
16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
16/08/10 12:27:51 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
/tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1b6-01db-45b5-9302-d2f67f7c490e
16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
/tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
[cloudera@quickstart ~]$ spark-submit --master local[3] --class
com.boa.poc.todelete --jars
/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar
/home/cloudera/Downloads/boa/pain.jar > log.txt
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
interface eth1)
16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(cloudera);
users with modify permissions: Set(cloudera)
16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver' on
port 42140.
16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
16/08/10 12:28:01 INFO Remoting: Starting remoting
16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
16/08/10 12:28:01 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 53328.
16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity 511.5
MB
16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
http://192.168.126.131:4040
16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
16/08/10 12:28:02 INFO HttpServer: Starting HTTP 

Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
Hi Siva,

Does topic  has partitions? which version of Spark you are using?

On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S  wrote:

> Hi,
>
> Here is a working example I did.
>
> HTH
>
> Regards,
>
> Sivakumaran S
>
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new 
> SparkConf().setAppName("KafkaWeatherCalc").setMaster("local")
> //spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka
> producer is streaming data.")
> System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.
> sparkContext)
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   //Process your DF as required here on
> }
>
>
>
> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
> Hi,
>
> I am reading json messages from kafka . Topics has 2 partitions. When
> running streaming job using spark-submit, I could see that * val
> dataFrame = sqlContext.read.json(rdd.map(_._2)) *executes indefinitely.
> Am I doing something wrong here. Below is code .This environment is
> cloudera sandbox env. Same issue in hadoop production cluster mode except
> that it is restricted thats why tried to reproduce issue in Cloudera
> sandbox. Kafka 0.10 and  Spark 1.4.
>
> val kafkaParams = Map[String,String]("bootstrap.
> servers"->"localhost:9093,localhost:9092", "group.id" ->
> "xyz","auto.offset.reset"->"smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>
> kafkaStream.foreachRDD(
>   rdd => {
> if (rdd.count > 0){
>* val dataFrame = sqlContext.read.json(rdd.map(_._2)) *
>dataFrame.printSchema()
> //dataFrame.foreach(println)
> }
> }
>
>
>


Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
It stops working at sqlContext.read.json(rdd.map(_._2)) . Topics without
partitions is working fine. Do I need to set any other configs
val kafkaParams =
Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", "
group.id" -> "xyz","auto.offset.reset"->"smallest")
Spark version is 1.6.2

kafkaStream.foreachRDD(
  rdd => {
   rdd.foreach(println)
   val dataFrame = sqlContext.read.json(rdd.map(_._2))
   dataFrame.foreach(println)
}
)

On Wed, Aug 10, 2016 at 9:05 AM, Cody Koeninger  wrote:

> No, you don't need a conditional.  read.json on an empty rdd will
> return an empty dataframe.  Foreach on an empty dataframe or an empty
> rdd won't do anything (a task will still get run, but it won't do
> anything).
>
> Leave the conditional out.  Add one thing at a time to the working
> rdd.foreach example and see when it stops working, then take a closer
> look at the logs.
>
>
> On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi
>  wrote:
> > Hi Cody,
> >
> > Without conditional . It is going with fine. But any processing inside
> > conditional get on to waiting (or) something.
> > Facing this issue with partitioned topics. I would need conditional to
> skip
> > processing when batch is empty.
> > kafkaStream.foreachRDD(
> >   rdd => {
> >
> >val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >/*if (dataFrame.count() > 0) {
> >dataFrame.foreach(println)
> >}
> >else
> >{
> >  println("Empty DStream ")
> >}*/
> > })
> >
> > On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger 
> wrote:
> >>
> >> Take out the conditional and the sqlcontext and just do
> >>
> >> rdd => {
> >>   rdd.foreach(println)
> >>
> >>
> >> as a base line to see if you're reading the data you expect
> >>
> >> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> >>  wrote:
> >> > Hi,
> >> >
> >> > I am reading json messages from kafka . Topics has 2 partitions. When
> >> > running streaming job using spark-submit, I could see that  val
> >> > dataFrame =
> >> > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> >> > something wrong here. Below is code .This environment is cloudera
> >> > sandbox
> >> > env. Same issue in hadoop production cluster mode except that it is
> >> > restricted thats why tried to reproduce issue in Cloudera sandbox.
> Kafka
> >> > 0.10 and  Spark 1.4.
> >> >
> >> > val kafkaParams =
> >> > Map[String,String]("bootstrap.servers"->"localhost:9093,
> localhost:9092",
> >> > "group.id" -> "xyz","auto.offset.reset"->"smallest")
> >> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> >> > val ssc = new StreamingContext(conf, Seconds(1))
> >> >
> >> > val sqlContext = new org.apache.spark.sql.
> SQLContext(ssc.sparkContext)
> >> >
> >> > val topics = Set("gpp.minf")
> >> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> >> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> >> >
> >> > kafkaStream.foreachRDD(
> >> >   rdd => {
> >> > if (rdd.count > 0){
> >> > val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >> >dataFrame.printSchema()
> >> > //dataFrame.foreach(println)
> >> > }
> >> > }
> >
> >
>


Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Cody Koeninger
No, you don't need a conditional.  read.json on an empty rdd will
return an empty dataframe.  Foreach on an empty dataframe or an empty
rdd won't do anything (a task will still get run, but it won't do
anything).

Leave the conditional out.  Add one thing at a time to the working
rdd.foreach example and see when it stops working, then take a closer
look at the logs.


On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi
 wrote:
> Hi Cody,
>
> Without conditional . It is going with fine. But any processing inside
> conditional get on to waiting (or) something.
> Facing this issue with partitioned topics. I would need conditional to skip
> processing when batch is empty.
> kafkaStream.foreachRDD(
>   rdd => {
>
>val dataFrame = sqlContext.read.json(rdd.map(_._2))
>/*if (dataFrame.count() > 0) {
>dataFrame.foreach(println)
>}
>else
>{
>  println("Empty DStream ")
>}*/
> })
>
> On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger  wrote:
>>
>> Take out the conditional and the sqlcontext and just do
>>
>> rdd => {
>>   rdd.foreach(println)
>>
>>
>> as a base line to see if you're reading the data you expect
>>
>> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
>>  wrote:
>> > Hi,
>> >
>> > I am reading json messages from kafka . Topics has 2 partitions. When
>> > running streaming job using spark-submit, I could see that  val
>> > dataFrame =
>> > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
>> > something wrong here. Below is code .This environment is cloudera
>> > sandbox
>> > env. Same issue in hadoop production cluster mode except that it is
>> > restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka
>> > 0.10 and  Spark 1.4.
>> >
>> > val kafkaParams =
>> > Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
>> > "group.id" -> "xyz","auto.offset.reset"->"smallest")
>> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> > val ssc = new StreamingContext(conf, Seconds(1))
>> >
>> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> >
>> > val topics = Set("gpp.minf")
>> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
>> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>> >
>> > kafkaStream.foreachRDD(
>> >   rdd => {
>> > if (rdd.count > 0){
>> > val dataFrame = sqlContext.read.json(rdd.map(_._2))
>> >dataFrame.printSchema()
>> > //dataFrame.foreach(println)
>> > }
>> > }
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
Hi Cody,

Without conditional . It is going with fine. But any processing inside
conditional get on to waiting (or) something.
Facing this issue with partitioned topics. I would need conditional to skip
processing when batch is empty.
kafkaStream.foreachRDD(
  rdd => {

   val dataFrame = sqlContext.read.json(rdd.map(_._2))
   /*if (dataFrame.count() > 0) {
   dataFrame.foreach(println)
   }
   else
   {
 println("Empty DStream ")
   }*/
})

On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger  wrote:

> Take out the conditional and the sqlcontext and just do
>
> rdd => {
>   rdd.foreach(println)
>
>
> as a base line to see if you're reading the data you expect
>
> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
>  wrote:
> > Hi,
> >
> > I am reading json messages from kafka . Topics has 2 partitions. When
> > running streaming job using spark-submit, I could see that  val
> dataFrame =
> > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> > something wrong here. Below is code .This environment is cloudera sandbox
> > env. Same issue in hadoop production cluster mode except that it is
> > restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka
> > 0.10 and  Spark 1.4.
> >
> > val kafkaParams =
> > Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
> > "group.id" -> "xyz","auto.offset.reset"->"smallest")
> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> > val ssc = new StreamingContext(conf, Seconds(1))
> >
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >
> > val topics = Set("gpp.minf")
> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> >
> > kafkaStream.foreachRDD(
> >   rdd => {
> > if (rdd.count > 0){
> > val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >dataFrame.printSchema()
> > //dataFrame.foreach(println)
> > }
> > }
>


Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Sivakumaran S
Hi,

Here is a working example I did.

HTH

Regards,

Sivakumaran S

val topics = "test"
val brokers = "localhost:9092"
val topicsSet = topics.split(",").toSet
val sparkConf = new 
SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") 
//spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
println("Failed to get data from Kafka. Please check that the Kafka 
producer is streaming data.")
System.exit(-1)
  }
  val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
  val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
  //Process your DF as required here on
}



> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi  
> wrote:
> 
> Hi,
> 
> I am reading json messages from kafka . Topics has 2 partitions. When running 
> streaming job using spark-submit, I could see that  val dataFrame = 
> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing 
> something wrong here. Below is code .This environment is cloudera sandbox 
> env. Same issue in hadoop production cluster mode except that it is 
> restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka 0.10 
> and  Spark 1.4.
> 
> val kafkaParams = 
> Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", 
> "group.id " -> "xyz","auto.offset.reset"->"smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
> 
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> 
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> 
> kafkaStream.foreachRDD(
>   rdd => {
> if (rdd.count > 0){
> val dataFrame = sqlContext.read.json(rdd.map(_._2)) 
>dataFrame.printSchema()
> //dataFrame.foreach(println)
> }
> }



Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Cody Koeninger
Take out the conditional and the sqlcontext and just do

rdd => {
  rdd.foreach(println)


as a base line to see if you're reading the data you expect

On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
 wrote:
> Hi,
>
> I am reading json messages from kafka . Topics has 2 partitions. When
> running streaming job using spark-submit, I could see that  val dataFrame =
> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> something wrong here. Below is code .This environment is cloudera sandbox
> env. Same issue in hadoop production cluster mode except that it is
> restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka
> 0.10 and  Spark 1.4.
>
> val kafkaParams =
> Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
> "group.id" -> "xyz","auto.offset.reset"->"smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>
> kafkaStream.foreachRDD(
>   rdd => {
> if (rdd.count > 0){
> val dataFrame = sqlContext.read.json(rdd.map(_._2))
>dataFrame.printSchema()
> //dataFrame.foreach(println)
> }
> }

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org