OK, I managed to make this work. All I am interested in is receiving messages from the topic every minute. No filtering yet, just the full text.
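As a first cut at the filtering step (not done yet above), the per-record logic could look like the sketch below. It is plain Scala so it can be checked without a cluster; `FilterSketch` and `keepMatching` are illustrative names, and in the streaming job the same logic would be `messages.map(_._2).filter(...)` on the DStream of (key, value) tuples:

```scala
// Sketch only: simulates filtering the (key, value) records that
// createDirectStream delivers; keys are null with the console producer.
object FilterSketch {
  // Drop the keys and keep only payloads containing the given substring.
  // On the DStream this would be: messages.map(_._2).filter(_.contains(pat))
  def keepMatching(records: Seq[(String, String)], pat: String): Seq[String] =
    records.map(_._2).filter(_.contains(pat))
}
```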
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
val sparkConf = new SparkConf().
             setAppName("StreamTest").
             setMaster("local[12]").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
val ssc = new StreamingContext(sparkConf, Seconds(60))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "StreamTest")
val topic = Set("newtopic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages.print()
ssc.start()

-------------------------------------------
Time: 1459554540000 ms
-------------------------------------------
(null,Sat Apr 2 00:33:01 BST 2016 ======= Sending messages from rhes5)
(null,1,'a7UkW5ZRaI_V8oRiPUNx0on6E06Ikr8_ILOxhVpgt6IoXXq2fF9ssYuJYcr49Cj4yp3nY9k8sHtIi_7XjltTVzqJ33beV2hIaqAj',101)
(null,2,'dnFxOkOibbKLR5m3CIeS3rhwn8hCiaZAfEaD7yXi6M7jXcvaFYBjClLDoNMEVgfLZVgJ9tXchqlGX44FmvhnarLFrtJNbTb1C6j4',102)
(null,3,'M9pvIOKMhaI_mSE3ExlovZWIxBE66KNEWGIGtCJF1qr_dGJX5sFKqLLa3Qv8aN2lCLi3lnGnMtqeZYBqE5YD586Vw50WWjL7ncZA',103)
(null,4,'9EROPf_dJZpdAHmBubTRxEUkvC9S_Xnll5bWmX0xcOPk7l4TGXPgEqxpUP52QG6pUIn74mvwWqF9vzZ2ZhsmV6WPOmUAw4Ub_nFU',104)
(null,5,'BLIi9a_n7Pfyc7r3nfzKfaNRa4Hmd9NlHEVDPkQS4xbgUWqU2bJeI6b8b1IMoStnmjMHhYLtFf4TQyJcpn85PSwFksggNVnQl1oL',105)

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 1 April 2016 at 23:26, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I adopted this approach
>
> scala> val conf = new SparkConf().
>      | setAppName("StreamTest").
>      | setMaster("local[12]").
>      | set("spark.driver.allowMultipleContexts", "true").
>      | set("spark.hadoop.validateOutputSpecs", "false")
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
>
> scala> val ssc = new StreamingContext(conf, Seconds(60))
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@5dbae9eb
>
> scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(metadata.broker.list -> rhes564:9092)
>
> scala> val topics = Set("newtopic")
> topics: scala.collection.immutable.Set[String] = Set(newtopic)
>
> scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
> stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing, Nothing)] =
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21
>
> So that opens the data stream. What next?
>
> Thanks
>
> Dr Mich Talebzadeh
>
> On 1 April 2016 at 22:37, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Yes, I noticed that:
>>
>> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>> <console>:52: error: overloaded method value createStream with alternatives:
>>   (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,
>>    zkQuorum: String, groupId: String, topics: java.util.Map[String,Integer],
>>    storageLevel: org.apache.spark.storage.StorageLevel)
>>   org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
>>   <and>
>>   (ssc: org.apache.spark.streaming.StreamingContext,
>>    zkQuorum: String, groupId: String,
>>    topics: scala.collection.immutable.Map[String,Int],
>>    storageLevel:
>>    org.apache.spark.storage.StorageLevel)
>>   org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]
>>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
>>  String, String, String, Int)
>>        val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>> Dr Mich Talebzadeh
>>
>> On 1 April 2016 at 22:25, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> You're not passing valid Scala values. rhes564:2181 without quotes
>>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>>
>>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>>> <mich.talebza...@gmail.com> wrote:
>>> > Thanks Cody.
>>> >
>>> > Can I use the Receiver-based Approach here?
>>> >
>>> > I have created the topic newtopic as below:
>>> >
>>> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
>>> > --replication-factor 1 --partitions 1 --topic newtopic
>>> >
>>> > This is basically what I am doing in Spark:
>>> >
>>> > val lines = ssc.socketTextStream("rhes564", 2181)
>>> >
>>> > which is obviously not working.
>>> >
>>> > This is what is suggested in the doc:
>>> >
>>> > import org.apache.spark.streaming.kafka._
>>> >
>>> > val kafkaStream = KafkaUtils.createStream(streamingContext,
>>> >     [ZK quorum], [consumer group id], [per-topic number of Kafka
>>> >     partitions to consume])
>>> >
>>> > * <zkQuorum> is a list of one or more zookeeper servers that make the quorum
>>> > * <group> is the name of the kafka consumer group
>>> > * <topics> is a list of one or more kafka topics to consume from
>>> > * <numThreads> is the number of threads the kafka consumer should use
>>> >
>>> > Now this comes back with an error. Obviously I am not passing the
>>> > parameters correctly!
>>> >
>>> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
>>> > rhes564:2181, rhes564:9092, newtopic 1)
>>> > <console>:1: error: identifier expected but integer literal found.
>>> >        val kafkaStream = KafkaUtils.createStream(streamingContext,
>>> > rhes564:2181, rhes564:9092, newtopic 1)
>>> >
>>> > Dr Mich Talebzadeh
>>> >
>>> > On 1 April 2016 at 21:13, Cody Koeninger <c...@koeninger.org> wrote:
>>> >>
>>> >> It looks like you're using a plain socket stream to connect to a
>>> >> zookeeper port, which won't work.
>>> >>
>>> >> Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>>> >>
>>> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>>> >> <mich.talebza...@gmail.com> wrote:
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > I am just testing Spark streaming with Kafka.
>>> >> >
>>> >> > Basically I am broadcasting to the topic every minute on Host:port ->
>>> >> > rhes564:2181.
>>> >> > This is sending a few lines through a shell script as follows:
>>> >> >
>>> >> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
>>> >> > --broker-list rhes564:9092 --topic newtopic
>>> >> >
>>> >> > That works fine and I can see the messages in:
>>> >> >
>>> >> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>>> >> > --topic newtopic
>>> >> >
>>> >> > Fri Apr 1 21:00:01 BST 2016 ======= Sending messages from rhes5
>>> >> > 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
>>> >> > 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
>>> >> > 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
>>> >> > 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
>>> >> > 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>>> >> >
>>> >> > Now I try to see the topic in Spark streaming as follows:
>>> >> >
>>> >> > val conf = new SparkConf().
>>> >> >              setAppName("StreamTest").
>>> >> >              setMaster("local[12]").
>>> >> >              set("spark.driver.allowMultipleContexts", "true").
>>> >> >              set("spark.hadoop.validateOutputSpecs", "false")
>>> >> > val sc = new SparkContext(conf)
>>> >> > // Create sqlContext based on HiveContext
>>> >> > val sqlContext = new HiveContext(sc)
>>> >> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> >> > //
>>> >> > // Create a local StreamingContext with two working threads and a
>>> >> > // batch interval of 1 second.
>>> >> > // The master requires 2 cores to prevent a starvation scenario.
>>> >> > val ssc = new StreamingContext(conf, Minutes(1))
>>> >> > // Create a DStream that will connect to hostname:port, like
>>> >> > // localhost:9999
>>> >> > //val lines = ssc.socketTextStream("rhes564", 9092)
>>> >> > val lines = ssc.socketTextStream("rhes564", 2181)
>>> >> > // Split each line into words
>>> >> > val words = lines.flatMap(_.split(" "))
>>> >> > val pairs = words.map(word => (word, 1))
>>> >> > val wordCounts = pairs.reduceByKey(_ + _)
>>> >> > // Print the first ten elements of each RDD generated in this
>>> >> > // DStream to the console
>>> >> > wordCounts.print()
>>> >> > ssc.start()
>>> >> >
>>> >> > This is what I am getting:
>>> >> >
>>> >> > scala> -------------------------------------------
>>> >> > Time: 1459541760000 ms
>>> >> > -------------------------------------------
>>> >> >
>>> >> > But no values appear.
>>> >> >
>>> >> > Have I got the port wrong in this case, or is the setup incorrect?
>>> >> >
>>> >> > Thanks
>>> >> >
>>> >> > Dr Mich Talebzadeh
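For completeness, the receiver-based call that the overload error earlier in the thread rejects expects the topics as a Map of topic name to consumer-thread count, with the zkQuorum and groupId as quoted strings; a minimal sketch using this thread's host names (a sketch only, not tested against a live cluster):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(60))
// createStream takes (ssc, zkQuorum, groupId, topics: Map[topic -> numThreads]);
// the brokers are discovered via ZooKeeper, so rhes564:9092 is not passed here.
val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181", "StreamTest",
                                          Map("newtopic" -> 1))
kafkaStream.map(_._2).print()   // print the values only; keys are null
ssc.start()
ssc.awaitTermination()
```

Note the direct stream at the top of this message avoids ZooKeeper for offsets and talks to the brokers directly, which is why its kafkaParams use bootstrap.servers instead.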