Hi 

I have a question about Spark Twitter stream processing in Spark 1.3.1, the 
code sample below just opens 
up a twitter stream, uses auth keys, splits out has tags and creates a temp 
table. However, when I try to compile
it using sbt ( CentOS 6.5) I get the error 

[error] /home/hadoop/spark/twitter1/src/main/scala/twitter1.scala:54: value 
toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]       val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()

I know that I need to "import sqlContext.implicits._" which is what Ive tried 
but I still get the error. Can anyone advise ? 

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType}


object twitter1 {

  def main(args: Array[String]) {

    // create  a spark conf and context

    val appName = "Twitter example 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc = new SparkContext(conf)

    // set twitter auth key values

    val consumerKey         = "QQxxx"
    val consumerSecret     = "0HFxxx"
    val accessToken           = "32394xxx"
    val accessTokenSecret = "IlQvscxxx"

    // set twitter auth properties
    //   https://apps.twitter.com/

    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc    = new StreamingContext(sc, Seconds(5) )
    val stream = TwitterUtils.createStream(ssc,None)

    val hashTags = stream.flatMap( status => status.getText.split(" 
").filter(_.startsWith("#")))

  //  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._

    hashTags.foreachRDD{ rdd =>
      val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()
      dfHashTags.registerTempTable("tweets")
    }

    // extra stuff here

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end twitter1

cheers

Mike F
                                                                                
                                                                          

Reply via email to