A similar bug was reported before: ZEPPELIN-108 <https://issues.apache.org/jira/browse/ZEPPELIN-108>
On 8 September 2015 at 14:33, Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:

> Hi Todd,
> Thanks for the quick reply. I tried that option too and I got the error
> below. Any idea?
>
> <console>:102: error: overloaded method constructor StreamingContext with alternatives:
>   (path: String, sparkContext: org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext)org.apache.spark.streaming.StreamingContext <and>
>   (path: String, hadoopConf: org.apache.hadoop.conf.Configuration)org.apache.spark.streaming.StreamingContext <and>
>   (conf: org.apache.spark.SparkConf, batchDuration: org.apache.spark.streaming.Duration)org.apache.spark.streaming.StreamingContext <and>
>   (sparkContext: org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext, batchDuration: org.apache.spark.streaming.Duration)org.apache.spark.streaming.StreamingContext
> cannot be applied to (org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext, org.apache.spark.streaming.Duration)
>        val ssc = new StreamingContext(sc, Milliseconds(2000))
>
> On 8 September 2015 at 13:53, Todd Nist <tsind...@gmail.com> wrote:
>
>> You are passing a new SparkConf to the StreamingContext, which will cause
>> the creation of a new SparkContext:
>>
>>   StreamingContext(conf: SparkConf, batchDuration: Duration)
>>   <https://spark.apache.org/docs/1.4.1/api/scala/org/apache/spark/SparkConf.html>
>>   <https://spark.apache.org/docs/1.4.1/api/scala/org/apache/spark/streaming/Duration.html>
>>
>> Create a StreamingContext by providing the configuration necessary for a
>> new SparkContext.
>>
>> Is there a reason you can not use the existing SparkContext created by
>> Zeppelin? Then you can just do something like:
>>
>>   val ssc = new StreamingContext(sc, Milliseconds(SparkStreamingBatchInterval))
>>
>>   ssc.checkpoint(SparkCheckpointDir)
>>
>>   ...
>>
>> Where "sc" is the Zeppelin-provided SparkContext.
>>
>> -Todd
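For reference, a minimal sketch of that approach in a single Zeppelin paragraph, with the 2000 ms interval and the "checkpoint" directory taken from the code further down the thread (treat it as untested):

    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    // "sc" is the SparkContext the Zeppelin Spark interpreter already created;
    // building the StreamingContext on top of it avoids constructing a second
    // SparkContext from a new SparkConf.
    val ssc = new StreamingContext(sc, Milliseconds(2000))
    ssc.checkpoint("checkpoint")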
>> On Tue, Sep 8, 2015 at 8:11 AM, Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>
>>> Hi,
>>> The problem is that Spark is allowing two contexts to be created; see the
>>> log below. Could you please let me know how to fix this problem?
>>>
>>> WARN [2015-09-08 13:09:01,191] ({pool-2-thread-5} Logging.scala[logWarning]:92) - Multiple running SparkContexts detected in the same JVM!
>>> org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true.
>>> The currently running SparkContext was created at:
>>> org.apache.spark.SparkContext.<init>(SparkContext.scala:81)
>>> org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:301)
>>> org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:146)
>>> org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:423)
>>> org.apache.zeppelin.interpreter.ClassloaderInterpreter.open(ClassloaderInterpreter.java:73)
>>> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:68)
>>> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:92)
>>> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:277)
>>> org.apache.zeppelin.scheduler.Job.run(Job.java:170)
>>> org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> java.lang.Thread.run(Thread.java:745)
>>> at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2083)
>>> at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2065)
>>> at scala.Option.foreach(Option.scala:236)
>>> at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2065)
>>> at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:2151)
>>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:2023)
>>> at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:834)
>>> at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
>>> at $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
>>> at $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
>>> at $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
>>> at $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
>>> at $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
>>> at $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
>>> at $line58.$
>>>
>>> /Saj
>>>
>>> On 8 September 2015 at 12:25, Todd Nist <tsind...@gmail.com> wrote:
>>>
>>>> I do not see that you're importing the following:
>>>>
>>>>   import org.apache.spark.sql._
>>>>
>>>> which I believe is where you will find the DataFrame.toDF function.
>>>>
>>>> HTH.
>>>>
>>>> -Todd
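As a sketch of the conversion being discussed, assuming the sqlContext that Zeppelin provides is in scope, and using the "data" DStream and CELL_KPIS case class from the original paragraph quoted further down:

    data.foreachRDD { rdd =>
      // In Spark 1.3+, importing sqlContext.implicits._ is what adds .toDF()
      // to an RDD of case classes such as CELL_KPIS.
      import sqlContext.implicits._
      rdd.toDF().registerTempTable("RAS")   // the temp table can then be queried from a %sql paragraph
    }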
>>>> On Mon, Sep 7, 2015 at 5:49 PM, Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>>>
>>>>> Hi Moon,
>>>>> Thanks for the reply. Unfortunately, I tried that option too and I got the error:
>>>>>
>>>>> data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] = org.apache.spark.streaming.dstream.MappedDStream@5f3ea8bb
>>>>> <console>:49: error: value toDF is not a member of org.apache.spark.rdd.RDD[CELL_KPIS]
>>>>>        accessLogs.toDF.registerTempTable("RAS")
>>>>>                   ^
>>>>> Any idea?
>>>>>
>>>>> On 7 September 2015 at 17:30, moon soo Lee <m...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think you will need to convert the RDD to a data frame using .toDF(), like
>>>>>>
>>>>>>   accessLogs.toDF.registerTempTable("RAS")
>>>>>>
>>>>>> Thanks,
>>>>>> moon
>>>>>>
>>>>>> On Mon, Sep 7, 2015 at 3:34 AM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>>>>>
>>>>>>> Zeppelin is an excellent tool. I am trying to implement a streaming
>>>>>>> application, and I get an error while deploying it. See my code below:
>>>>>>>
>>>>>>> import org.apache.spark.SparkContext
>>>>>>> import org.apache.spark.SparkContext._
>>>>>>> import org.apache.spark.SparkConf
>>>>>>> import org.apache.spark.streaming.StreamingContext
>>>>>>> import org.apache.spark.streaming.Seconds
>>>>>>> import org.apache.spark.sql.SQLContext
>>>>>>>
>>>>>>> val sparkConf = new SparkConf().setAppName("PEPA").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true")
>>>>>>>
>>>>>>> import org.apache.spark.streaming.kafka._
>>>>>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>>>>> ssc.checkpoint("checkpoint")
>>>>>>>
>>>>>>> val topicMap = Map("incoming" -> 1)
>>>>>>> val record = KafkaUtils.createStream(ssc, "localhost", "1", topicMap).map(_._2)
>>>>>>> record.print()
>>>>>>>
>>>>>>> case class CELL_KPIS(ECELL_Name: String, CGI: String, Number_of_Times_Interf: Double, TAOF: Double, PHL: Double, NPCCHL: Double, LRSRP: Double, NC: Double)
>>>>>>>
>>>>>>> val data = record.map(s => s.split(",")).filter(s => s(0) != "\"ECELL_Name\"").map(
>>>>>>>   s => CELL_KPIS(s(0), s(1), s(2).toDouble, s(3).toDouble, s(5).toDouble, s(6).toDouble, s(7).toDouble, s(8).toDouble)
>>>>>>> )
>>>>>>>
>>>>>>> data.foreachRDD { accessLogs =>
>>>>>>>   import sqlContext.implicits._
>>>>>>>   accessLogs.registerTempTable("RAS")
>>>>>>> }
>>>>>>>
>>>>>>> ssc.start()
>>>>>>> ssc.awaitTermination()
>>>>>>>
>>>>>>> And I get the error:
>>>>>>>
>>>>>>> import org.apache.spark.SparkContext
>>>>>>> import org.apache.spark.SparkContext._
>>>>>>> import org.apache.spark.SparkConf
>>>>>>> import org.apache.spark.streaming.StreamingContext
>>>>>>> import org.apache.spark.streaming.Seconds
>>>>>>> import org.apache.spark.sql.SQLContext
>>>>>>> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e5779a
>>>>>>> import org.apache.spark.streaming.kafka._
>>>>>>> ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@48621ee1
>>>>>>> topicMap: scala.collection.immutable.Map[String,Int] = Map(incoming -> 1)
>>>>>>> record: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@6290e75e
>>>>>>> defined class CELL_KPIS
>>>>>>> data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] = org.apache.spark.streaming.dstream.MappedDStream@4bda38c3
>>>>>>>
>>>>>>> <console>:55: error: value registerTempTable is not a member of org.apache.spark.rdd.RDD[CELL_KPIS]
>>>>>>>        accessLogs.registerTempTable("RAS")
>>>>>>>
>>>>>>> *My configuration for Zeppelin*
>>>>>>>
>>>>>>> export MASTER=spark://localhost:7077
>>>>>>> export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_05
>>>>>>> export ZEPPELIN_PORT=9090
>>>>>>> export ZEPPELIN_SPARK_CONCURRENTSQL=false
>>>>>>> export ZEPPELIN_SPARK_USEHIVECONTEXT=false
>>>>>>> # export MASTER=local[*]
>>>>>>> export SPARK_HOME=/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4
>>>>>>>
>>>>>>> *Interpreter configuration for Spark*
>>>>>>>
>>>>>>> "2AW247KM7": {
>>>>>>>   "id": "2AW247KM7",
>>>>>>>   "name": "spark",
>>>>>>>   "group": "spark",
>>>>>>>   "properties": {
>>>>>>>     "spark.cores.max": "",
>>>>>>>     "spark.yarn.jar": "",
>>>>>>>     "master": "local[*]",
>>>>>>>     "zeppelin.spark.maxResult": "1000",
>>>>>>>     "zeppelin.dep.localrepo": "local-repo",
>>>>>>>     "spark.app.name": "APP3",
>>>>>>>     "spark.executor.memory": "5G",
>>>>>>>     "zeppelin.spark.useHiveContext": "false",
>>>>>>>     "spark.driver.allowMultipleContexts": "true",
>>>>>>>     "args": "",
>>>>>>>     "spark.home": "/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4",
>>>>>>>     "zeppelin.spark.concurrentSQL": "true",
>>>>>>>     "zeppelin.pyspark.python": "python"
>>>>>>>   },
>>>>>>>   "interpreterGroup": [
>>>>>>>     { "class": "org.apache.zeppelin.spark.SparkInterpreter", "name": "spark" },
>>>>>>>     { "class": "org.apache.zeppelin.spark.PySparkInterpreter", "name": "pyspark" },
>>>>>>>     { "class": "org.apache.zeppelin.spark.SparkSqlInterpreter", "name": "sql" },
>>>>>>>     { "class": "org.apache.zeppelin.spark.DepInterpreter", "name": "dep" }
>>>>>>>   ],
>>>>>>>   "option": { "remote": true }
>>>>>>> }
>>>>>>>
>>>>>>> Is there any problem in my code or setup?
>>>>>>> Any help is very much appreciated.
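Putting the two suggestions in this thread together, a rough sketch of how the paragraph above might look when it reuses the sc and sqlContext that Zeppelin provides, with the Kafka settings, case class, and checkpoint directory taken from the original code, and assuming the spark-streaming-kafka dependency is already loaded as before (untested, so treat it only as a starting point):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka._

    // Build the StreamingContext on Zeppelin's existing SparkContext instead of a
    // new SparkConf, so no second SparkContext is created.
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = Map("incoming" -> 1)
    val record = KafkaUtils.createStream(ssc, "localhost", "1", topicMap).map(_._2)
    record.print()

    case class CELL_KPIS(ECELL_Name: String, CGI: String, Number_of_Times_Interf: Double,
                         TAOF: Double, PHL: Double, NPCCHL: Double, LRSRP: Double, NC: Double)

    // Drop the header row and parse each CSV line into a CELL_KPIS record.
    val data = record.map(_.split(","))
      .filter(s => s(0) != "\"ECELL_Name\"")
      .map(s => CELL_KPIS(s(0), s(1), s(2).toDouble, s(3).toDouble,
                          s(5).toDouble, s(6).toDouble, s(7).toDouble, s(8).toDouble))

    data.foreachRDD { accessLogs =>
      import sqlContext.implicits._               // provides .toDF() on RDDs of case classes
      accessLogs.toDF().registerTempTable("RAS")  // convert to a DataFrame before registering
    }

    ssc.start()   // ssc.awaitTermination() would block this paragraph until the context is stopped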