Note: CCing user@spark.apache.org
First, you must check that the RDD is not empty:

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty) { ... }
}

Then you can obtain the SQLContext instance:

val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

*Optional*: At this point I prefer to work with DataFrames, so I convert the
RDD to a DataFrame. I see that you receive JSON:

val df: DataFrame = sqlContext.jsonRDD(rdd, getSchema(getSchemaStr))

My getSchema function creates the schema for my JSON:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

def getSchemaStr(): String = "feature1 feature2 ..."

def getSchema(schema: String): StructType =
  StructType(schema.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
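Putting these pieces together, a minimal sketch of the whole foreachRDD body
could look like the following. (Assumptions: Spark 1.3+, a DStream[String] of
JSON records named "messages", the SQLContextSingleton object quoted further
down in this thread, and "my_table" as an example table name.)

import org.apache.spark.sql.DataFrame

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // The singleton hands back the same SQLContext on every batch, so a
    // temp table registered on it stays visible for the application's life.
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    val df: DataFrame = sqlContext.jsonRDD(rdd, getSchema(getSchemaStr))
    df.registerTempTable("my_table")
    // collect() the single count row back to the driver before printing
    sqlContext.sql("SELECT COUNT(*) FROM my_table")
      .collect()
      .foreach(row => println(row(0)))
  }
}

Re-registering the table on each batch simply points the name at the new
batch's DataFrame; because the SQLContext itself is never recreated, the
table name stays valid between batches.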
I hope this helps. Regards.

2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] <
ml-node+s1001560n23226...@n3.nabble.com>:

> I don't know why you said "Why? I tried this solution and works fine."
> Do you mean that your SQLContext instance stays alive for the whole
> streaming application's lifetime, rather than for one batch duration? My
> code is below:
>
> object SQLContextSingleton extends java.io.Serializable {
>   @transient private var instance: SQLContext = null
>
>   // Instantiate SQLContext on demand
>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>     if (instance == null) {
>       instance = new SQLContext(sparkContext)
>     }
>     instance
>   }
> }
>
> // type_->typex, id_->id, url_->url
> case class dddd(time: Timestamp, id: Int, openfrom: Int, tab: Int)
>   extends Serializable
> case class Count(x: Int)
>
> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
> ssc.checkpoint(".")
>
> val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092,")
> @transient val dstream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
> @transient val dddstream = dstream.map(x => x._2).flatMap(x => x.split("\n"))
>
> dddstream.foreachRDD { rdd =>
>   SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd)
>     .registerTempTable("ttable")
>   val ret = SQLContextSingleton.getInstance(rdd.sparkContext)
>     .sql("SELECT COUNT(*) FROM ttable")
>   ret.foreach { x => println(x(0)) }
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> On 2015-06-09 17:41:44, "drarse [via Apache Spark User List]" <[hidden
> email]> wrote:
>
> Why? I tried this solution and it works fine.
>
> > On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List]
> > <[hidden email]> wrote:
> >
>> Hi drarse, thanks for replying. The way you suggested, using a singleton
>> object, does not work.
>>
>> On 2015-06-09 16:24:25, "drarse [via Apache Spark User List]" <[hidden
>> email]> wrote:
>>
>> The best way is to create a singleton object like:
>>
>>> object SQLContextSingleton {
>>>   @transient private var instance: SQLContext = null
>>>
>>>   // Instantiate SQLContext on demand
>>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>>>     if (instance == null) {
>>>       instance = new SQLContext(sparkContext)
>>>     }
>>>     instance
>>>   }
>>> }
>>
>> You have more information in the programming guide:
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>
>> 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List]
>> <[hidden email]>:
>>
>>> I used SQLContext in a Spark Streaming application as below:
>>>
>>> case class topic_name(f1: Int, f2: Int)
>>>
>>> val sqlContext = new SQLContext(sc)
>>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>>> ssc.checkpoint(".")
>>> val theDStream = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>>>
>>> theDStream.map(x => x._2).foreachRDD { rdd =>
>>>   sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
>>>   sqlContext.sql("select count(*) from topic_name").foreach { x =>
>>>     WriteToFile("file_path", x(0).toString)
>>>   }
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> I found I could only get each 5-second batch's message count. Because
>>> "the lifetime of this temporary table is tied to the SQLContext that
>>> was used to create this DataFrame", I guess a new sqlContext is created
>>> every 5 seconds and the temporary table lives for only those 5 seconds.
>>> I want the sqlContext and the temporary table to stay alive for the
>>> whole streaming application's life cycle. How can I do that?
>>>
>>> Thanks~
>
> --
> Atte. Sergio Jiménez