Hi, as far as I know, the processing pipeline (e.g., filter.map.join.filter) cannot be changed once the StreamingContext has started. Since your SQL statement is transformed into RDD operations when the StreamingContext starts, I think there is no way to change the statement that is executed on the current stream while it is running.
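The deeper reason your var does not propagate is that a var captured in a closure is serialized into every task, so each executor only ever mutates its own private copy; neither the driver nor the other workers see the new value. A minimal sketch to illustrate (assuming a SparkContext "sc" that is connected to a real cluster; in local mode the closure may not be copied across JVM boundaries, which is presumably why your version appeared to work there):

    var query = "select count(*) from records"

    sc.parallelize(1 to 4).foreach { _ =>
      query = "select * from records" // mutates a per-task copy on the executor
    }

    println(query) // still prints "select count(*) from records" on the driver

For the same reason a broadcast variable does not help here: broadcasts are read-only after they are created.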
Tobias

On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> For example, this is what I tested, and it works in local mode. What it
> does is get both the data and the sql query from kafka, run the sql on
> each RDD, and write the result back to kafka again.
>
> I defined a var called *sqlS*. In the streaming part, as you can see, I
> change the sql statement whenever a sql message is consumed from kafka,
> so the next time *sql(sqlS)* is called it executes the updated sql query.
>
> But this code doesn't work in cluster mode, because sqlS is not updated
> on all the workers, from what I understand.
>
> So my question is: how do I change the value of sqlS at runtime and make
> all the workers pick up the latest value?
>
>     var sqlS = "select count(*) from records"
>     val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>     val sc = new SparkContext(sparkConf)
>     val ssc = new StreamingContext(sc, Seconds(2))
>     val sqlContext = new SQLContext(sc)
>
>     // Importing the SQL context gives access to all the SQL functions
>     // and implicit conversions.
>     import sqlContext._
>     import sqlContext.createSchemaRDD
>
>     // val tt = Time(5000)
>     val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)
>
>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>       .window(Seconds(4))
>       .filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true })
>       .map(t => getRecord(t._2.split("#")))
>
>     val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
>     val brokerString = ZkUtils.getAllBrokersInCluster(zkClient)
>       .map(_.getConnectionString).mkString(",")
>     KafkaSpark.props.put("metadata.broker.list", brokerString)
>     val config = new ProducerConfig(KafkaSpark.props)
>     val producer = new Producer[String, String](config)
>
>     recordsStream.foreachRDD { recRDD =>
>       val schemaRDD = sqlContext.createSchemaRDD(recRDD)
>       schemaRDD.registerAsTable(tName)
>       val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => s + r.mkString(",") + "\n")
>       producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>
> On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang <zonghen...@gmail.com> wrote:
>
>> Can you paste a small code example to illustrate your questions?
>>
>> On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>> > Sorry, typo. What I meant is sharing. If the sql changes at runtime,
>> > how do I broadcast the sql to all the workers that are doing the sql
>> > analysis?
>> >
>> > Best,
>> > Siyuan
>> >
>> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang <zonghen...@gmail.com> wrote:
>> >>
>> >> Do you mean that the texts of the SQL queries are hardcoded in the
>> >> code? What do you mean by "cannot share the sql to all workers"?
>> >>
>> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>> >> > Hi guys,
>> >> >
>> >> > I'm able to run some Spark SQL examples, but the sql is static in
>> >> > the code. I would like to know whether there is a way to read the
>> >> > sql from somewhere else (a shell, for example).
>> >> >
>> >> > I could read the sql statement from kafka/zookeeper, but I cannot
>> >> > share the sql with all the workers; broadcast doesn't seem to work
>> >> > for updating values.
>> >> >
>> >> > Moreover, if I use a non-serializable class (DataInputStream etc.)
>> >> > to read the sql from another source, I always get "Task not
>> >> > serializable: java.io.NotSerializableException".
>> >> >
>> >> > Best,
>> >> > Siyuan
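P.S. The NotSerializableException mentioned above is a separate issue: everything referenced inside an RDD operation such as filter() or map() is serialized and shipped to the executors, so a captured DataInputStream will always fail that check. The usual pattern is to create the non-serializable object inside the closure, e.g. once per partition, so it never has to be serialized. A rough sketch (the path and the println are just stand-ins for your real input and processing):

    import java.io.{DataInputStream, FileInputStream}

    recordsStream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // created on the executor itself, never part of a task closure
        val in = new DataInputStream(new FileInputStream("/path/on/worker"))
        try records.foreach(rec => println(s"$rec (${in.available()} bytes left)"))
        finally in.close()
      }
    }

Note that this only avoids the serialization error; it does not by itself solve the problem of updating the query on all workers discussed above.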