Re: Kafka + Spark streaming
Thanks TD. On Wed, Dec 31, 2014 at 7:19 AM, Tathagata Das tathagata.das1...@gmail.com wrote: 1. Of course, a single block / partition can hold many Kafka messages, from different Kafka topics interleaved together. The message count is not related to the block count: any message received within a particular block interval will go into the same block. 2. Yes, the receiver will be started on another worker. TD On Tue, Dec 30, 2014 at 2:19 PM, SamyaMaiti samya.maiti2...@gmail.com wrote: Hi Experts, A few general queries: 1. Can a single block/partition in an RDD hold more than one Kafka message, or will there be only one Kafka message per block? More broadly, is the message count related to the block in any way, or is it just that any message received within a particular block interval will go into the same block? 2. If the worker that runs the Receiver for Kafka goes down, will the receiver be restarted on some other worker? Regards, Sam
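To make the block behaviour described above concrete, here is a minimal receiver-based Kafka stream in Scala (the topic names, ZooKeeper quorum, group id and intervals are illustrative assumptions, not taken from this thread). Every message any receiver gets during one spark.streaming.blockInterval ends up in the same block regardless of topic, and if the worker hosting the receiver dies, the receiver is relaunched on another worker.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("kafka-block-example")
  .set("spark.streaming.blockInterval", "200") // milliseconds; one block per interval per receiver

val ssc = new StreamingContext(conf, Seconds(2))
// topic -> number of consumer threads inside the single receiver
val topics = Map("topicA" -> 1, "topicB" -> 1)
val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "example-group", topics)
stream.count().print() // messages per batch, however many blocks they landed in
ssc.start()
ssc.awaitTermination()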
FlatMapValues
hey guys My dataset is like this
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10
Intended output is
==========
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia
My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
thanks sanjay
pyspark.daemon not found
Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python: Can somebody please help me figure out how to resolve this issue? -Naveen
Re: How to set local property in beeline connect to the spark thrift server
Hi, Xiaoyu! In the Spark thrift server you can use `spark.sql.thriftserver.scheduler.pool` instead of `spark.scheduler.pool`. On Wed, Dec 31, 2014 at 3:55 PM, Xiaoyu Wang wangxy...@gmail.com wrote: Hi all! I use Spark SQL 1.2 to start the thrift server on YARN. I want to use the fair scheduler in the thrift server. I set the properties in spark-defaults.conf like this: spark.scheduler.mode FAIR spark.scheduler.allocation.file /opt/spark-1.2.0-bin-2.4.1/conf/fairscheduler.xml In the thrift server UI I can see that the scheduler pool is set up correctly. [inline image 1] How can I assign one SQL job to the test pool? By default the SQL jobs run in the default pool. In the http://spark.apache.org/docs/latest/job-scheduling.html document I see that sc.setLocalProperty("spark.scheduler.pool", "pool1") can be set in the code. In beeline I executed set spark.scheduler.pool=test, but it had no effect. How can I set the local property in beeline?
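For reference, a hedged sketch of the programmatic route from the job-scheduling docs that Xiaoyu mentions (the pool name "test" and the allocation-file path mirror the post; this is how a regular Spark application, as opposed to beeline, selects a fair-scheduler pool):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-pool-example")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/opt/spark-1.2.0-bin-2.4.1/conf/fairscheduler.xml")
val sc = new SparkContext(conf)
// jobs submitted from this thread now go to the "test" pool
sc.setLocalProperty("spark.scheduler.pool", "test")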
spark stream + cassandra (execution on event)
Hi. I want to use Spark Streaming to read data from Cassandra, but in my case I need to process data based on an event (not retrieve data from Cassandra constantly). Question: what is the way to trigger the processing with Spark Streaming only from time to time? Thanks, Oleg.
Re: FlatMapValues
Why don't you push \n instead of \t in your first transformation [ (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) ] and then do saveAsTextFile? -Raghavendra On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian sanjaysubraman...@yahoo.com.invalid wrote: hey guys My dataset is like this 025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10 Intended output is == 025126,Chills 025126,Injection site oedema 025126,Injection site reaction 025126,Malaise 025126,Myalgia My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
thanks sanjay
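A minimal sketch of that suggestion (field indices and the VAERS_ID header check come from Sanjay's code; reacRdd and outFile are assumed to exist as in his snippet): join the values with '\n' so that saveAsTextFile already writes one id,value pair per line, with no flatMapValues needed.

reacRdd
  .map(_.split(','))
  .filter(f => f.length >= 11 && !f(0).contains("VAERS_ID"))
  .map(f => Seq(f(1), f(3), f(5), f(7), f(9)).map(v => f(0) + "," + v).mkString("\n"))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)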
Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
I am accessing HDFS with Spark's .textFile method and I receive this error: Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4 Here are my dependencies: http://apache-spark-user-list.1001560.n3.nabble.com/file/n20925/Untitled.png Any suggestion?
Re: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
This generally means you have packaged Hadoop 1.x classes into your app accidentally. The most common cause is not marking Hadoop and Spark classes as provided dependencies. Your app doesn't need to ship its own copy of these classes when you use spark-submit. On Wed, Dec 31, 2014 at 10:47 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: I am accessing HDFS with Spark's .textFile method and I receive this error: Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4 Here are my dependencies: http://apache-spark-user-list.1001560.n3.nabble.com/file/n20925/Untitled.png Any suggestion?
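A hedged build.sbt fragment illustrating this advice (the artifact versions are examples, not taken from the thread): mark Spark and Hadoop as "provided" so that spark-submit supplies them at runtime and no Hadoop 1.x client classes end up inside the application jar.

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "1.2.0" % "provided",
  "org.apache.hadoop" %  "hadoop-client" % "2.4.0" % "provided"
)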
Re: pyspark.daemon not found
Hi, I am receiving the following error when I try to connect to the Spark cluster (which is on Unix) from my Windows machine using the pyspark interactive shell: pyspark -master (spark cluster url) Then I executed the following commands.
lines = sc.textFile("hdfs://master/data/spark/SINGLE.TXT")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
I got the following error:
14/12/31 16:20:15 INFO DAGScheduler: Job 0 failed: reduce at <stdin>:1, took 6.960438 s
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\npokala\Downloads\spark-master\python\pyspark\rdd.py", line 715, in reduce vals = self.mapPartitions(func).collect()
File "C:\Users\npokala\Downloads\spark-master\python\pyspark\rdd.py", line 676, in collect bytesInJava = self._jrdd.collect().iterator()
File "C:\Users\npokala\Downloads\spark-master\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
File "C:\Users\npokala\Downloads\spark-master\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-ins java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163) at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:102) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:265) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Please help me to resolve this issue. -Naveen From: Naveen Kumar Pokala [mailto:npok...@spcapitaliq.com] Sent: Wednesday, December 31, 2014 2:28 PM To: user@spark.apache.org Subject: pyspark.daemon not found Error from python worker: python:
NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream
Hi all, I am currently trying to combine datastax's spark-cassandra-connector and typesafe's akka-http-experimental on Spark 1.1.1 (spark-cassandra-connector for Spark 1.2.0 not out yet) and scala 2.10.4 I am using the hadoop 2.4 pre built package. (build.sbt file at the end) To solve the java.lang.NoClassDefFoundError: com/datastax/spark/connector/mapper/ColumnMapper and other NoClassDefFoundErrors, I have to give some jars to Spark (build.sbt is not enough). The connectors works fine. My spark submit looks like: sbt clean package; bin/spark-submit --class SimpleAppStreaming3 --master local[*] --jars spark-cassandra-connector_2.10-1.1.0.jar,cassandra-driver-core-2.1.3.jar,cassandra-thrift-2.0.5.jar,joda-time-2.6.jar target/scala-2.10/simple-project_2.10-1.0.jar Then I am trying to add some akka-http/akka-stream features. Like before I get a java.lang.NoClassDefFoundError: akka/stream/FlowMaterializer$ Same solution, I begin to add jars. Now my spark submit looks like: sbt clean package; bin/spark-submit --class impleAppStreaming3 --master local[*] --jars spark-cassandra-connector_2.10-1.1.0.jar,cassandra-driver-core-2.1.3.jar,cassandra-thrift-2.0.5.jar,joda-time-2.6.jar,akka-stream-experimental_2.10-1.0-M2.jar target/scala-2.10/simple-project_2.10-1.0.jar Then I have a new kind of error: Exception in thread main java.lang.NoSuchMethodError: com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J at akka.stream.StreamSubscriptionTimeoutSettings$.apply(FlowMaterializer.scala:256) at akka.stream.MaterializerSettings$.apply(FlowMaterializer.scala:185) at akka.stream.MaterializerSettings$.apply(FlowMaterializer.scala:172) at akka.stream.FlowMaterializer$$anonfun$1.apply(FlowMaterializer.scala:42) at akka.stream.FlowMaterializer$$anonfun$1.apply(FlowMaterializer.scala:42) at scala.Option.getOrElse(Option.scala:120) at akka.stream.FlowMaterializer$.apply(FlowMaterializer.scala:42) at SimpleAppStreaming3$.main(SimpleAppStreaming3.scala:240) at SimpleAppStreaming3.main(SimpleAppStreaming3.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) I can't get rid of this error. I tried: 1) adding several jars (including config-1.2.1.jar) 2) studying the dependency tree (with https://github.com/jrudolph/sbt-dependency-graph) 3) excluding libraryDependencies (with dependencyOverrides) Any ideas? Bonus question: Is there a way to avoid adding all these jars with --jars? 
My build.sbt file:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" // exclude("com.typesafe", "config")
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.1"
libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.3"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()
libraryDependencies += "org.apache.cassandra" % "cassandra-thrift" % "2.0.5"
libraryDependencies += "joda-time" % "joda-time" % "2.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.8"
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.3.8"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.2"
libraryDependencies += "org.mockito" % "mockito-all" % "1.10.17"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.3"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.1"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M2"
libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "1.0-M2"
libraryDependencies += "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2"
libraryDependencies += "com.typesafe" % "config" % "1.2.1"
dependencyOverrides += "com.typesafe" % "config" % "1.2.1"
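A hedged answer to the bonus question (the plugin coordinates and version below are an assumption, not something stated in the thread): instead of listing every dependency with --jars, one common route is to build a single fat jar with sbt-assembly and hand only that to spark-submit, keeping spark-core marked as "provided" in build.sbt.

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

Then `sbt assembly` produces one jar containing the connector, the akka-stream/akka-http artifacts and the single resolved com.typesafe config version, which removes the need for the --jars list and ensures the config classes that run are the ones you resolved at build time.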
Re: building spark1.2 meet error
Hi J_soft, for me it is working, I didn't put -Dscala-2.10 -X parameters. I got only one warning, since I don't have hadoop 2.5 it didn't activate this profile: /larix@kovral:~/sources/spark-1.2.0 mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -DskipTests clean package Found 0 infos Finished in 3 ms [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 14.177 s] [INFO] Spark Project Networking ... SUCCESS [ 14.670 s] [INFO] Spark Project Shuffle Streaming Service SUCCESS [ 9.030 s] [INFO] Spark Project Core . SUCCESS [04:42 min] [INFO] Spark Project Bagel SUCCESS [ 26.184 s] [INFO] Spark Project GraphX ... SUCCESS [01:07 min] [INFO] Spark Project Streaming SUCCESS [01:35 min] [INFO] Spark Project Catalyst . SUCCESS [01:48 min] [INFO] Spark Project SQL .. SUCCESS [01:55 min] [INFO] Spark Project ML Library ... SUCCESS [02:17 min] [INFO] Spark Project Tools SUCCESS [ 15.527 s] [INFO] Spark Project Hive . SUCCESS [01:43 min] [INFO] Spark Project REPL . SUCCESS [ 45.154 s] [INFO] Spark Project YARN Parent POM .. SUCCESS [ 3.885 s] [INFO] Spark Project YARN Stable API .. SUCCESS [01:00 min] [INFO] Spark Project Assembly . SUCCESS [ 50.812 s] [INFO] Spark Project External Twitter . SUCCESS [ 21.401 s] [INFO] Spark Project External Flume Sink .. SUCCESS [ 25.207 s] [INFO] Spark Project External Flume ... SUCCESS [ 34.734 s] [INFO] Spark Project External MQTT SUCCESS [ 22.617 s] [INFO] Spark Project External ZeroMQ .. SUCCESS [ 22.444 s] [INFO] Spark Project External Kafka ... SUCCESS [ 33.566 s] [INFO] Spark Project Examples . SUCCESS [01:23 min] [INFO] Spark Project YARN Shuffle Service . SUCCESS [ 4.873 s] [INFO] [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 23:20 min [INFO] Finished at: 2014-12-31T12:02:32+01:00 [INFO] Final Memory: 76M/855M [INFO] [WARNING] The requested profile hadoop-2.5 could not be activated because it does not exist./ If it won't work for you. I'd try to delete all sources, download source code once more and try again ... good luck, Tomas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/building-spark1-2-meet-error-tp20853p20927.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: FlatMapValues
Hi Sanjay, I tried running your code on spark shell piece by piece –
// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)
val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with
val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  } else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] – PROBLEM
I was not even able to apply flatMapValues since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly. The following changes in your snippet make it work as intended -
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets ( and ). If you want to get rid of them, better do another map operation to map the pair to a String. Kapil
Re: FlatMapValues
Hi Sanjay, Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter and then apply a map.
Fwd: Sample Spark Program Error
Hi All, I am trying to run a sample Spark program using Scala SBT. Below is the program,
def main(args: Array[String]) {
  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
  val sc = new SparkContext("local", "Simple App", "E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()
  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()
  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
Below is the error log, 14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673 14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called with curMem=34047, maxMem=280248975 14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 2032.0 B, free 267.2 MB) 14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory on zealot:61452 (size: 2032.0 B, free: 267.3 MB) 14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_0 14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2300 bytes result sent to driver 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1264 bytes) 14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1) 14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found, computing it 14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3507 ms on localhost (1/2) 14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called with curMem=36079, maxMem=280248975 14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 1912.0 B, free 267.2 MB) 14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory on zealot:61452 (size: 1912.0 B, free: 267.3 MB) 14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_1 14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 
2300 bytes result sent to driver 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 261 ms on localhost (2/2) 14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at Test1.scala:19) finished in 3.811 s 14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at Test1.scala:19, took 3.997365232 s 14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at Test1.scala:20 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at Test1.scala:20) with 2 output partitions (allowLocal=false) 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at Test1.scala:20) 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List() 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1 (FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents 14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called with curMem=37991, maxMem=280248975 14/12/30 23:20:21 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 267.2 MB) 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (FilteredRDD[3] at filter at Test1.scala:20) 14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, ANY, 1264 bytes) 14/12/30 23:20:21 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 2) 14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_0 locally 14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1731 bytes result sent to driver 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, ANY, 1264 bytes) 14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3) 14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_1 locally 14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1731 bytes result sent to driver 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 7 ms on localhost (1/2) 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 16 ms on localhost (2/2) 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 1 (count at Test1.scala:20) finished in 0.016 s 14/12/30 23:20:21 INFO
Re: Fwd: Sample Spark Program Error
If you look at your program output closely, you can see the following output. Lines with a: 24, Lines with b: 15 The exception seems to be happening with Spark cleanup after executing your code. Try adding sc.stop() at the end of your program to see if the exception goes away. On Wednesday, December 31, 2014 6:40 AM, Naveen Madhire vmadh...@umail.iu.edu wrote: Hi All, I am trying to run a sample Spark program using Scala SBT. Below is the program,
def main(args: Array[String]) {
  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
  val sc = new SparkContext("local", "Simple App", "E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()
  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()
  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
Re: FlatMapValues
Hey Kapil, Fernando Thanks for your mail. [1] Fernando, if I don't use an if inside the map, then input lines with fewer fields than I expect throw an ArrayIndexOutOfBoundsException, so the if is a safeguard against that. [2] Kapil, I am sorry I did not clarify. Yes, my code DID NOT compile, saying that flatMapValues is not defined. In fact when I use your snippet, the code still does not compile:
Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
My pom.xml looks like this
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
[3] To summarize, all I want is the following conversion.
SUMMARY
=======
When a dataset looks like this
1,red,blue,green
2,yellow,violet,pink
I want to output the following, and am currently not able to:
1,red
1,blue
1,green
2,yellow
2,violet
2,pink
thanks regards sanjay
Re: Fwd: Sample Spark Program Error
Yes, the exception is gone now after adding stop() at the end. Can you please tell me what stop() does? Does it shut down the Spark context? On Wed, Dec 31, 2014 at 10:09 AM, RK prk...@yahoo.com wrote: If you look at your program output closely, you can see the following output. Lines with a: 24, Lines with b: 15 The exception seems to be happening with Spark cleanup after executing your code. Try adding sc.stop() at the end of your program to see if the exception goes away.
Re: Long-running job cleanup
The previously submitted code doesn't actually show the problem I was trying to show effectively, since the issue becomes clear between subsequent steps. Within a single step it appears things are cleared up properly. Memory usage becomes evident pretty quickly.
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 2500
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).partitionBy(new HashPartitioner(200)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 100).map(s => (s, 4)).repartition(1).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    val results = usersFiltered.map(user => {
      val userScore = userFeatures.lookup(user).head
      val recPerUser = Array(1, 2, userScore)
      recPerUser
    })
    val mapedResults: Array[Int] = results.flatMap(scores => scores).toArray
    log("State: Computed " + mapedResults.length + " predictions for stage " + i)
    sc.parallelize(mapedResults) // Write to disk (left out since problem is evident even without it)
  }
}
Example broadcast variable added: 14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB) And then if I parse the entire log looking for "free: XXX.X MB", within a single step memory is cleared properly:
Free 441.1 MB Free 439.8 MB Free 439.8 MB Free 441.1 MB Free 441.1 MB Free 439.8 MB
But between steps, the amount of available memory decreases (e.g. the range that things oscillate between shrinks) and over the course of many hours this eventually reduces to zero.
Free 440.7 MB Free 438.7 MB Free 438.7 MB Free 440.7 MB Free 435.4 MB Free 425.0 MB Free 425.0 MB Free 435.4 MB Free 425.0 MB Free 425.0 MB Free 435.4 MB Free 426.7 MB Free 402.5 MB Free 402.5 MB Free 426.7 MB Free 426.7 MB Free 402.5 MB Free 402.5 MB Free 426.7 MB
From: Ganelin, Ilya ilya.gane...@capitalone.com Date: Tuesday, December 30, 2014 at 7:30 PM To: Ilya Ganelin ilgan...@gmail.com, Patrick Wendell pwend...@gmail.com Cc: user@spark.apache.org Subject: Re: Long-running job cleanup Hi Patrick, to follow up on the below discussion, I am including a short code snippet that produces the problem on 1.1. This is kind of stupid code since it's a greatly simplified version of what I'm actually doing, but it has a number of the key components in place. I'm also including some example log output. Thank you. 
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)
      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}
Example broadcast variable added: 14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on innovationclientnode01.cof.ds.capitalone.com:54640 (size: 794.0 B, free: 441.9 MB) And then if I parse the entire log looking for "free: XXX.X MB" I see the available memory slowly ticking away:
Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB … Free 441.7 MB Free 441.7 MB Free 441.7 MB Free 441.7 MB
And so on. Clearly the above code is not persisting the intermediate RDD (mult), yet memory is never being properly freed up.
From: Ilya Ganelin ilgan...@gmail.com Date: Sunday, December 28, 2014 at 4:02 PM To: Patrick Wendell pwend...@gmail.com, Ganelin, Ilya ilya.gane...@capitalone.com Cc: user@spark.apache.org Subject: Re: Long-running job cleanup Hi Patrick - is that cleanup present in 1.1? The overhead I am talking about is with regards to what I believe is shuffle related metadata. If I watch the execution log I see small broadcast variables created for every stage of execution, a few KB at a time,
RE: FlatMapValues
Hi Sanjay, Oh yes .. flatMapValues is defined in PairRDDFunctions, and you need to import org.apache.spark.SparkContext._ to use it (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions ) @Sean, yes indeed flatMap / flatMapValues both can be used. Regards, Kapil -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: 31 December 2014 21:16 To: Sanjay Subramanian Cc: user@spark.apache.org Subject: Re: FlatMapValues From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out. However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play. On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote: My understanding is as follows
STEP 1 (This would create a pair RDD)
===
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
})
STEP 2
===
Since the previous step created a pair RDD, I thought the flatMapValues method would be applicable. But the code does not even compile, saying that flatMapValues is not applicable to RDD :-(
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
SUMMARY
===
when a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink
I want to output the following and I am asking how do I do that? Perhaps my code is 100% wrong. Please correct me and educate me :-)
1,red
1,blue
1,green
2,yellow
2,violet
2,pink
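A minimal sketch of the flatMap route Sean describes (the field indices and the VAERS_ID header check are taken from the original post; reacRdd and outFile are assumed to exist as in that code):

val exploded = reacRdd
  .map(_.split(','))
  .filter(f => f.length >= 11 && !f(0).contains("VAERS_ID"))
  .flatMap(f => Seq(f(1), f(3), f(5), f(7), f(9)).map(v => f(0) + "," + v))
exploded.saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Filtering first removes the short and header lines, so every remaining element yields its id,value strings from flatMap and no empty else branch or flatMapValues is needed.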
RE: Fwd: Sample Spark Program Error
Hi Naveen, Quoting http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext : SparkContext is the "Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one." So stop() shuts down the connection between the driver program and the Spark master, and does some cleanup. Indeed, after calling it you cannot do any operation on that context or on any RDD created via it. Regards, Kapil From: Naveen Madhire [mailto:vmadh...@umail.iu.edu] Sent: 31 December 2014 22:08 To: RK Cc: user@spark.apache.org Subject: Re: Fwd: Sample Spark Program Error Yes, the exception is gone now after adding stop() at the end. Can you please tell me what stop() does? Does it shut down the Spark context?
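As an illustration of Kapil's point, here is a minimal sketch of the fixed driver (the file path, app name and counting logic come from Naveen's post; wrapping the work in try/finally is a stylistic assumption, not something from the thread):

import org.apache.spark.SparkContext

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Simple App")
    try {
      val logData = sc.textFile("E:/ApacheSpark/usb/usb/spark/bin/README.md", 2).cache()
      val numAs = logData.filter(line => line.contains("a")).count()
      val numBs = logData.filter(line => line.contains("b")).count()
      println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    } finally {
      sc.stop() // release the connection to the cluster so shutdown is clean
    }
  }
}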
Re: Big performance difference between client and cluster deployment mode; is this expected?
-dev, +user A decent guess: Does your 'save' function entail collecting data back to the driver? And are you running this from a machine that's not in your Spark cluster? Then in client mode you're shipping data back to a less-nearby machine, compared to with cluster mode. That could explain the bottleneck. On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote: Hi, I have a very, very simple streaming job. When I deploy this on the exact same cluster, with the exact same parameters, I see a big (40%) performance difference between client and cluster deployment mode. This seems a bit surprising.. Is this expected? The streaming job is:
val msgStream = kafkaStream
  .map { case (k, v) => v }
  .map(DatatypeConverter.printBase64Binary)
  .foreachRDD(save)
  .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])
I tried several times, but the job deployed in client mode can only write at 60% of the throughput of the job deployed in cluster mode, and this happens consistently. I'm logging at INFO level, but my application code doesn't log anything so it's only Spark logs. The logs I see in client mode don't seem like a crazy amount. The setup is: spark-ec2 [...] \ --copy-aws-credentials \ --instance-type=m3.2xlarge \ -s 2 launch test_cluster And all the deployment was done from the master machine.
Re: Big performance difference between client and cluster deployment mode; is this expected?
Also the job was deployed from the master machine in the cluster. On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote: Oh sorry, that was an edit mistake. The code is essentially:
val msgStream = kafkaStream
  .map { case (k, v) => v }
  .map(DatatypeConverter.printBase64Binary)
  .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])
I.e. there is essentially no original code (I was calling saveAsTextFile in a save function, but that was just a remnant from previous debugging).
Re: Big performance difference between client and cluster deployment mode; is this expected?
Oh sorry that was a edit mistake. The code is essentially: val msgStream = kafkaStream .map { case (k, v) = v} .map(DatatypeConverter.printBase64Binary) .saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec]) I.e. there is essentially no original code (I was calling saveAsTextFile in a save function but that was just a remnant from previous debugging). ᐧ On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote: -dev, +user A decent guess: Does your 'save' function entail collecting data back to the driver? and are you running this from a machine that's not in your Spark cluster? Then in client mode you're shipping data back to a less-nearby machine, compared to with cluster mode. That could explain the bottleneck. On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote: Hi, I have a very, very simple streaming job. When I deploy this on the exact same cluster, with the exact same parameters, I see big (40%) performance difference between client and cluster deployment mode. This seems a bit surprising.. Is this expected? The streaming job is: val msgStream = kafkaStream .map { case (k, v) = v} .map(DatatypeConverter.printBase64Binary) .foreachRDD(save) .saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec]) I tried several times, but the job deployed with client mode can only write at 60% throughput of the job deployed with cluster mode and this happens consistently. I'm logging at INFO level, but my application code doesn't log anything so it's only Spark logs. The logs I see in client mode doesn't seem like a crazy amount. The setup is: spark-ec2 [...] \ --copy-aws-credentials \ --instance-type=m3.2xlarge \ -s 2 launch test_cluster And all the deployment was done from the master machine. ᐧ
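For readability (the archive strips the => arrows out of the Scala closures), the pipeline under discussion presumably looks roughly like the sketch below. The per-batch write goes through foreachRDD because saveAsTextFile with a compression codec is an RDD method; the Kafka stream type and the LzoCodec package are assumptions, not taken from the original mail.

import javax.xml.bind.DatatypeConverter
import com.hadoop.compression.lzo.LzoCodec   // assumed hadoop-lzo codec class

// kafkaStream is assumed to be a DStream[(String, Array[Byte])] obtained via KafkaUtils
val msgStream = kafkaStream
  .map { case (k, v) => v }                      // keep only the message payload
  .map(DatatypeConverter.printBase64Binary)      // encode the bytes as Base64 text

msgStream.foreachRDD { rdd =>
  // in a real job the output path would normally include the batch time
  rdd.saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])
}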
Re: building spark1.2 meet error
Hi, Where does the following path that appears in the logs below come from? /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar Did you somehow point at the local maven repository that's H:\Soft\Maven? Jacek 31 gru 2014 01:48 j_soft zsof...@gmail.com napisał(a): no,it still fail use mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -Dscala-2.10 -X -DskipTests clean package ... [DEBUG] /opt/xdsp/spark-1.2.0/core/src/main/scala [DEBUG] includes = [**/*.scala,**/*.java,] [DEBUG] excludes = [] [WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile [INFO] Using incremental compilation [DEBUG] Setup = { [DEBUG]scala compiler = /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar [DEBUG]scala library = /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar [DEBUG]scala extra = { [DEBUG] /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-reflect/2.10.4/scala-reflect-2.10.4.jar [DEBUG]} [DEBUG]sbt interface = /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/sbt-interface/0.13.5/sbt-interface-0.13.5.jar [DEBUG]compiler interface sources = /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/compiler-interface/0.13.5/compiler-interface-0.13.5-sources.jar [DEBUG]java home = [DEBUG]fork java = false [DEBUG]cache directory = /root/.zinc/0.3.5 [DEBUG] } [INFO] 'compiler-interface' not yet compiled for Scala 2.10.4. Compiling... [DEBUG] Plain interface to Scala compiler 2.10.4 with arguments: -nowarn -d /tmp/sbt_8b816650 -bootclasspath /opt/jdk1.7/jre/lib/resources.jar:/opt/jdk1.7/jre/lib/rt.jar:/opt/jdk1.7/jre/lib/sunrsasign.jar:/opt/jdk1.7/jre/lib/jsse.jar:/opt/jdk1.7/jre/lib/jce.jar:/opt/jdk1.7/jre/lib/charsets.jar:/opt/jdk1.7/jre/lib/jfr.jar:/opt/jdk1.7/jre/classes:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar -classpath /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/sbt-interface/0.13.5/sbt-interface-0.13.5.jar:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/compiler-interface/0.13.5/compiler-interface-0.13.5-sources.jar:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-reflect/2.10.4/scala-reflect-2.10.4.jar /tmp/sbt_b9456a7b/xsbt/API.scala /tmp/sbt_b9456a7b/xsbt/Analyzer.scala /tmp/sbt_b9456a7b/xsbt/Command.scala /tmp/sbt_b9456a7b/xsbt/Compat.scala /tmp/sbt_b9456a7b/xsbt/CompilerInterface.scala /tmp/sbt_b9456a7b/xsbt/ConsoleInterface.scala /tmp/sbt_b9456a7b/xsbt/DelegatingReporter.scala /tmp/sbt_b9456a7b/xsbt/Dependency.scala /tmp/sbt_b9456a7b/xsbt/ExtractAPI.scala /tmp/sbt_b9456a7b/xsbt/ExtractUsedNames.scala /tmp/sbt_b9456a7b/xsbt/LocateClassFile.scala /tmp/sbt_b9456a7b/xsbt/Log.scala /tmp/sbt_b9456a7b/xsbt/Message.scala /tmp/sbt_b9456a7b/xsbt/ScaladocInterface.scala error: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found. 
at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16) at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) at scala.reflect.internal.Mirrors$RootsBase.getPackage(Mirrors.scala:172) at scala.reflect.internal.Mirrors$RootsBase.getRequiredPackage(Mirrors.scala:175) at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage$lzycompute(Definitions.scala:183) at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage(Definitions.scala:183) at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass$lzycompute(Definitions.scala:184) at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass(Definitions.scala:184) at scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr$lzycompute(Definitions.scala:1024) at scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr(Definitions.scala:1023) at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses$lzycompute(Definitions.scala:1153) at
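If the Windows-style path really did leak in from a copied Maven settings file, the fix is to point the local repository at a path that exists on the Linux build host. The local repository is normally set in ~/.m2/settings.xml, or overridden for a single build with -Dmaven.repo.local; the paths below are only examples.

<!-- ~/.m2/settings.xml on the Linux build machine -->
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0">
  <localRepository>/home/builduser/.m2/repository</localRepository>
</settings>

mvn -Dmaven.repo.local=/home/builduser/.m2/repository \
  -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -Dscala-2.10 -DskipTests clean package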
Re: Why the major.minor version of the new hive-exec is 51.0?
We actually do publish our own version of this jar, because the version that the hive team publishes is an uber jar and this breaks all kinds of things. As a result I'd file the JIRA against Spark. On Wed, Dec 31, 2014 at 12:55 PM, Ted Yu yuzhih...@gmail.com wrote: Michael: hive-exec-0.12.0-protobuf-2.5.jar is not generated from Spark source code, right ? What would be done after the JIRA is opened ? Cheers On Wed, Dec 31, 2014 at 12:16 PM, Michael Armbrust mich...@databricks.com wrote: This was not intended, can you open a JIRA? On Tue, Dec 30, 2014 at 8:40 PM, Ted Yu yuzhih...@gmail.com wrote: I extracted org/apache/hadoop/hive/common/CompressionUtils.class from the jar and used hexdump to view the class file. Bytes 6 and 7 are 00 and 33, respectively. According to http://en.wikipedia.org/wiki/Java_class_file, the jar was produced using Java 7. FYI On Tue, Dec 30, 2014 at 8:09 PM, Shixiong Zhu zsxw...@gmail.com wrote: The major.minor version of the new org.spark-project.hive.hive-exec is 51.0, so it will require people use JDK7. Is it intentional? dependency groupIdorg.spark-project.hive/groupId artifactIdhive-exec/artifactId version0.12.0-protobuf-2.5/version /dependency You can use the following steps to reproduce it (Need to use JDK6): 1. Create a Test.java file with the following content: public class Test { public static void main(String[] args) throws Exception{ Class.forName(org.apache.hadoop.hive.conf.HiveConf); } } 2. javac Test.java 3. java -classpath ~/.m2/repository/org/spark-project/hive/hive-exec/0.12.0-protobuf-2.5/hive-exec-0.12.0-protobuf-2.5.jar:. Test Exception in thread main java.lang.UnsupportedClassVersionError: org/apache/hadoop/hive/conf/HiveConf : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) at java.lang.ClassLoader.defineClass(ClassLoader.java:615) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) at java.net.URLClassLoader.defineClass(URLClassLoader.java:283) at java.net.URLClassLoader.access$000(URLClassLoader.java:58) at java.net.URLClassLoader$1.run(URLClassLoader.java:197) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:169) at Test.main(Test.java:5) Best Regards, Shixiong Zhu
Re: Why the major.minor version of the new hive-exec is 51.0?
I see. I logged SPARK-5041 which references this thread. Thanks On Wed, Dec 31, 2014 at 12:57 PM, Michael Armbrust mich...@databricks.com wrote: We actually do publish our own version of this jar, because the version that the hive team publishes is an uber jar and this breaks all kinds of things. As a result I'd file the JIRA against Spark. On Wed, Dec 31, 2014 at 12:55 PM, Ted Yu yuzhih...@gmail.com wrote: Michael: hive-exec-0.12.0-protobuf-2.5.jar is not generated from Spark source code, right ? What would be done after the JIRA is opened ? Cheers On Wed, Dec 31, 2014 at 12:16 PM, Michael Armbrust mich...@databricks.com wrote: This was not intended, can you open a JIRA? On Tue, Dec 30, 2014 at 8:40 PM, Ted Yu yuzhih...@gmail.com wrote: I extracted org/apache/hadoop/hive/common/CompressionUtils.class from the jar and used hexdump to view the class file. Bytes 6 and 7 are 00 and 33, respectively. According to http://en.wikipedia.org/wiki/Java_class_file, the jar was produced using Java 7. FYI On Tue, Dec 30, 2014 at 8:09 PM, Shixiong Zhu zsxw...@gmail.com wrote: The major.minor version of the new org.spark-project.hive.hive-exec is 51.0, so it will require people use JDK7. Is it intentional? dependency groupIdorg.spark-project.hive/groupId artifactIdhive-exec/artifactId version0.12.0-protobuf-2.5/version /dependency You can use the following steps to reproduce it (Need to use JDK6): 1. Create a Test.java file with the following content: public class Test { public static void main(String[] args) throws Exception{ Class.forName(org.apache.hadoop.hive.conf.HiveConf); } } 2. javac Test.java 3. java -classpath ~/.m2/repository/org/spark-project/hive/hive-exec/0.12.0-protobuf-2.5/hive-exec-0.12.0-protobuf-2.5.jar:. Test Exception in thread main java.lang.UnsupportedClassVersionError: org/apache/hadoop/hive/conf/HiveConf : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) at java.lang.ClassLoader.defineClass(ClassLoader.java:615) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) at java.net.URLClassLoader.defineClass(URLClassLoader.java:283) at java.net.URLClassLoader.access$000(URLClassLoader.java:58) at java.net.URLClassLoader$1.run(URLClassLoader.java:197) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:169) at Test.main(Test.java:5) Best Regards, Shixiong Zhu
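For reference, the dependency block quoted above (with the XML tags the archive stripped) reads:

<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.12.0-protobuf-2.5</version>
</dependency>

A quicker way than hexdump to confirm the class file version of a class inside the jar is javap; major version 50 corresponds to Java 6 and 51 to Java 7:

javap -verbose -classpath hive-exec-0.12.0-protobuf-2.5.jar \
  org.apache.hadoop.hive.conf.HiveConf | grep "major version"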
Re: UpdateStateByKey persist to Tachyon
bumping this thread up -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-persist-to-Tachyon-tp20798p20930.html
Re: Big performance difference between client and cluster deployment mode; is this expected?
Whats your spark-submit commands in both cases? Is it Spark Standalone or YARN (both support client and cluster)? Accordingly what is the number of executors/cores requested? TD On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote: Also the job was deployed from the master machine in the cluster. ᐧ On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote: Oh sorry that was a edit mistake. The code is essentially: val msgStream = kafkaStream .map { case (k, v) = v} .map(DatatypeConverter.printBase64Binary) .saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec]) I.e. there is essentially no original code (I was calling saveAsTextFile in a save function but that was just a remnant from previous debugging). ᐧ On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote: -dev, +user A decent guess: Does your 'save' function entail collecting data back to the driver? and are you running this from a machine that's not in your Spark cluster? Then in client mode you're shipping data back to a less-nearby machine, compared to with cluster mode. That could explain the bottleneck. On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote: Hi, I have a very, very simple streaming job. When I deploy this on the exact same cluster, with the exact same parameters, I see big (40%) performance difference between client and cluster deployment mode. This seems a bit surprising.. Is this expected? The streaming job is: val msgStream = kafkaStream .map { case (k, v) = v} .map(DatatypeConverter.printBase64Binary) .foreachRDD(save) .saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec]) I tried several times, but the job deployed with client mode can only write at 60% throughput of the job deployed with cluster mode and this happens consistently. I'm logging at INFO level, but my application code doesn't log anything so it's only Spark logs. The logs I see in client mode doesn't seem like a crazy amount. The setup is: spark-ec2 [...] \ --copy-aws-credentials \ --instance-type=m3.2xlarge \ -s 2 launch test_cluster And all the deployment was done from the master machine. ᐧ
limit vs sample for indexing a small amount of data quickly?
Is there a limit function which just returns the first N records? Sample is nice but I’m trying to do this so it’s super fast and just to test the functionality of an algorithm. With sample I’d have to compute the % that would yield 1000 results first… Kevin -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
Re: limit vs sample for indexing a small amount of data quickly?
There's a take method that might do what you need: def take(num: Int): Array[T] (take the first num elements of the RDD). On Jan 1, 2015 12:02 AM, Kevin Burton bur...@spinn3r.com wrote: Is there a limit function which just returns the first N records? Sample is nice but I’m trying to do this so it’s super fast and just to test the functionality of an algorithm. With sample I’d have to compute the % that would yield 1000 results first… Kevin -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
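A quick sketch of how that reads in practice; take only computes as many partitions as it needs, and the small result can be re-parallelized if the algorithm under test expects an RDD (names are illustrative):

// Grab exactly 1000 records without working out a sampling fraction first.
val firstThousand: Array[String] = bigRdd.take(1000)

// If the algorithm under test wants an RDD rather than a local array:
val smallRdd = sc.parallelize(firstThousand)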
NullPointerException
Hi, I get this following Exception when I submit spark application that calculates the frequency of characters in a file. Especially, when I increase the size of data, I face this problem. Exception in thread Thread-47 org.apache.spark.SparkException: Job aborted due to stage failure: Task 11.0:10 failed 4 times, most recent failure: Exception failure in TID 295 on host s1: java.lang.NullPointerException org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752) org.apache.spark.storage.BlockManager.put(BlockManager.scala:574) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any help? Thank you!
Re: NullPointerException
Which version of Spark are you using? On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I get this following Exception when I submit spark application that calculates the frequency of characters in a file. Especially, when I increase the size of data, I face this problem. Exception in thread Thread-47 org.apache.spark.SparkException: Job aborted due to stage failure: Task 11.0:10 failed 4 times, most recent failure: Exception failure in TID 295 on host s1: java.lang.NullPointerException org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752) org.apache.spark.storage.BlockManager.put(BlockManager.scala:574) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any help? Thank you!
spark ignoring all memory settings and defaulting to 512MB?
This is really weird and I’m surprised no one has found this issue yet. I’ve spent about an hour or more trying to debug this :-( My spark install is ignoring ALL my memory settings. And of course my job is running out of memory. The default is 512MB so pretty darn small. The worker and master start up and both use 512M. This alone is very weird and poor documentation IMO because: “SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)” … so if it’s giving it to executors, AKA the memory executors run with, then it should be SPARK_EXECUTOR_MEMORY… … and the worker actually uses SPARK_DAEMON memory. But actually I’m right. It IS SPARK_EXECUTOR_MEMORY… according to bin/spark-class … but, that’s not actually being used :-( that setting is just flat out being ignored and it’s just using 512MB. So all my jobs fail. … and I added an ‘echo’ so I could trace the spark-class script and see what the daemons are actually being run with, but spark-class wasn’t being called at all and nothing is logged for the coarse grained executor. I guess it’s just inheriting the JVM opts from its parent and Java is launching the process directly? This is a nightmare :( -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
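For reference, the standalone-mode knobs being conflated here live in different places; roughly, and with example values only:

# conf/spark-env.sh (read by the standalone master/worker daemons)
SPARK_WORKER_MEMORY=8g     # total memory this worker may hand out to executors
SPARK_DAEMON_MEMORY=512m   # heap for the master/worker daemon JVMs themselves
# The memory an application's executors actually get is requested per
# application (spark.executor.memory / --executor-memory), not fixed here.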
Re: NullPointerException
spark-1.0.0 On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote: Which version of Spark are you using? On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I get this following Exception when I submit spark application that calculates the frequency of characters in a file. Especially, when I increase the size of data, I face this problem. Exception in thread Thread-47 org.apache.spark.SparkException: Job aborted due to stage failure: Task 11.0:10 failed 4 times, most recent failure: Exception failure in TID 295 on host s1: java.lang.NullPointerException org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752) org.apache.spark.storage.BlockManager.put(BlockManager.scala:574) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any help? Thank you!
Re: spark ignoring all memory settings and defaulting to 512MB?
wow. Just figured it out: conf.set( spark.executor.memory, 2g); I have to set it in the Job… that’s really counter intuitive. Especially because the documentation in spark-env.sh says the exact opposite. What’s the resolution here. This seems like a mess. I’d propose a solution to clean it up but I don’t know where to begin. On Wed, Dec 31, 2014 at 10:35 PM, Kevin Burton bur...@spinn3r.com wrote: This is really weird and I’m surprised no one has found this issue yet. I’ve spent about an hour or more trying to debug this :-( My spark install is ignoring ALL my memory settings. And of course my job is running out of memory. The default is 512MB so pretty darn small. The worker and master start up and both use 512M This alone is very weird and poor documentation IMO because: SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)” … so if it’s giving it to executors, AKA the memory executors run with, then it should be SPARK_EXECUTOR_MEMORY… … and the worker actually uses SPARK_DAEMON memory. but actually I’m right. It IS SPARK_EXECUTOR_MEMORY… according to bin/spark-class … but, that’s not actually being used :-( that setting is just flat out begin ignored and it’s just using 512MB. So all my jobs fail. … and I write an ‘echo’ so I could trace the spark-class script to see what the daemons are actually being run with and spark-class wasn’t being called with and nothing is logged for the coarse grained executor. I guess it’s just inheriting the JVM opts from it’s parent and Java is launching the process directly? This is a nightmare :( -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
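With the string quotes that the archive stripped put back, the working setting presumably reads as below; the app name is illustrative.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyJob")                    // illustrative
  .set("spark.executor.memory", "2g")     // per-executor heap for this application
val sc = new SparkContext(conf)

The same property can also go in conf/spark-defaults.conf (a line reading spark.executor.memory 2g) or be passed as --executor-memory 2g on spark-submit, so it does not have to be hard-coded in the job.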
Re: NullPointerException
It looks like 'null' might be selected as a block replication peer? https://github.com/apache/spark/blob/v1.0.0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L786 I know that we fixed some replication bugs in newer versions of Spark (such as https://github.com/apache/spark/pull/2366), so it's possible that this issue would be resolved by updating. Can you try re-running your job with a newer Spark version to see whether you still see the same error? On Wed, Dec 31, 2014 at 10:35 PM, rapelly kartheek kartheek.m...@gmail.com wrote: spark-1.0.0 On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote: Which version of Spark are you using? On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I get this following Exception when I submit spark application that calculates the frequency of characters in a file. Especially, when I increase the size of data, I face this problem. Exception in thread Thread-47 org.apache.spark.SparkException: Job aborted due to stage failure: Task 11.0:10 failed 4 times, most recent failure: Exception failure in TID 295 on host s1: java.lang.NullPointerException org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752) org.apache.spark.storage.BlockManager.put(BlockManager.scala:574) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any help? Thank you!
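For context, the replicate() frame in that stack trace is only reached when a block is stored with a replicated storage level, so the job presumably persists its RDD along these lines (the RDD name is illustrative):

import org.apache.spark.storage.StorageLevel

// A "_2" level asks the BlockManager to copy each block to a second peer;
// that peer selection is where the NullPointerException above is thrown.
val cached = charsRdd.persist(StorageLevel.MEMORY_ONLY_2)
cached.count()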
Re: NullPointerException
Ok. Let me try out on a newer version. Thank you!! On Thu, Jan 1, 2015 at 12:17 PM, Josh Rosen rosenvi...@gmail.com wrote: It looks like 'null' might be selected as a block replication peer? https://github.com/apache/spark/blob/v1.0.0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L786 I know that we fixed some replication bugs in newer versions of Spark (such as https://github.com/apache/spark/pull/2366), so it's possible that this issue would be resolved by updating. Can you try re-running your job with a newer Spark version to see whether you still see the same error? On Wed, Dec 31, 2014 at 10:35 PM, rapelly kartheek kartheek.m...@gmail.com wrote: spark-1.0.0 On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote: Which version of Spark are you using? On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I get this following Exception when I submit spark application that calculates the frequency of characters in a file. Especially, when I increase the size of data, I face this problem. Exception in thread Thread-47 org.apache.spark.SparkException: Job aborted due to stage failure: Task 11.0:10 failed 4 times, most recent failure: Exception failure in TID 295 on host s1: java.lang.NullPointerException org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752) org.apache.spark.storage.BlockManager.put(BlockManager.scala:574) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any help? Thank you!
Re: spark ignoring all memory settings and defaulting to 512MB?
Welcome to Spark. What's more fun is that setting controls memory on the executors but if you want to set memory limit on the driver you need to configure it as a parameter of the spark-submit script. You also set num-executors and executor-cores on the spark submit call. See both the Spark tuning guide and the Spark configuration page for more discussion of stuff like this. W.r.t. The spark memory option, my understanding is that parameter has been deprecated (the SPARK_EXE_MEM) and the documentation is probably stale. Good starting point for cleanup would probably be to update that :-). On Thu, Jan 1, 2015 at 1:45 AM Kevin Burton bur...@spinn3r.com wrote: wow. Just figured it out: conf.set( spark.executor.memory, 2g); I have to set it in the Job… that’s really counter intuitive. Especially because the documentation in spark-env.sh says the exact opposite. What’s the resolution here. This seems like a mess. I’d propose a solution to clean it up but I don’t know where to begin. On Wed, Dec 31, 2014 at 10:35 PM, Kevin Burton bur...@spinn3r.com wrote: This is really weird and I’m surprised no one has found this issue yet. I’ve spent about an hour or more trying to debug this :-( My spark install is ignoring ALL my memory settings. And of course my job is running out of memory. The default is 512MB so pretty darn small. The worker and master start up and both use 512M This alone is very weird and poor documentation IMO because: SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)” … so if it’s giving it to executors, AKA the memory executors run with, then it should be SPARK_EXECUTOR_MEMORY… … and the worker actually uses SPARK_DAEMON memory. but actually I’m right. It IS SPARK_EXECUTOR_MEMORY… according to bin/spark-class … but, that’s not actually being used :-( that setting is just flat out begin ignored and it’s just using 512MB. So all my jobs fail. … and I write an ‘echo’ so I could trace the spark-class script to see what the daemons are actually being run with and spark-class wasn’t being called with and nothing is logged for the coarse grained executor. I guess it’s just inheriting the JVM opts from it’s parent and Java is launching the process directly? This is a nightmare :( -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
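Concretely, the split described above looks roughly like this on the command line (values are illustrative; --num-executors applies on YARN, while standalone mode uses --total-executor-cores):

# Driver memory must be given to spark-submit, since the driver JVM is already
# running by the time application code could set it; executor memory may come
# from here or from spark.executor.memory.
spark-submit --class com.fake.Test \
  --master spark://fake.com:7077 \
  --driver-memory 2g \
  --executor-memory 2g \
  fake.jar arguments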
Re: Big performance difference between client and cluster deployment mode; is this expected?
Hi Tathagata, It's a standalone cluster. The submit commands are: == CLIENT spark-submit --class com.fake.Test \ --deploy-mode client --master spark://fake.com:7077 \ fake.jar arguments == CLUSTER spark-submit --class com.fake.Test \ --deploy-mode cluster --master spark://fake.com:7077 \ s3n://fake.jar arguments And they are both occupying all available slots. (8 * 2 machine = 16 slots). ᐧ On Thu, Jan 1, 2015 at 12:21 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Whats your spark-submit commands in both cases? Is it Spark Standalone or YARN (both support client and cluster)? Accordingly what is the number of executors/cores requested? TD On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote: Also the job was deployed from the master machine in the cluster. On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote: Oh sorry that was a edit mistake. The code is essentially: val msgStream = kafkaStream .map { case (k, v) = v} .map(DatatypeConverter.printBase64Binary) .saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec]) I.e. there is essentially no original code (I was calling saveAsTextFile in a save function but that was just a remnant from previous debugging). On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote: -dev, +user A decent guess: Does your 'save' function entail collecting data back to the driver? and are you running this from a machine that's not in your Spark cluster? Then in client mode you're shipping data back to a less-nearby machine, compared to with cluster mode. That could explain the bottleneck. On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote: Hi, I have a very, very simple streaming job. When I deploy this on the exact same cluster, with the exact same parameters, I see big (40%) performance difference between client and cluster deployment mode. This seems a bit surprising.. Is this expected? The streaming job is: val msgStream = kafkaStream .map { case (k, v) = v} .map(DatatypeConverter.printBase64Binary) .foreachRDD(save) .saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec]) I tried several times, but the job deployed with client mode can only write at 60% throughput of the job deployed with cluster mode and this happens consistently. I'm logging at INFO level, but my application code doesn't log anything so it's only Spark logs. The logs I see in client mode doesn't seem like a crazy amount. The setup is: spark-ec2 [...] \ --copy-aws-credentials \ --instance-type=m3.2xlarge \ -s 2 launch test_cluster And all the deployment was done from the master machine. ᐧ
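If it helps to make the two runs more directly comparable, standalone mode also lets the resource request be pinned explicitly rather than taking every free slot; the numbers below are only examples:

spark-submit --class com.fake.Test \
  --deploy-mode cluster --master spark://fake.com:7077 \
  --total-executor-cores 16 \
  --executor-memory 4g \
  s3n://fake.jar arguments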