Hi all, I want to read some filtering rules from mysql (jdbc mysql driver) specifically its a char type containing a field and value to process in a kafka streaming input.
The main idea is to process this from a web UI (livy server). Any suggestion or guidelines? e.g., I have this: object Streaming { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args var spc = SparkContext.getOrCreate() val ssc = new StreamingContext(spc, Seconds(3)) val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topics -> 5)).map(_._2) /* TEST MYSQL */ val sqlContext = new SQLContext(spc) val prop = new java.util.Properties val url = "jdbc:mysql://52.22.38.81:3306/tmp" val tbl_users = "santander_demo_users" val tbl_rules = "santander_demo_filters" val tbl_campaigns = "santander_demo_campaigns" prop.setProperty("user", "root") prop.setProperty("password", "Exalitica2014") val users = sqlContext.read.jdbc(url, tbl_users, prop) val rules = sqlContext.read.jdbc(url, tbl_rules, prop) val campaigns = sqlContext.read.jdbc(url, tbl_campaigns, prop) val toolbox = currentMirror.mkToolBox() val toRemove = "\"”.toSet var mto = “0" def rule_apply (n:Int, t:String, rules:DataFrame) : String = { // reading rules from mysql var r = (rules.filter(rules("CID") === n).select("FILTER_DSC").first())(0).toString() // using mkToolbox for pre-processing rules return toolbox.eval(toolbox.parse(""" val mto = """ + t + """ if(""" + r + """) { return “true" } else { return “false" } """)).toString() } /* TEST MYSQL */ lines.map{x => if(x.split(",").length > 1) { // reading from kafka input mto = spc.broadcast(x.split(",")(5).filterNot(toRemove)) } } var msg = rule_apply(1, mto, rules) var word = lines.map(x => msg) word.print() ssc.start() ssc.awaitTermination() } } The problem is that mto variable always returns to “0” value after mapping lines DStream. I tried to process rule_apply into map but I get not serializable mkToolbox class error. Thanks in advance. Franco Barrientos Data Scientist Málaga #115, Of. 1003, Las Condes. Santiago, Chile. (+562)-29699649 (+569)-76347893 franco.barrien...@exalitica.com www.exalitica.com