I am currently writing an application that uses Spark Streaming. What I am trying to do is basically read in a few files (I do this using the SparkContext's textFile) and then process those files inside an action that I apply to a streaming RDD. Here is the main code below:
    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setAppName("EmailIngestion")
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      val sc = new SparkContext(sparkConf)

      val badWords = sc.textFile("/filters/badwords.txt")
      val urlBlacklist = sc.textFile("/filters/source_url_blacklist.txt")
      val domainBlacklist = sc.textFile("/filters/domain_blacklist.txt")
      val emailBlacklist = sc.textFile("/filters/blacklist.txt")

      val lines = FlumeUtils.createStream(ssc, "localhost", 4545, StorageLevel.MEMORY_ONLY_SER_2)

      lines.foreachRDD(rdd =>
        rdd.foreachPartition(json =>
          Processor.ProcessRecord(json, badWords, urlBlacklist, domainBlacklist, emailBlacklist)))

      ssc.start()
      ssc.awaitTermination()
    }

Here is the code that processes the files inside the ProcessRecord method:

    val emailBlacklistCnt = emailBlacklist.filter(black => black.contains(email)).count

It looks like this throws an exception. Is it possible to do this?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-call-a-transform-action-inside-an-action-tp17568.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
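For reference: it is not possible to call a transformation or action on one RDD from inside a closure executed on another RDD's executors; RDD handles are only valid on the driver, so passing `emailBlacklist` into `foreachPartition` and calling `.filter(...).count` there fails. The usual workaround, assuming the filter files are small enough to collect to the driver, is to broadcast them as plain Scala collections. A minimal sketch (reusing `sc` and `lines` from the code above; the adjusted `ProcessRecord` signature here is hypothetical):

```scala
import org.apache.spark.broadcast.Broadcast

// Collect the small filter files on the driver and broadcast them as Sets.
// Assumption: these files fit comfortably in driver/executor memory.
val badWordsB: Broadcast[Set[String]] =
  sc.broadcast(sc.textFile("/filters/badwords.txt").collect().toSet)
val emailBlacklistB: Broadcast[Set[String]] =
  sc.broadcast(sc.textFile("/filters/blacklist.txt").collect().toSet)

lines.foreachRDD(rdd =>
  rdd.foreachPartition { json =>
    // Inside the closure only broadcast values are touched -- ordinary Scala
    // collections, so no nested RDD operation runs on the executors.
    json.foreach { record =>
      Processor.ProcessRecord(record, badWordsB.value, emailBlacklistB.value)
    }
  })

// Inside ProcessRecord the blacklist check becomes a plain collection count:
//   val emailBlacklistCnt = emailBlacklist.count(black => black.contains(email))
```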