I am currently writing an application that uses Spark Streaming.  What I am
trying to do is read in a few files (via SparkContext.textFile) and then use
those RDDs inside an action that I apply to a streaming RDD.  Here is the
main code below:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("EmailIngestion")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  // Reuse the SparkContext created by the StreamingContext; constructing
  // a second one from the same conf fails.
  val sc = ssc.sparkContext

  // Filter lists read in as RDDs
  val badWords = sc.textFile("/filters/badwords.txt")
  val urlBlacklist = sc.textFile("/filters/source_url_blacklist.txt")
  val domainBlacklist = sc.textFile("/filters/domain_blacklist.txt")
  val emailBlacklist = sc.textFile("/filters/blacklist.txt")

  val lines = FlumeUtils.createStream(ssc, "localhost", 4545,
    StorageLevel.MEMORY_ONLY_SER_2)

  // json is the iterator over the records in one partition
  lines.foreachRDD(rdd => rdd.foreachPartition(json =>
    Processor.ProcessRecord(json, badWords, urlBlacklist,
      domainBlacklist, emailBlacklist)))

  ssc.start()
  ssc.awaitTermination()
}

Here is the line inside the ProcessRecord method that uses one of those RDDs
(email is the address extracted from the current record):

val emailBlacklistCnt =
  emailBlacklist.filter(black => black.contains(email)).count()
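
For context, ProcessRecord is declared roughly like this (the signature is
reconstructed from the call site above; the body is elided):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.flume.SparkFlumeEvent

object Processor {
  def ProcessRecord(json: Iterator[SparkFlumeEvent],
                    badWords: RDD[String],
                    urlBlacklist: RDD[String],
                    domainBlacklist: RDD[String],
                    emailBlacklist: RDD[String]): Unit = {
    // pull the email address out of each record, then run the
    // blacklist checks such as the count shown above
  }
}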

This throws an exception when the streaming action runs.  Is it possible to
call a transformation or action on one RDD from inside an action on another?
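
If that is not allowed, the fallback I can think of is to collect the
(small) filter files on the driver and ship them to the workers as broadcast
variables, along these lines (a sketch only; ProcessRecord would then take
plain Sets instead of RDDs):

// In main(), after building the filter RDDs. Assumes each filter
// file is small enough to collect to the driver.
val badWordsBc        = sc.broadcast(badWords.collect().toSet)
val urlBlacklistBc    = sc.broadcast(urlBlacklist.collect().toSet)
val domainBlacklistBc = sc.broadcast(domainBlacklist.collect().toSet)
val emailBlacklistBc  = sc.broadcast(emailBlacklist.collect().toSet)

lines.foreachRDD(rdd => rdd.foreachPartition { json =>
  // Only the broadcast values are used inside the closure; no RDD
  // methods are invoked on the workers.
  Processor.ProcessRecord(json, badWordsBc.value, urlBlacklistBc.value,
    domainBlacklistBc.value, emailBlacklistBc.value)
})

The count inside ProcessRecord would then be a plain collection operation:

val emailBlacklistCnt = emailBlacklist.count(black => black.contains(email))

Would that be the recommended approach, or is there a way to keep the lists
as RDDs?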
