yes I noticed that

scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181", "rhes564:9092", "newtopic", 1)
<console>:52: error: overloaded method value createStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum: String,groupId: String,topics: java.util.Map[String,Integer],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum: String,groupId: String,topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]
 cannot be applied to (org.apache.spark.streaming.StreamingContext, String, String, String, Int)
       val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181", "rhes564:9092", "newtopic", 1)
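For reference, a minimal sketch of a call that matches the Scala alternative listed in the error: the second argument has to be a consumer group id rather than a broker list, and the topics go in a Map[String, Int] of topic name to receiver threads. The group id "newgroup" is an assumption; the host, port and topic come from the thread, and ssc is the StreamingContext defined in the original code further down.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaStream = KafkaUtils.createStream(
  ssc,                                  // existing StreamingContext
  "rhes564:2181",                       // zkQuorum: the ZooKeeper quorum, not the Kafka broker
  "newgroup",                           // groupId (assumed name)
  Map("newtopic" -> 1),                 // topic -> number of receiver threads
  StorageLevel.MEMORY_AND_DISK_SER_2)   // default storage level, spelled out

This returns a ReceiverInputDStream[(String, String)] of (key, message) pairs, as the second alternative in the error message shows.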
Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 1 April 2016 at 22:25, Cody Koeninger <c...@koeninger.org> wrote:

> You're not passing valid Scala values. rhes564:2181 without quotes
> isn't a valid literal, newtopic isn't a list of strings, etc.
>
> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Thanks Cody.
> >
> > Can I use the Receiver-based Approach here?
> >
> > I have created the topic newtopic as below:
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181 --replication-factor 1 --partitions 1 --topic newtopic
> >
> > This is basically what I am doing in Spark:
> >
> > val lines = ssc.socketTextStream("rhes564", 2181)
> >
> > which obviously is not working.
> >
> > This is what is suggested in the doc:
> >
> > import org.apache.spark.streaming.kafka._
> >
> > val kafkaStream = KafkaUtils.createStream(streamingContext,
> >     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
> >
> >   * <zkQuorum> is a list of one or more zookeeper servers that make quorum
> >   * <group> is the name of the kafka consumer group
> >   * <topics> is a list of one or more kafka topics to consume from
> >   * <numThreads> is the number of threads the kafka consumer should use
> >
> > Now this comes back with an error. Obviously I am not passing the parameters correctly!
> >
> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
> > rhes564:2181, rhes564:9092, newtopic 1)
> > <console>:1: error: identifier expected but integer literal found.
> >        val kafkaStream = KafkaUtils.createStream(streamingContext,
> > rhes564:2181, rhes564:9092, newtopic 1)
> >
> > Dr Mich Talebzadeh
> >
> > On 1 April 2016 at 21:13, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> It looks like you're using a plain socket stream to connect to a
> >> zookeeper port, which won't work.
> >>
> >> Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
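For completeness, a minimal sketch of the direct (receiver-less) approach described in that integration guide, which reads from the Kafka broker on rhes564:9092 directly instead of going through ZooKeeper. The broker address and topic come from the thread; the variable names are illustrative, and ssc is the StreamingContext from the original code below.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// point the direct stream at the broker that the console producer uses
val kafkaParams = Map[String, String]("metadata.broker.list" -> "rhes564:9092")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("newtopic"))   // DStream of (key, message) pairs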
> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
> >> <mich.talebza...@gmail.com> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I am just testing Spark streaming with Kafka.
> >> >
> >> > Basically I am broadcasting a topic every minute to Host:port ->
> >> > rhes564:2181. This is sending a few lines through a shell script as follows:
> >> >
> >> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list rhes564:9092 --topic newtopic
> >> >
> >> > That works fine and I can see the messages in
> >> >
> >> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --topic newtopic
> >> >
> >> > Fri Apr 1 21:00:01 BST 2016 ======= Sending messages from rhes5
> >> > 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> >> > 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> >> > 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> >> > 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> >> > 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >> >
> >> > Now I try to see the topic in Spark Streaming as follows:
> >> >
> >> > val conf = new SparkConf().
> >> >              setAppName("StreamTest").
> >> >              setMaster("local[12]").
> >> >              set("spark.driver.allowMultipleContexts", "true").
> >> >              set("spark.hadoop.validateOutputSpecs", "false")
> >> > val sc = new SparkContext(conf)
> >> > // Create sqlContext based on HiveContext
> >> > val sqlContext = new HiveContext(sc)
> >> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> >> > //
> >> > // Create a local StreamingContext with a batch interval of one minute.
> >> > // The master requires at least 2 cores to prevent a starvation scenario.
> >> > val ssc = new StreamingContext(conf, Minutes(1))
> >> > // Create a DStream that will connect to hostname:port, like localhost:9999
> >> > // val lines = ssc.socketTextStream("rhes564", 9092)
> >> > val lines = ssc.socketTextStream("rhes564", 2181)
> >> > // Split each line into words
> >> > val words = lines.flatMap(_.split(" "))
> >> > val pairs = words.map(word => (word, 1))
> >> > val wordCounts = pairs.reduceByKey(_ + _)
> >> > // Print the first ten elements of each RDD generated in this DStream to the console
> >> > wordCounts.print()
> >> > ssc.start()
> >> >
> >> > This is what I am getting:
> >> >
> >> > scala> -------------------------------------------
> >> > Time: 1459541760000 ms
> >> > -------------------------------------------
> >> >
> >> > But no values.
> >> >
> >> > Have I got the port wrong in this case, or is the setup incorrect?
> >> >
> >> > Thanks
> >> >
> >> > Dr Mich Talebzadeh
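As a closing note, a minimal sketch of how the word count above could read from Kafka instead of a raw socket, assuming kafkaStream has been created with either the createStream or createDirectStream call sketched earlier in the thread (both return a DStream of (key, message) pairs):

// replaces: val lines = ssc.socketTextStream("rhes564", 2181)
val lines = kafkaStream.map(_._2)        // keep only the message text, drop the Kafka key
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
// then ssc.start() as in the original code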