In case it helps: I got around it temporarily by saving and resetting the context class loader around creating HiveContext. On Jul 2, 2015 4:36 AM, "Terry Hole" <hujie.ea...@gmail.com> wrote:
> Found that this is a bug in Spark 1.4.0: SPARK-8368 > <https://issues.apache.org/jira/browse/SPARK-8368> > > Thanks! > Terry > > On Thu, Jul 2, 2015 at 1:20 PM, Terry Hole <hujie.ea...@gmail.com> wrote: > >> All, >> >> I am using spark console 1.4.0 to do some tests, when I create a new >> HiveContext (Line 18 in the code) in my test function, it always throws an >> exception like the one below (It works in spark console 1.3.0), but if I remove >> the HiveContext (The line 18 in the code) in my function, it works fine. >> Any idea what's wrong with this? >> >> java.lang.ClassNotFoundException: >> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ >> iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1 >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at >> java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> at java.lang.Class.forName0(Native Method) >> at java.lang.Class.forName(Class.java:348) >> at >> org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(Clos >> ureCleaner.scala:455) >> at >> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown >> Source) >> at >> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown >> Source) >> at >> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101) >> at >> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197) >> at >> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) >> at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630) >> at >> 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) >> at org.apache.spark.SparkContext.withScope(SparkContext.scala:681) >> at >> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258) >> at >> org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629) >> at >> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC >> $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(<console>:98) >> at >> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC >> $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93) >> at >> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC >> $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:98) >> >> >> >> 1 import org.apache.spark._ 2 import org.apache.spark.SparkContext._ 3 >> import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time >> } 4 import org.apache.spark.streaming.StreamingContext._ 5 import >> org.apache.spark.rdd.RDD 6 import org.apache.spark.streaming.dstream.DStream >> 7 import org.apache.spark.HashPartitioner 8 import >> org.apache.spark.storage.StorageLevel 9 import org.apache.spark.sql._10 >> import org.apache.spark.sql.hive._11 import >> scala.collection.mutable.{Queue}12 import scala.concurrent.Future13 import >> scala.concurrent.ExecutionContext.Implicits.global14 15 def >> streamingTest(args: Array[String]) {16 println(">>> create >> streamingContext.")17 
val ssc = new StreamingContext(sc, Seconds(1))18 >> *val sqlContext2 = new HiveContext(sc)*19 20 val accum = >> sc.accumulator(0, "End Accumulator")21 val queue = >> scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))22 val >> textSource = ssc.queueStream(queue, true)23 textSource.foreachRDD(rdd => >> { rdd.foreach( item => {accum += 1} ) })24 textSource.foreachRDD(rdd => >> {25 var sample = rdd.take(10)26 if >> (sample.length > 0) {27 sample.foreach(item => >> println("#= " + item))28 }29 })30 >> println(">>> Start streaming context.")31 ssc.start()32 val stopFunc >> = Future {var isRun = true; var duration = 0; while (isRun) { >> Thread.sleep(1000); duration += 1; if ( accum.value > 0 || duration >= 120) >> {println("### STOP SSC ###");ssc.stop(false, true); duration = 0; isRun = >> false} }}33 ssc.awaitTermination()34 println(">>> Streaming context >> terminated.")35 }36 37 streamingTest(null)38 >> >> Thanks >> Terry >> > >