Note: CCing user@spark.apache.org

First, you must check if the RDD is empty:

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty) { .... }
}

Now, you can obtain the instance of a SQLContext:

val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
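
As far as I understand, the function passed to foreachRDD runs on the driver, so getInstance creates the SQLContext only on the first batch and returns the same object on every later batch. The plain-Scala sketch below only illustrates that reuse. FakeContext, FakeContextSingleton and BatchDemo are hypothetical stand-ins made up for the illustration, not Spark classes:

```scala
// Hypothetical stand-in for SQLContext: each new instance carries its own id.
final class FakeContext(val id: Long)

// Same shape as SQLContextSingleton, but with no Spark dependency.
object FakeContextSingleton {
  private var instance: FakeContext = null
  private var created: Long = 0L // how many times the constructor has run

  // Create the context on the first call, reuse it on every later call.
  def getInstance(): FakeContext = synchronized {
    if (instance == null) {
      created += 1
      instance = new FakeContext(created)
    }
    instance
  }

  def timesCreated: Long = synchronized { created }
}

object BatchDemo {
  // Simulates n batches, each asking for the context the way a foreachRDD body would.
  def contextIdsPerBatch(n: Int): List[Long] =
    (1 to n).map(_ => FakeContextSingleton.getInstance().id).toList
}
```

Every simulated batch sees the same context id, because the constructor runs only once for the lifetime of the JVM.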




*Optional*
At this point I prefer to work with a DataFrame, so I convert the RDD to a
DataFrame. I see that you receive JSON:

val df: DataFrame = sqlContext.jsonRDD(rdd, getSchema(getSchemaStr()))


My getSchema function creates the schema for my JSON:

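import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Space-separated list of the JSON field names
def getSchemaStr(): String = "feature1 feature2 ..."

// Builds a schema in which every field is a nullable string
def getSchema(schema: String): StructType =
  StructType(schema.split(" ").map(fieldName => StructField(fieldName, StringType, true)))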

I hope this helps.

Regards.



2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] <
ml-node+s1001560n23226...@n3.nabble.com>:

> I don't know why. You said "Why? I tried this solution and it works fine."
> Does that mean your SQLContext instance stays alive for the whole lifetime
> of the streaming application, rather than for just one batch duration? My
> code is below:
>
>
> object SQLContextSingleton extends java.io.Serializable{
>   @transient private var instance: SQLContext = null
>
>   // Instantiate SQLContext on demand
>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>     if (instance == null) {
>       instance = new SQLContext(sparkContext)
>     }
>     instance
>   }
> }
>
> // type_->typex, id_->id, url_->url
> case class dddd(time: Timestamp, id: Int, openfrom: Int, tab: Int) extends Serializable
> case class Count(x: Int)
>
> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
> ssc.checkpoint(".")
>
> val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092,")
> @transient val dstream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
> @transient val dddstream = dstream.map(x => x._2).flatMap(x => x.split("\n"))
>
> dddstream.foreachRDD { rdd =>
>   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>   sqlContext.jsonRDD(rdd).registerTempTable("ttable")
>   val ret = sqlContext.sql("SELECT COUNT(*) FROM ttable")
>   ret.foreach { x => println(x(0)) }
> }
>
> ssc.start()
> ssc.awaitTermination()
>
>
>
>
>
>
> On 2015-06-09 17:41:44, "drarse [via Apache Spark User List]" <[hidden
> email]> wrote:
>
> Why? I tried this solution and it works fine.
>
> On Tuesday, 9 June 2015, codingforfun [via Apache Spark User List]
> <[hidden email]> wrote:
>
>> Hi drarse, thanks for replying. The approach you suggested, using a
>> singleton object, does not work.
>>
>>
>>
>>
>> On 2015-06-09 16:24:25, "drarse [via Apache Spark User List]" <[hidden
>> email]> wrote:
>>
>> The best way is to create a singleton object like this:
>>
>> object SQLContextSingleton {
>>   @transient private var instance: SQLContext = null
>>
>>   // Instantiate SQLContext on demand
>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>>     if (instance == null) {
>>       instance = new SQLContext(sparkContext)
>>     }
>>     instance
>>   }
>> }
>>
>> You can find more information in the programming guide:
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>
>>
>>
>> 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List] <[hidden
>> email]>:
>>
>>> I used SQLContext in a Spark Streaming application as below:
>>>
>>> ----------------------------------------------------------------------------------------------------------------
>>>
>>> case class topic_name (f1: Int, f2: Int)
>>>
>>> val sqlContext = new SQLContext(sc)
>>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>>> ssc.checkpoint(".")
>>> val theDStream = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>>>
>>> theDStream.map(x => x._2).foreachRDD { rdd =>
>>>   sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
>>>   sqlContext.sql("select count(*) from topic_name").foreach { x =>
>>>     WriteToFile("file_path", x(0).toString)
>>>   }
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>> ----------------------------------------------------------------------------------------------------------------
>>>
>>>
>>> I found I could only get the message count for each 5-second batch,
>>> because "The lifetime of this temporary table is tied to the SQLContext
>>> that was used to create this DataFrame". I guess that a new sqlContext is
>>> created every 5 seconds and the temporary table lives for only those 5
>>> seconds. I want the sqlContext and the temporary table to stay alive for
>>> the whole life cycle of the streaming application. How can I do that?
>>>
>>> Thanks~
>>>
>>> ------------------------------
>>>  If you reply to this email, your message will be added to the
>>> discussion below:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-a-SQLContext-instance-alive-in-a-spark-streaming-application-s-life-cycle-tp23215.html
>>>
>>
>>
>>
>
>
> --
> Atte. Sergio Jiménez
>
>