Here is my attempt:

val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val sc = new SparkContext()

val geoData = sc.textFile("data/geoRegion.csv")
  .map(_.split(','))
  .map(line => (line(0), (line(1), line(2), line(3), line(4))))

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity

val vdpJoinedGeo = goodIPsFltrBI.transform(rdd => rdd.join(geoData))

This is very wrong. I have a feeling I should be broadcasting geoData instead of reading it in with each task (it's a 100 MB file), but I'm not sure where to put the code that maps the .csv file to the final geoData RDD. I'm also not sure whether geoData is even defined correctly (maybe it should use ssc instead of sc?).

Please advise.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
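A minimal sketch of the broadcast (map-side) join idea. It is written in plain Scala with no Spark dependency so it runs on its own, and it assumes the CSV fits in memory as a Map keyed on the first column. The object and method names (`BroadcastJoinSketch`, `buildGeoLookup`, `joinBatch`) and the sample rows are hypothetical. In a Spark job, the lookup would be built once on the driver (for example `ssc.sparkContext.textFile(...)`, the same two `map` steps, then `collectAsMap()`), wrapped with `ssc.sparkContext.broadcast(...)`, and read through `.value` inside `goodIPsFltrBI.transform(...)` or `mapPartitions`. Each executor then receives the table once instead of re-reading the file for every batch.

```scala
// Plain-Scala model of a map-side (broadcast hash) join; no Spark dependency.
// Assumption: in Spark, the Map returned by buildGeoLookup is what you would
// pass to sc.broadcast(...) once, and joinBatch is what each micro-batch would
// run against broadcastVar.value.
object BroadcastJoinSketch {
  // Mirrors the question's value tuple (line(1), line(2), line(3), line(4)).
  type GeoInfo = (String, String, String, String)

  // Parse CSV rows into a lookup table keyed on the first column.
  // Rows with fewer than 5 fields are skipped (an assumption; the question's
  // code indexes fields 0 to 4 without checking).
  def buildGeoLookup(csvLines: Iterator[String]): Map[String, GeoInfo] =
    csvLines
      .map(_.split(','))
      .collect { case f if f.length >= 5 => (f(0), (f(1), f(2), f(3), f(4))) }
      .toMap

  // Inner join of one batch of (key, value) records against the lookup table.
  // Like RDD.join, keys with no match in the table are dropped.
  def joinBatch[V](batch: Seq[(String, V)], geo: Map[String, GeoInfo]): Seq[(String, (V, GeoInfo))] =
    batch.flatMap { case (k, v) => geo.get(k).map(g => (k, (v, g))) }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data standing in for data/geoRegion.csv and one batch.
    val csv = Iterator("1.2.3.4,US,CA,San Jose,95134", "5.6.7.8,DE,BE,Berlin,10115")
    val geo = buildGeoLookup(csv)
    val batch = Seq("1.2.3.4" -> 3, "9.9.9.9" -> 1)
    joinBatch(batch, geo).foreach(println)
  }
}
```

If the table is too large to broadcast comfortably, a common alternative is to keep geoData as an RDD, build it from `ssc.sparkContext` rather than a second `new SparkContext()` (Spark allows only one active SparkContext per JVM by default), and call `.cache()` on it so the `join` inside `transform` does not re-read and re-parse the file on every batch.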