Have you tried using the console consumer to see if anything is actually getting published to that topic?
On Tue, Aug 4, 2015 at 11:45 AM, narendra <narencs...@gmail.com> wrote: > My application takes Twitter4j tweets and publishes those to a topic in > Kafka. Spark Streaming subscribes to that topic for processing. But in > practice, Spark Streaming is not able to receive tweet data from Kafka, so > Spark Streaming is running empty batch jobs without input and I am not > able > to see any output from Spark Streaming. > > The code of the application is - > > import java.util.HashMap > import java.util.Properties > import twitter4j._ > import twitter4j.FilterQuery; > import twitter4j.StallWarning; > import twitter4j.Status; > import twitter4j.StatusDeletionNotice; > import twitter4j.StatusListener; > import twitter4j.TwitterStream; > import twitter4j.TwitterStreamFactory; > import twitter4j.conf.ConfigurationBuilder; > import twitter4j.json.DataObjectFactory; > import kafka.serializer.StringDecoder > import org.apache.spark.streaming.kafka._ > import kafka.javaapi.producer.Producer > import kafka.producer.{KeyedMessage, ProducerConfig} > import org.apache.spark._ > import org.apache.spark.streaming._ > import org.apache.spark.streaming.StreamingContext._ > > object TwitterPopularTags { > def main(args: Array[String]) { > > /** Information necessary for accessing the Twitter API */ > val consumerKey= "2AgtQfH8rlyUDyfjwPOCDosEQ" > val consumerSecret= > "vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl" > val accessToken= > "33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e" > val accessTokenSecret = > "X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm" > val cb = new ConfigurationBuilder() > cb.setOAuthConsumerKey(consumerKey) > cb.setOAuthConsumerSecret(consumerSecret) > cb.setOAuthAccessToken(accessToken) > cb.setOAuthAccessTokenSecret(accessTokenSecret) > cb.setJSONStoreEnabled(true) > cb.setIncludeEntitiesEnabled(true) > val twitterStream = new > TwitterStreamFactory(cb.build()).getInstance() > > val KafkaTopic = "LiveTweets" > /* kafka producer properties */ > val 
kafkaProducer = { > val props = new Properties() > props.put("metadata.broker.list", > "broker2:9092,localhost:9092") > props.put("serializer.class", > "kafka.serializer.StringEncoder") > props.put("request.required.acks", "1") > val config = new ProducerConfig(props) > new Producer[String, String](config) > } > > /* Invoked when a new tweet comes */ > val listener = new StatusListener() { > > override def onStatus(status: Status): Unit = { > val msg = new KeyedMessage[String, > String](KafkaTopic,DataObjectFactory.getRawJSON(status)) > kafkaProducer.send(msg) > } > override def onException(ex: Exception): Unit = throw ex > > // no-op for the following events > override def onStallWarning(warning: StallWarning): Unit > = > {} > override def onDeletionNotice(statusDeletionNotice: > StatusDeletionNotice): Unit = {} > override def onScrubGeo(userId: Long, upToStatusId: > Long): > Unit = {} > override def > onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {} > } > > twitterStream.addListener(listener) > // Create Spark Streaming context > val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark > Streaming") > val sc = new SparkContext(sparkConf) > val ssc = new StreamingContext(sc, Seconds(2)) > > // Define the Kafka parameters, broker list must be specified > val kafkaParams = Map("metadata.broker.list" -> > "broker2:9092,localhost:9092") > val topics = Set(KafkaTopic) > > // Create the direct stream with the Kafka parameters and topics > val kafkaStream = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](ssc,kafkaParams,topics) > val lines = kafkaStream.map(_._2) > val words = lines.flatMap(_.split(" ")) > val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) > wordCounts.print() > ssc.start() > ssc.awaitTermination() > > } > } > > Spark Streaming web UI - > > < > http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png > > > > < > 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png > > > > > Thank you. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >