Ajay, AFAIK these contexts generally cannot be accessed inside loops that run on executors. The SQL query itself runs on distributed datasets, so it is already a parallel execution; putting it inside a foreach would nest parallel jobs inside parallel jobs, and serializing the context to ship to the executors becomes a problem. Not sure I explained it well.
If you can create the dataframe in main, you can register it as a table and run the queries in the main method itself. You don't need to coalesce or run the method within foreach.

Regards,
Sunita

On Tuesday, October 25, 2016, Ajay Chander <itsche...@gmail.com> wrote:
>
> Jeff, Thanks for your response. I see the below error in the logs. Do you
> think it has anything to do with hiveContext? Do I have to serialize it
> before using it inside foreach?
>
> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener threw an exception
> java.lang.NullPointerException
>         at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
>         at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
>         at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>         at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>         at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
>         at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
>         at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>
> Thanks,
> Ajay
>
> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang
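Sunita's suggestion above can be sketched roughly as follows for Spark 1.6: build the DataFrame once, register it as a temporary table, and keep all the SQL in main. This is only an illustrative sketch, not Ajay's actual job; the join shape, the output table `TEST_DB.TEST_TABLE2`, and the column names are hypothetical, and it needs a running Spark cluster with Hive support.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object MainOnlyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MainOnlyExample"))
    val hiveContext = new HiveContext(sc)

    // Build the DataFrame on the driver and expose it to SQL.
    val deDF = hiveContext.read.text(args(0)).toDF("DataElement").distinct()
    deDF.registerTempTable("data_elements")

    // Everything stays in main: the query itself still executes in
    // parallel across the cluster, so no foreach over hiveContext is needed.
    // Hypothetical join; adapt the SQL to the real per-element logic.
    val result = hiveContext.sql(
      "SELECT t.cyc_dt, t.supplier_proc_i, d.DataElement AS data_elm " +
      "FROM TEST_DB.TEST_TABLE1 t CROSS JOIN data_elements d")
    result.write.insertInto("TEST_DB.TEST_TABLE2")
  }
}
```

The point is that a single set-oriented query usually replaces a driver-side loop of per-row queries, and Spark parallelizes it for you.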
<zjf...@gmail.com> wrote:
>>
>> In your sample code you can use hiveContext in the foreach, because it is
>> a Scala List foreach operation that runs on the driver side. But you
>> cannot use hiveContext in RDD.foreach.
>>
>> Ajay Chander <itsche...@gmail.com> wrote on Wednesday, October 26, 2016
>> at 11:28 AM:
>>
>>> Hi Everyone,
>>>
>>> I was wondering if I can use hiveContext inside foreach like below:
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>>     val conf = new SparkConf()
>>>     val sc = new SparkContext(conf)
>>>     val hiveContext = new HiveContext(sc)
>>>
>>>     val dataElementsFile = args(0)
>>>     val deDF = hiveContext.read.text(dataElementsFile)
>>>       .toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>>     def calculate(de: Row) {
>>>       val dataElement = de.getAs[String]("DataElement").trim
>>>       val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" +
>>>         dataElement + "' as data_elm, " + dataElement +
>>>         " as data_elm_val FROM TEST_DB.TEST_TABLE1 ")
>>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>>     }
>>>
>>>     deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>> I looked at
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>> and I see it extends SQLContext, which extends Logging with Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it? Thanks for your
>>> time.
>>>
>>> Regards,
>>> Ajay
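Jeff's distinction can be sketched as below (Spark 1.6 APIs; a sketch that assumes a running cluster, with a hypothetical app name and input path). `collect()` returns a plain `Array[Row]` on the driver, so the first foreach is ordinary Scala code and may use hiveContext; `RDD.foreach` ships its closure to the executors, where the captured context is unusable.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ForeachExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ForeachExample"))
    val hiveContext = new HiveContext(sc)
    val deDF = hiveContext.read.text(args(0)).toDF("DataElement")

    // OK: Array[Row].foreach runs on the driver, where hiveContext lives.
    deDF.collect().foreach { row =>
      hiveContext.sql(s"SELECT '${row.getString(0)}'").show()
    }

    // NOT OK: RDD.foreach serializes this closure (capturing hiveContext)
    // and runs it on executors; expect a serialization error or an NPE.
    // deDF.rdd.foreach { row =>
    //   hiveContext.sql(s"SELECT '${row.getString(0)}'").show()
    // }
  }
}
```

The driver-side form avoids the serialization problem but also serializes the work: each per-row query runs one after another from the driver.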