Sunita, thanks for your time. In my scenario, for each attribute in deDF (one column with just 66 rows), I have to query a Hive table and insert the result into another table.
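For concreteness, each of those 66 attribute values ends up driving a statement shaped like the one below. Pulling the string construction out into a plain helper (just a sketch; `AttributeQuery` is a hypothetical name, and the table/column names are the ones from the code quoted further down in this thread) makes it easy to see what each iteration submits:

```scala
// Sketch: builds the per-attribute SELECT that feeds each insert.
// Nothing here needs Spark; it only mirrors the string concatenation
// in the quoted calculate() method.
object AttributeQuery {
  def build(dataElement: String): String = {
    val de = dataElement.trim
    s"SELECT cyc_dt, supplier_proc_i, '$de' as data_elm, $de as data_elm_val " +
      "FROM TEST_DB.TEST_TABLE1"
  }

  def main(args: Array[String]): Unit = {
    println(build(" supplier_proc_i "))
  }
}
```

Note that the attribute value is spliced into the SQL unquoted as a column reference, so trimming matters: any stray whitespace or characters in the input file would break the generated query.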
Thanks,
Ajay

On Wed, Oct 26, 2016 at 12:21 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:

> Ajay,
>
> AFAIK, these contexts generally cannot be accessed within loops. The SQL
> query itself runs on distributed datasets, so it is already a parallel
> execution. Putting it in a foreach would nest parallelism inside
> parallelism, so serialization would become hard. Not sure I explained it
> right.
>
> If you can create the DataFrame in main, you can register it as a table
> and run the queries in the main method itself. You don't need to coalesce
> or run the method within foreach.
>
> Regards,
> Sunita
>
> On Tuesday, October 25, 2016, Ajay Chander <itsche...@gmail.com> wrote:
>
>> Jeff, thanks for your response. I see the error below in the logs. Do you
>> think it has anything to do with hiveContext? Do I have to serialize it
>> before using it inside foreach?
>>
>> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener threw an exception
>> java.lang.NullPointerException
>>     at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
>>     at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
>>     at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>>     at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>>     at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
>>     at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
>>     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
>>     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>>     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
>>     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
>>     at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>>
>> Thanks,
>> Ajay
>>
>> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> In your sample code you can use hiveContext in the foreach, because it
>>> is a Scala List foreach operation, which runs on the driver side. But
>>> you cannot use hiveContext in RDD.foreach.
>>>
>>> On Wed, Oct 26, 2016 at 11:28 AM, Ajay Chander <itsche...@gmail.com> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I was wondering if I can use hiveContext inside foreach, like below:
>>>>
>>>> object Test {
>>>>   def main(args: Array[String]): Unit = {
>>>>
>>>>     val conf = new SparkConf()
>>>>     val sc = new SparkContext(conf)
>>>>     val hiveContext = new HiveContext(sc)
>>>>
>>>>     val dataElementsFile = args(0)
>>>>     val deDF = hiveContext.read.text(dataElementsFile)
>>>>       .toDF("DataElement").coalesce(1).distinct().cache()
>>>>
>>>>     def calculate(de: Row) {
>>>>       val dataElement = de.getAs[String]("DataElement").trim
>>>>       val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" +
>>>>         dataElement + "' as data_elm, " + dataElement +
>>>>         " as data_elm_val FROM TEST_DB.TEST_TABLE1 ")
>>>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>>>     }
>>>>
>>>>     deDF.collect().foreach(calculate)
>>>>   }
>>>> }
>>>>
>>>> I looked at
>>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>> and I see it extends SQLContext, which extends Logging with
>>>> Serializable.
>>>>
>>>> Can anyone tell me if this is the right way to use it? Thanks for your
>>>> time.
>>>>
>>>> Regards,
>>>> Ajay
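Jeff's distinction above can be illustrated without a cluster: everything referenced inside an RDD.foreach closure must survive plain Java serialization before Spark ships it to executors, while a driver-side collect().foreach never serializes anything. A minimal sketch (`FakeContext` and `ClosureDemo` are hypothetical stand-ins, not Spark classes; a real HiveContext fails for its non-serializable runtime state, not the class declaration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a context holding non-serializable runtime state
// (threads, connections, a SparkContext). Deliberately not Serializable.
class FakeContext {
  val connection = new Object // plain Object cannot be serialized
}

// Case classes are Serializable, like the rows collected to the driver.
case class DataElement(name: String)

object ClosureDemo {
  // True if obj survives Java serialization -- the same mechanism Spark
  // applies to closures before shipping them to executors.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializable(DataElement("cyc_dt"))) // true: collected rows ship fine
    println(serializable(new FakeContext))       // false: a live context cannot ship
  }
}
```

This is why the quoted code works as written: collect() brings the 66 rows to the driver, and the subsequent foreach is a local Scala collection operation, so hiveContext is never serialized at all.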