Jeff, Thanks for your response. I see below error in the logs. You think it
has to do anything with hiveContext ? Do I have to serialize it before
using inside foreach ?

16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener
threw an exception
java.lang.NullPointerException
        at org.apache.spark.sql.execution.ui.SQLListener.
onTaskEnd(SQLListener.scala:167)
        at org.apache.spark.scheduler.SparkListenerBus$class.
onPostEvent(SparkListenerBus.scala:42)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(
LiveListenerBus.scala:31)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(
LiveListenerBus.scala:31)
        at org.apache.spark.util.ListenerBus$class.postToAll(
ListenerBus.scala:55)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(
AsynchronousListenerBus.scala:37)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(
AsynchronousListenerBus.scala:80)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(
AsynchronousListenerBus.scala:65)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(
AsynchronousListenerBus.scala:65)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.
scala:1181)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$
1.run(AsynchronousListenerBus.scalnerBus.scala:63)

Thanks,
Ajay

On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <zjf...@gmail.com> wrote:

>
> In your sample code, you can use hiveContext in the foreach as it is scala
> List foreach operation which runs in driver side. But you cannot use
> hiveContext in RDD.foreach
>
>
>
> Ajay Chander <itsche...@gmail.com>于2016年10月26日周三 上午11:28写道:
>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>>     val conf = new SparkConf()
>>     val sc = new SparkContext(conf)
>>     val hiveContext = new HiveContext(sc)
>>
>>     val dataElementsFile = args(0)
>>     val deDF = 
>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>
>>     def calculate(de: Row) {
>>       val dataElement = de.getAs[String]("DataElement").trim
>>       val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>> TEST_DB.TEST_TABLE1 ")
>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>     }
>>
>>     deDF.collect().foreach(calculate)
>>   }
>> }
>>
>>
>> I looked at 
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>  and I see it is extending SqlContext which extends Logging with 
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>

Reply via email to