I adopted this approach:

scala> val conf = new SparkConf().
     |   setAppName("StreamTest").
     |   setMaster("local[12]").
     |   set("spark.driver.allowMultipleContexts", "true").
     |   set("spark.hadoop.validateOutputSpecs", "false")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7

scala> val ssc = new StreamingContext(conf, Seconds(60))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5dbae9eb

scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
kafkaParams: scala.collection.immutable.Map[String,String] = Map(metadata.broker.list -> rhes564:9092)

scala> val topics = Set("newtopic")
topics: scala.collection.immutable.Set[String] = Set(newtopic)

scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing, Nothing)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21
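[One way to consume the stream just created — a minimal sketch, not from the thread, assuming Spark 1.x with the spark-streaming-kafka artifact on the classpath and the broker/topic names from the transcript. Note the `InputDStream[(Nothing, Nothing)]` result above comes from calling `createDirectStream` without type parameters; supplying `[String, String, StringDecoder, StringDecoder]` yields a usable (key, value) stream:]

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamTestSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[12]")
    val ssc  = new StreamingContext(conf, Seconds(60))

    val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
    val topics      = Set("newtopic")

    // Explicit type parameters avoid the (Nothing, Nothing) inference seen above.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Keys from the console producer are null; work with the message values.
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```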
So that opens the data stream. What next?

Thanks

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 1 April 2016 at 22:37, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Yes, I noticed that:
>
> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>      | "rhes564:9092", "newtopic", 1)
> <console>:52: error: overloaded method value createStream with alternatives:
>   (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum: String,groupId: String,topics: java.util.Map[String,Integer],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
>   <and>
>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum: String,groupId: String,topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]
>  cannot be applied to (org.apache.spark.streaming.StreamingContext, String, String, String, Int)
>        val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181", "rhes564:9092", "newtopic", 1)
>
> On 1 April 2016 at 22:25, Cody Koeninger <c...@koeninger.org> wrote:
>
>> You're not passing valid Scala values. rhes564:2181 without quotes
>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>
>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Cody.
>>>
>>> Can I use the receiver-based approach here?
>>> I have created the topic newtopic as below:
>>>
>>> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181 --replication-factor 1 --partitions 1 --topic newtopic
>>>
>>> This is basically what I am doing in Spark:
>>>
>>> val lines = ssc.socketTextStream("rhes564", 2181)
>>>
>>> which is obviously not working.
>>>
>>> This is what is suggested in the doc:
>>>
>>> import org.apache.spark.streaming.kafka._
>>>
>>> val kafkaStream = KafkaUtils.createStream(streamingContext,
>>>     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
>>>
>>> * <zkQuorum> is a list of one or more zookeeper servers that make the quorum
>>> * <group> is the name of the kafka consumer group
>>> * <topics> is a list of one or more kafka topics to consume from
>>> * <numThreads> is the number of threads the kafka consumer should use
>>>
>>> Now this comes back with an error; obviously I am not passing the parameters correctly!
>>>
>>> scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
>>>      | rhes564:2181, rhes564:9092, newtopic 1)
>>> <console>:1: error: identifier expected but integer literal found.
>>>        val kafkaStream = KafkaUtils.createStream(streamingContext,
>>>        rhes564:2181, rhes564:9092, newtopic 1)
>>>
>>> On 1 April 2016 at 21:13, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> It looks like you're using a plain socket stream to connect to a
>>>> zookeeper port, which won't work.
>>>>
>>>> Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>>
>>>> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>>>> <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am just testing Spark streaming with Kafka.
>>>>> Basically I am broadcasting to the topic every minute at host:port rhes564:2181.
>>>>> This is sending a few lines through a shell script as follows:
>>>>>
>>>>> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list rhes564:9092 --topic newtopic
>>>>>
>>>>> That works fine and I can see the messages in:
>>>>>
>>>>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --topic newtopic
>>>>>
>>>>> Fri Apr 1 21:00:01 BST 2016 ======= Sending messages from rhes5
>>>>> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
>>>>> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
>>>>> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
>>>>> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
>>>>> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>>>>>
>>>>> Now I try to see the topic in Spark streaming as follows:
>>>>>
>>>>> val conf = new SparkConf().
>>>>>   setAppName("StreamTest").
>>>>>   setMaster("local[12]").
>>>>>   set("spark.driver.allowMultipleContexts", "true").
>>>>>   set("spark.hadoop.validateOutputSpecs", "false")
>>>>> val sc = new SparkContext(conf)
>>>>> // Create sqlContext based on HiveContext
>>>>> val sqlContext = new HiveContext(sc)
>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> //
>>>>> // Create a local StreamingContext with two working threads and a batch interval of 1 second.
>>>>> // The master requires 2 cores to prevent a starvation scenario.
>>>>> val ssc = new StreamingContext(conf, Minutes(1))
>>>>> // Create a DStream that will connect to hostname:port, like localhost:9999
>>>>> //val lines = ssc.socketTextStream("rhes564", 9092)
>>>>> val lines = ssc.socketTextStream("rhes564", 2181)
>>>>> // Split each line into words
>>>>> val words = lines.flatMap(_.split(" "))
>>>>> val pairs = words.map(word => (word, 1))
>>>>> val wordCounts = pairs.reduceByKey(_ + _)
>>>>> // Print the first ten elements of each RDD generated in this DStream to the console
>>>>> wordCounts.print()
>>>>> ssc.start()
>>>>>
>>>>> This is what I am getting:
>>>>>
>>>>> scala> -------------------------------------------
>>>>> Time: 1459541760000 ms
>>>>> -------------------------------------------
>>>>>
>>>>> but no values.
>>>>>
>>>>> Have I got the port wrong in this case, or is the setup incorrect?
>>>>>
>>>>> Thanks
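[The `createStream` error earlier in the thread comes from passing a bare topic string and an Int where the receiver-based API expects `topics: Map[String, Int]` (topic -> number of receiver threads), and from passing the broker address where the consumer group id belongs. A minimal receiver-based sketch, assuming Spark 1.x, the ZooKeeper address from the thread, and a hypothetical consumer group name "test-group":]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[12]")
    val ssc  = new StreamingContext(conf, Minutes(1))

    // topics is a Map of topic -> thread count, not a bare String and Int,
    // and the third argument is a consumer group id, not a broker address.
    val kafkaStream = KafkaUtils.createStream(
      ssc, "rhes564:2181", "test-group", Map("newtopic" -> 1))

    // createStream yields (key, value) pairs; count words in the values.
    val wordCounts = kafkaStream.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

[This also explains the empty batches in the listing above: `ssc.socketTextStream("rhes564", 2181)` opens a raw TCP socket to the ZooKeeper port, which never emits text lines; the Kafka receiver speaks ZooKeeper's protocol instead.]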