Ajay,

Afaik, these contexts generally cannot be accessed within loops. The SQL
query itself runs on distributed datasets, so it is already a parallel
execution. Putting the context inside a foreach would nest one distributed
operation inside another, and serializing the context out to the executors
becomes a problem. Not sure I explained it well.

If you can create the DataFrame in main, you can register it as a table and
run the queries in the main method itself, as in the sketch below. You don't
need to coalesce or run the method within foreach.
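
Roughly like this (just a rough sketch, reusing the table and column names
from your example):

    val deDF = hiveContext.read.text(dataElementsFile)
      .toDF("DataElement").distinct()
    // register it so it is queryable from main
    deDF.registerTempTable("data_elements")

    // collect the element names on the driver, then build one
    // statement instead of running the method inside foreach
    val cols = hiveContext.sql("SELECT DataElement FROM data_elements")
      .collect().map(_.getString(0).trim)
    val unionSql = cols.map { c =>
      s"SELECT cyc_dt, supplier_proc_i, '$c' AS data_elm, $c AS data_elm_val " +
      "FROM TEST_DB.TEST_TABLE1"
    }.mkString(" UNION ALL ")
    hiveContext.sql(unionSql).write.insertInto("TEST_DB.TEST_TABLE1")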

Regards
Sunita

On Tuesday, October 25, 2016, Ajay Chander <itsche...@gmail.com> wrote:

>
> Jeff, thanks for your response. I see the below error in the logs. Do you
> think it has anything to do with hiveContext? Do I have to serialize it
> before using it inside foreach?
>
> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener threw an exception
> java.lang.NullPointerException
>         at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
>         at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
>         at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>         at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>         at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
>         at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
>         at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>
> Thanks,
> Ajay
>
> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>>
>> In your sample code, you can use hiveContext in the foreach because it is
>> a local Scala collection foreach (over the collected rows), which runs on
>> the driver side. But you cannot use hiveContext in RDD.foreach, which runs
>> on the executors.
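>>
>> For example (a minimal sketch; some_table and the column names are just
>> placeholders):
>>
>> // driver side: a plain Scala collection foreach runs in the driver JVM,
>> // where hiveContext lives, so this is fine
>> Seq("col_a", "col_b").foreach { c =>
>>   hiveContext.sql(s"SELECT $c FROM some_table").show()
>> }
>>
>> // executor side: RDD.foreach runs on the workers, and hiveContext
>> // cannot be serialized and shipped there, so this fails
>> sc.parallelize(Seq("col_a", "col_b")).foreach { c =>
>>   hiveContext.sql(s"SELECT $c FROM some_table")
>> }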
>>
>>
>>
>> Ajay Chander <itsche...@gmail.com> wrote on Wednesday, October 26, 2016
>> at 11:28 AM:
>>
>>> Hi Everyone,
>>>
>>> I was wondering if I could use hiveContext inside foreach, like below:
>>>
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.hive.HiveContext
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>>     val conf = new SparkConf()
>>>     val sc = new SparkContext(conf)
>>>     val hiveContext = new HiveContext(sc)
>>>
>>>     val dataElementsFile = args(0)
>>>     val deDF = hiveContext.read.text(dataElementsFile)
>>>       .toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>>     def calculate(de: Row): Unit = {
>>>       val dataElement = de.getAs[String]("DataElement").trim
>>>       val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" +
>>>         dataElement + "' as data_elm, " + dataElement +
>>>         " as data_elm_val FROM TEST_DB.TEST_TABLE1")
>>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>>     }
>>>
>>>     deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>>
>>> I looked at
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>> and I see that it extends SQLContext, which extends Logging with
>>> Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it? Thanks for your
>>> time.
>>>
>>> Regards,
>>>
>>> Ajay
>>>
>>>
>
