Here is my attempt:

val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val sc = new SparkContext()
val geoData = sc.textFile("data/geoRegion.csv")
            .map(_.split(','))
            .map(line => (line(0), (line(1),line(2),line(3),line(4))))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd => rdd.join(geoData))

This is very wrong.  I have a feeling I should be broadcasting geoData
instead of reading it in with each task (it's a 100MB file), but I'm not
sure where to put the code that maps from the .csv to the final geoData rdd.

Also I'm not sure if geoData is even defined correctly (maybe it should use
ssc instead of sc?).  Please advise.
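
To make the question concrete, one possible shape for the broadcast variant is sketched below. It is untested and rests on several assumptions: the stream is already keyed by the same string as column 0 of the CSV, the parsed file fits in driver and executor memory as a Map, and each key appears only once in the file (toMap keeps the last entry for a duplicate key, whereas join would emit every match). GeoJoinSketch, parseGeoLine and joinWithGeo are hypothetical names used only for illustration. The sketch reuses ssc.sparkContext rather than constructing a second SparkContext.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object GeoJoinSketch {
  // The four geo columns that follow the key in each CSV row.
  type Geo = (String, String, String, String)

  // Parse one CSV line into (key, geo fields).
  // Assumes at least five comma-separated columns and no quoted commas.
  def parseGeoLine(line: String): (String, Geo) = {
    val f = line.split(',')
    (f(0), (f(1), f(2), f(3), f(4)))
  }

  // Hypothetical helper: inner-joins a keyed stream against the static
  // file by looking each key up in a broadcast Map (a map-side join).
  def joinWithGeo[V](ssc: StreamingContext,
                     keyed: DStream[(String, V)],
                     path: String): DStream[(String, (V, Geo))] = {
    // Reuse the SparkContext owned by the StreamingContext.
    val sc = ssc.sparkContext

    // Read and parse the file once, on the driver.
    val geoMap: Map[String, Geo] =
      sc.textFile(path).map(parseGeoLine).collect().toMap

    // Ship the map to each executor once instead of once per task.
    val geoBc: Broadcast[Map[String, Geo]] = sc.broadcast(geoMap)

    // Records whose key is missing from the map are dropped (inner join).
    keyed.flatMap { case (k, v) =>
      geoBc.value.get(k).map(g => (k, (v, g))).toList
    }
  }
}
```

With the names from the attempt above, the call would be something like GeoJoinSketch.joinWithGeo(ssc, goodIPsFltrBI, "data/geoRegion.csv"), provided goodIPsFltrBI is a DStream of (String, value) pairs. If the parsed file is too large to hold as a Map, keeping the transform/join version but building geoData from ssc.sparkContext and calling cache() on it may be the simpler route.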




