Have you tried using the console consumer to see if anything is actually
getting published to that topic?

On Tue, Aug 4, 2015 at 11:45 AM, narendra <narencs...@gmail.com> wrote:

> My application takes Twitter4j tweets and publishes those to a topic in
> Kafka. Spark Streaming subscribes to that topic for processing. But in
> actual, Spark Streaming is not able to receive tweet data from Kafka so
> Spark Streaming is running empty batch jobs with out input and I am not
> able
> to see any output from Spark Streaming.
>
> The code of the application is -
>
> import java.util.HashMap
> import java.util.Properties
> import twitter4j._
> import twitter4j.FilterQuery;
> import twitter4j.StallWarning;
> import twitter4j.Status;
> import twitter4j.StatusDeletionNotice;
> import twitter4j.StatusListener;
> import twitter4j.TwitterStream;
> import twitter4j.TwitterStreamFactory;
> import twitter4j.conf.ConfigurationBuilder;
> import twitter4j.json.DataObjectFactory;
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka._
> import kafka.javaapi.producer.Producer
> import kafka.producer.{KeyedMessage, ProducerConfig}
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.StreamingContext._
>
> object TwitterPopularTags {
>     def main(args: Array[String]) {
>
>             /** Information necessary for accessing the Twitter API */
>         val consumerKey= "2AgtQfH8rlyUDyfjwPOCDosEQ"
>         val consumerSecret=
> "vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl"
>         val accessToken=
> "33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e"
>         val accessTokenSecret =
> "X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm"
>         val cb = new ConfigurationBuilder()
>         cb.setOAuthConsumerKey(consumerKey)
>         cb.setOAuthConsumerSecret(consumerSecret)
>         cb.setOAuthAccessToken(accessToken)
>         cb.setOAuthAccessTokenSecret(accessTokenSecret)
>         cb.setJSONStoreEnabled(true)
>         cb.setIncludeEntitiesEnabled(true)
>         val twitterStream = new
> TwitterStreamFactory(cb.build()).getInstance()
>
>         val KafkaTopic = "LiveTweets"
>         /* kafka producer properties */
>         val kafkaProducer = {
>                         val props = new Properties()
>                         props.put("metadata.broker.list",
> "broker2:9092,localhost:9092")
>                         props.put("serializer.class",
> "kafka.serializer.StringEncoder")
>                         props.put("request.required.acks", "1")
>                         val config = new ProducerConfig(props)
>                         new Producer[String, String](config)
>                      }
>
>                 /* Invoked when a new tweet comes */
>         val listener = new StatusListener() {
>
>                            override def onStatus(status: Status): Unit = {
>                                val msg = new KeyedMessage[String,
> String](KafkaTopic,DataObjectFactory.getRawJSON(status))
>                                kafkaProducer.send(msg)
>               }
>                    override def onException(ex: Exception): Unit = throw ex
>
>                   // no-op for the following events
>                   override def onStallWarning(warning: StallWarning): Unit
> =
> {}
>                   override def onDeletionNotice(statusDeletionNotice:
> StatusDeletionNotice): Unit = {}
>                   override def onScrubGeo(userId: Long, upToStatusId:
> Long):
> Unit = {}
>                   override def
> onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
>         }
>
>         twitterStream.addListener(listener)
>         // Create Spark Streaming context
>         val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark
> Streaming")
>         val sc = new SparkContext(sparkConf)
>         val ssc = new StreamingContext(sc, Seconds(2))
>
>         // Define the Kafka parameters, broker list must be specified
>         val kafkaParams = Map("metadata.broker.list" ->
> "broker2:9092,localhost:9092")
>         val topics = Set(KafkaTopic)
>
>         // Create the direct stream with the Kafka parameters and topics
>         val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc,kafkaParams,topics)
>         val lines = kafkaStream.map(_._2)
>         val words = lines.flatMap(_.split(" "))
>         val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>         wordCounts.print()
>         ssc.start()
>         ssc.awaitTermination()
>
>   }
> }
>
> Spark Streaming web UI -
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png
> >
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png
> >
>
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to