Re: Kryo deserialisation error
Hi, we've got the same problem here (it happens randomly): Unable to find class: 6 4 Ú4Ú» 8 4î4Úº*Q|T4â` j4 Ǥ4ê´g8 4 ¾4Ú» 4 4Ú» pE4ʽ4ں*WsѴμˁ4ڻ4ʤ4ցbל4ڻ 4[͝4[ۦ44ڻ!~44ڻΡ4Ƈ4Pҍ4҇%Q4ɋ4ifj4w4Y4ڻ*¸4☮R4ҲR4X4ڻ 4]5ᴁX^34l[?s4ƾ4ڻ8BH4Z4@4jჴ? 4ڻ 7B4ٛ/v4ꃂE4뿁4J04릁4%44ؕ w\44 Ӓ¯ٕ4ڻ/lv4ⴁ40喴Ƴ䂁4¸C4P4ڻ _o4lbʂԛ4각 4^x4ڻ Clearly a stream-corruption problem. We'd been running fine (AFAIK) on 1.0.0 for two weeks, switched to 1.0.1 this Monday, and since then this kind of problem occurs randomly. Guillaume Pitel Not sure if this helps, but it does seem to be part of a name in a Wikipedia article, and Wikipedia is the data set. So something is reading this class name from the data. http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Seems like there is some sort of stream corruption, causing the Kryo read to pick up a weird class name from the stream (the name "arl Fridtjof Rode" in the exception cannot be a class!). Not sure how to debug this. @Patrick: any idea? -- Guillaume PITEL, Président, eXenSa S.A.S., http://www.exensa.com/
spark streaming actor receiver doesn't play well with kryoserializer
it looks like when you configure SparkConf to use the KryoSerializer in combination with an ActorReceiver, bad things happen. I modified the ActorWordCount example program from

val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

and I get the stack trace below. I figured it might be that Kryo doesn't know how to serialize/deserialize the actor, so I added a registrator. I also added a default empty constructor to SampleActorReceiver, just for kicks:

class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[SampleActorReceiver[_]])
  }
}
...
case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String) extends Actor with ActorHelper {
  def this() = this("")
  ...
}
...
val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")

None of this worked, same stack trace. Any idea what's going on? Is this a known issue, and is there a workaround? 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:218) at akka.actor.ActorCell.create(ActorCell.scala:578) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723) at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296) at akka.actor.dungeon.Children$class.makeChild(Children.scala:191) at akka.actor.dungeon.Children$class.actorOf(Children.scala:38) at akka.actor.ActorCell.actorOf(ActorCell.scala:338) at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152) at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145) at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145) at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401) at akka.actor.Props.newActor(Props.scala:339) at akka.actor.ActorCell.newActor(ActorCell.scala:534) at akka.actor.ActorCell.create(ActorCell.scala:560) ... 
9 more Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2] at akka.util.Reflect$.instantiate(Reflect.scala:69) at akka.actor.Props.cachedActorClass(Props.scala:203) at akka.actor.Props.actorClass(Props.scala:327) at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124) at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718) ... 20 more Caused by: java.lang.IllegalArgumentException: wrong number of arguments at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:525) at akka.util.Reflect$.instantiate(Reflect.scala:65) ... 24 more
Re: Help in merging an RDD against itself using the V of a (K,V).
Yeah, reduce() will leave you with one big collection of sets on the driver. Maybe the set of all identifiers isn't so big -- even a hundred million Longs isn't so much. I'm glad to hear cartesian works, but can that scale? You're making an RDD of N^2 elements initially, which is just vast. On Thu, Jul 24, 2014 at 2:09 AM, Roch Denis rde...@exostatic.com wrote: Ah yes, you're quite right; with partitions I could probably process a good chunk of the data, but I didn't think a reduce would work? Sorry, I'm still new to Spark and map reduce in general, but I thought that the reduce result wasn't an RDD and had to fit into memory. If the result of a reduce can be any size, then yes, I can see how to make it work. Sorry for not being certain, the doc is not quite clear on that point, at least to me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-in-merging-a-RDD-agaisnt-itself-using-the-V-of-a-K-V-tp10530p10556.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
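To make the scaling concern concrete, a sketch with made-up sizes, assuming an existing SparkContext sc:

val ids = sc.parallelize(1 to 1000)
val pairs = ids.cartesian(ids) // 1,000,000 pairs from 1,000 elements, before any filtering

The pair count squares before any reduction gets a chance to shrink it.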
Spark Function setup and cleanup
Hi, I am using the Java API of Spark. I wanted to know if there is a way to run some code in a manner that is like the setup() and cleanup() methods of Hadoop Map/Reduce. The reason I need it is because I want to read something from the DB for each record I scan in my Function, and I would like to open the DB connection only once (and close it only once). Thanks
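One pattern that gets close to setup()/cleanup() semantics (a hedged sketch, not an official Spark hook; the JDBC URL, table, and query are placeholders, and records is assumed to be an RDD[String] of ids) is mapPartitions, which lets you open the connection once per partition rather than once per record:

import java.sql.DriverManager

val enriched = records.mapPartitions { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://host/db") // setup()
  val stmt = conn.prepareStatement("SELECT info FROM users WHERE id = ?")
  val out = iter.map { id =>
    stmt.setString(1, id)
    val rs = stmt.executeQuery()
    (id, if (rs.next()) rs.getString(1) else "")
  }.toList // force evaluation before the connection is closed
  stmt.close(); conn.close() // cleanup()
  out.iterator
}

Note the .toList: without it, the iterator would be consumed lazily, after the connection has already been closed.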
Re: streaming sequence files?
Can you just call fileStream or textFileStream in the second app, to consume files that appear in HDFS / Tachyon from the first job? On Thu, Jul 24, 2014 at 2:43 AM, Barnaby bfa...@outlook.com wrote: If I save an RDD as a sequence file such as:

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD(d => {
  d.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" +
    (new SimpleDateFormat("MMdd-HHmmss") format Calendar.getInstance.getTime).toString)
})

How can I use these results in another Spark app, since there is no StreamingContext.sequenceFileStream()? Or, what is the best way to save RDDs of objects to files in one streaming app so that another app can stream those files in? Basically, reuse partially reduced RDDs for further processing so that it doesn't have to be done more than once. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
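A hedged sketch of that fileStream approach, assuming an existing StreamingContext ssc (the directory matches the example above; the key/value types are assumptions: a (String, Int) RDD is written as Text/IntWritable by saveAsSequenceFile):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Consume sequence files from the first app as they appear in the directory.
val counts = ssc.fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
  "tachyon://localhost:19998/files/")
counts.map { case (word, count) => (word.toString, count.get) }.print()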
Starting with spark
Hello All, I am a new user of Spark. I am using *cloudera-quickstart-vm-5.0.0-0-vmware* to execute the sample examples of Spark. I am very sorry for the silly and basic question, but I am not able to deploy and execute the sample examples. Please suggest *how to start with Spark*. Thanks in advance. Regards, Sam
save to HDFS
Hi, I have a Scala application which I have launched into a Spark cluster. I have the following statement trying to save to a folder on the master: saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]("hdfs://masteripaddress:9000/root/test-app/test1/") The application executes successfully and the log says that the save is complete, but I am not able to find the file I saved anywhere. Is there a way I can access this file? Please advise. Regards, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: save to HDFS
Are you sure the RDD that you were saving isn't empty? Are you seeing a _SUCCESS file in this location: hdfs://masteripaddress:9000/root/test-app/test1/? (Do hadoop fs -ls hdfs://masteripaddress:9000/root/test-app/test1/) Thanks Best Regards On Thu, Jul 24, 2014 at 4:24 PM, lmk lakshmi.muralikrish...@gmail.com wrote: Hi, I have a Scala application which I have launched into a Spark cluster. I have the following statement trying to save to a folder on the master: saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]("hdfs://masteripaddress:9000/root/test-app/test1/") The application executes successfully and the log says that the save is complete, but I am not able to find the file I saved anywhere. Is there a way I can access this file? Please advise. Regards, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Starting with spark
Here's the complete overview http://spark.apache.org/docs/latest/ and here's the quick start guide http://spark.apache.org/docs/latest/quick-start.html I would suggest downloading the Spark pre-built binaries http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.1/spark-1.0.1-bin-hadoop1.tgz and starting off yourself. Thanks Best Regards On Thu, Jul 24, 2014 at 4:23 PM, Sameer Sayyed sam.sayyed...@gmail.com wrote: Hello All, I am a new user of Spark. I am using *cloudera-quickstart-vm-5.0.0-0-vmware* to execute the sample examples of Spark. I am very sorry for the silly and basic question, but I am not able to deploy and execute the sample examples. Please suggest *how to start with Spark*. Thanks in advance. Regards, Sam
Re: save to HDFS
Hi Akhil, I am sure that the RDD that I saved is not empty. I have tested it using take. But is there no way that I can see this saved file physically, like we do in the normal context? Can't I view this folder, as I am already logged into the cluster? And should I run hadoop fs -ls hdfs://masteripaddress:9000/root/test-app/test1/ after I log in to the cluster? Thanks, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10581.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: save to HDFS
This piece of code: saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]("hdfs://masteripaddress:9000/root/test-app/test1/") saves the RDD into HDFS, and yes, you can physically see the files using the hadoop command (hadoop fs -ls /root/test-app/test1; yes, you need to log in to the cluster). In case you are not able to execute the command (e.g. hadoop: command not found), you can do $HADOOP_HOME/bin/hadoop fs -ls /root/test-app/test1 Thanks Best Regards On Thu, Jul 24, 2014 at 4:34 PM, lmk lakshmi.muralikrish...@gmail.com wrote: Hi Akhil, I am sure that the RDD that I saved is not empty. I have tested it using take. But is there no way that I can see this saved file physically, like we do in the normal context? Can't I view this folder, as I am already logged into the cluster? And should I run hadoop fs -ls hdfs://masteripaddress:9000/root/test-app/test1/ after I log in to the cluster? Thanks, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10581.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: save to HDFS
Thanks Akhil. I was able to view the files. Actually I was trying to list the same using regular ls and since it did not show anything I was concerned. Thanks for showing me the right direction. Regards, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10583.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Function setup and cleanup
If you want to connect to a DB in your program, you can use JdbcRDD (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala) 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com: Hi, I am using the Java API of Spark. I wanted to know if there is a way to run some code in a manner that is like the setup() and cleanup() methods of Hadoop Map/Reduce. The reason I need it is because I want to read something from the DB for each record I scan in my Function, and I would like to open the DB connection only once (and close it only once). Thanks
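A minimal JdbcRDD sketch (the URL, table, and bounds are hypothetical; note that the query must contain exactly two '?' placeholders, which JdbcRDD fills with each partition's id bounds):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://host/db"),
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  1, 100000, 10, // lowerBound, upperBound, numPartitions
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2)))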
Re: new error for me
I am currently facing the same problem. Error snapshot below: 14-07-24 19:15:30 WARN [pool-3-thread-1] SendingConnection: Error finishing connection to r64b22034.tt.net/10.148.129.84:47525 java.net.ConnectException: Connection timed out at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599) at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318) at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) 14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Handling connection error on connection to ConnectionManagerId(r64b22034.tt.net,47525) 14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Removing SendingConnection to ConnectionManagerId(r64b22034.tt.net,47525) 14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Notifying org.apache.spark.network.ConnectionManager$MessageStatus@1704ebb Could anyone help shed some light on this? Thanks. On Tue, Jul 22, 2014 at 11:35 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Does anyone know what this error means: 14/07/21 23:07:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 14/07/21 23:07:22 INFO TaskSetManager: Starting task 3.0:0 as TID 1620 on executor 27: r104u05.oculus.local (PROCESS_LOCAL) 14/07/21 23:07:22 INFO TaskSetManager: Serialized task 3.0:0 as 8620 bytes in 1 ms 14/07/21 23:07:36 INFO BlockManagerInfo: Added taskresult_1620 in memory on r104u05.oculus.local:50795 (size: 64.9 MB, free: 18.3 GB) 14/07/21 23:07:36 INFO SendingConnection: Initiating connection to [r104u05.oculus.local/192.168.0.105:50795] 14/07/21 23:07:57 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@1d86a150 java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77) at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:265) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115) 14/07/21 23:07:57 WARN SendingConnection: Error finishing connection to r104u05.oculus.local/192.168.0.105:50795 java.net.ConnectException: Connection timed out at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318) at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:202) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) 14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(r104u05.oculus.local,50795) 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(r104u05.oculus.local,50795) 14/07/21 23:07:57 INFO ConnectionManager: Notifying org.apache.spark.network.ConnectionManager$MessageStatus@13ad274d 14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(r104u05.oculus.local,50795) 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(r104u05.oculus.local,50795) 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(r104u05.oculus.local,50795) 14/07/21 23:07:57 WARN TaskSetManager: Lost TID 1620 (task 3.0:0) 14/07/21 23:07:57 WARN TaskSetManager: Lost result for TID 1620 on host r104u05.oculus.local I've never seen this one before, and now it's coming up consistently. Thanks, -Nathan
Re: Configuring Spark Memory
Thank you Nishkam, I have read your code. So, for the sake of my understanding, it seems that for each Spark context there is one executor per node? Can anyone confirm this? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote: See if this helps: https://github.com/nishkamravi2/SparkAutoConfig/ It's a very simple tool for auto-configuring default parameters in Spark. Takes as input high-level parameters (like number of nodes, cores per node, memory per node, etc) and spits out default configuration, user advice and command line. Compile (javac SparkConfigure.java) and run (java SparkConfigure). Also cc'ing dev in case others are interested in helping evolve this over time (by refining the heuristics and adding more parameters). On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com wrote: Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: "If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark." Am I reading this incorrectly? Anyway, our configuration is 21 machines (one master and 20 slaves), each with 60Gb. We would like to use 4 cores per machine. This is pyspark, so we want to leave say 16Gb on each machine for python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times: http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/cluster-overview.html The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. 
This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240
Re: Spark Function setup and cleanup
In my case I want to reach HBase. For every record with a userId I want to get some extra information about the user and add it to the result record for further processing. On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang yanboha...@gmail.com wrote: If you want to connect to a DB in your program, you can use JdbcRDD (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala) 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com: Hi, I am using the Java API of Spark. I wanted to know if there is a way to run some code in a manner that is like the setup() and cleanup() methods of Hadoop Map/Reduce. The reason I need it is because I want to read something from the DB for each record I scan in my Function, and I would like to open the DB connection only once (and close it only once). Thanks
GraphX for pyspark?
I understand that GraphX is not yet available for pyspark. I was wondering if the Spark team has set a target release and timeframe for doing that work? Thank you, Eric
Re: Starting with spark
First thing... Go into the Cloudera Manager and make sure that the Spark service (master?) is started. Marco On Thu, Jul 24, 2014 at 7:53 AM, Sameer Sayyed sam.sayyed...@gmail.com wrote: Hello All, I am new user of spark, I am using *cloudera-quickstart-vm-5.0.0-0-vmware* for execute sample examples of Spark. I am very sorry for silly and basic question. I am not able to deploy and execute sample examples of spark. please suggest me *how to start with spark*. Please help me Thanks in advance. Regards, Sam
Spark got stuck with a loop
Hi, I ran Spark in standalone mode on a cluster and it went well for approximately one hour; then the driver's output stopped with the following: 14/07/24 08:07:36 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 36 to spark@worker5.local:47416 14/07/24 08:07:36 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 36 is 265 bytes 14/07/24 08:30:04 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 39 to spark@worker5.local:47416 14/07/24 08:30:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 39 is 265 bytes Then I checked the Spark UI and found only one active task, so I checked that worker's stderr; it seemed the worker had fallen into a loop: 14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally 14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally 14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally 14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 28 blocks 14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms This aberrant output was repeated over and over. So, what should I do to fix this? I have run the program multiple times, and sooner or later it ends up in this state. I also tried increasing the memory, which didn't help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-got-stuck-with-a-loop-tp10590.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Which code did you use? Is the problem caused by your own code or by something in Spark itself? On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com hsy...@gmail.com wrote: I have the same problem On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote: Hi, everyone. I have the piece of code below. When I run it, the error below occurred. It seems that the SparkContext is not serializable, but I do not use the SparkContext except for the broadcast. [In fact, this code is in MLlib; I just try to broadcast the centerArrays.] It succeeds at the reduceByKey operation but fails at the collect operation, which confused me. INFO DAGScheduler: Failed to run collect at KMeans.scala:235 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)

private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
  @transient val sc = data.sparkContext // I tried adding the @transient annotation here, but it doesn't work
  // Initialize each run's center to a random point
  val seed = new XORShiftRandom().nextInt()
  val sample = data.takeSample(true, runs, seed).toSeq
  val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
  // On each step, sample 2 * k points on average for each run with probability proportional
  // to their squared distance from that run's current centers
  for (step <- 0 until initializationSteps) {
    val centerArrays = sc.broadcast(centers.map(_.toArray))
    val sumCosts = data.flatMap { point =>
      for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
    }.reduceByKey(_ + _).collectAsMap() // can pass at this point
    val chosen = data.mapPartitionsWithIndex { (index, points) =>
      val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
      for {
        p <- points
        r <- 0 until runs
        if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
      } yield (r, p)
    }.collect() // failed at this point
    for ((r, p) <- chosen) {
      centers(r) += p
    }
  }

-- Best Wishes! Li Hu(李浒) | Graduate Student, Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/), Tsinghua University, China. Email: lihu...@gmail.com Tel: +86 15120081920 Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
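Not knowing the full class layout around that method, one common cause of this exception (an assumption here, not a confirmed diagnosis for this thread) is a closure that captures the enclosing object and drags its SparkContext along with it. Copying whatever the closure needs into local vals keeps the context out of the serialized task; a minimal sketch reusing the names above:

// Sketch: the closure below captures only local vals (the broadcast handle
// and numRuns), never the enclosing object that holds the SparkContext.
val bcCenters = data.context.broadcast(centers.map(_.toArray))
val numRuns = runs // copy fields into locals so `this` is not captured
val sumCosts = data.flatMap { point =>
  for (r <- 0 until numRuns) yield (r, KMeans.pointCost(bcCenters.value(r), point))
}.reduceByKey(_ + _).collectAsMap()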
Re: Streaming. Cannot get socketTextStream to receive anything.
You will have to define your own stream-to-iterator function and use socketStream. The function should yield custom-delimited objects as the bytes continuously come in; when data is insufficient, the function should block. TD On Jul 23, 2014 6:52 PM, kytay kaiyang@gmail.com wrote: Hi TD, You are right, I did not include \n to delimit the string flushed. That's the reason. Is there a way for me to define the delimiter? Like SOH or ETX instead of \n. Regards kytay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p10558.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
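A sketch of what TD describes, assuming an existing StreamingContext ssc (the ETX delimiter, host, and port are assumptions; socketStream hands the raw InputStream to the converter):

import java.io.{BufferedReader, InputStream, InputStreamReader}
import org.apache.spark.storage.StorageLevel

// Split the incoming byte stream on ETX (0x03) instead of '\n'.
// reader.read() blocks while data is insufficient, as described above.
def etxIterator(in: InputStream): Iterator[String] = new Iterator[String] {
  private val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
  private var c = reader.read() // blocks until data arrives; -1 on EOF
  def hasNext: Boolean = c != -1
  def next(): String = {
    val sb = new StringBuilder
    while (c != -1 && c != 0x03) { sb.append(c.toChar); c = reader.read() }
    if (c == 0x03) c = reader.read() // skip the delimiter itself
    sb.toString
  }
}

val messages = ssc.socketStream("localhost", 9999, etxIterator,
  StorageLevel.MEMORY_AND_DISK_SER)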
Re: Configuring Spark Memory
Great - thanks for the clarification Aaron. The offer stands for me to write some documentation and an example that covers this without leaving *any* room for ambiguity. -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson ilike...@gmail.com wrote: Whoops, I was mistaken in my original post last year. By default, there is one executor per node per Spark Context, as you said. spark.executor.memory is the amount of memory that the application requests for each of its executors. SPARK_WORKER_MEMORY is the amount of memory a Spark Worker is willing to allocate in executors. So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster, and spark.executor.memory to 4g, you would be able to run 2 simultaneous Spark Contexts that each get 4g per node. Similarly, if spark.executor.memory were 8g, you could only run 1 Spark Context at a time on the cluster, but it would get all the cluster's memory. On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com wrote: Thank you Nishkam, I have read your code. So, for the sake of my understanding, it seems that for each Spark context there is one executor per node? Can anyone confirm this? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote: See if this helps: https://github.com/nishkamravi2/SparkAutoConfig/ It's a very simple tool for auto-configuring default parameters in Spark. Takes as input high-level parameters (like number of nodes, cores per node, memory per node, etc) and spits out default configuration, user advice and command line. Compile (javac SparkConfigure.java) and run (java SparkConfigure). Also cc'ing dev in case others are interested in helping evolve this over time (by refining the heuristics and adding more parameters). On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com wrote: Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: "If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark." Am I reading this incorrectly? Anyway, our configuration is 21 machines (one master and 20 slaves), each with 60Gb. We would like to use 4 cores per machine. This is pyspark, so we want to leave say 16Gb on each machine for python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. 
For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times: http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/cluster-overview.html The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc).
Re: Configuring Spark Memory
Whoops, I was mistaken in my original post last year. By default, there is one executor per node per Spark Context, as you said. spark.executor.memory is the amount of memory that the application requests for each of its executors. SPARK_WORKER_MEMORY is the amount of memory a Spark Worker is willing to allocate in executors. So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster, and spark.executor.memory to 4g, you would be able to run 2 simultaneous Spark Contexts that each get 4g per node. Similarly, if spark.executor.memory were 8g, you could only run 1 Spark Context at a time on the cluster, but it would get all the cluster's memory. On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com wrote: Thank you Nishkam, I have read your code. So, for the sake of my understanding, it seems that for each Spark context there is one executor per node? Can anyone confirm this? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote: See if this helps: https://github.com/nishkamravi2/SparkAutoConfig/ It's a very simple tool for auto-configuring default parameters in Spark. Takes as input high-level parameters (like number of nodes, cores per node, memory per node, etc) and spits out default configuration, user advice and command line. Compile (javac SparkConfigure.java) and run (java SparkConfigure). Also cc'ing dev in case others are interested in helping evolve this over time (by refining the heuristics and adding more parameters). On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com wrote: Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: "If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark." Am I reading this incorrectly? Anyway, our configuration is 21 machines (one master and 20 slaves), each with 60Gb. We would like to use 4 cores per machine. This is pyspark, so we want to leave say 16Gb on each machine for python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? 
This is after having closely read the documentation several times: http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/cluster-overview.html The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240
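Put concretely, a sketch using the numbers from Aaron's example rather than recommendations:

// In conf/spark-env.sh on each standalone worker:
//   export SPARK_WORKER_MEMORY=8g   # total the Worker may hand out to executors
// Then each application asks for 4g per executor, so two contexts fit at once:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MemoryExample")
  .set("spark.executor.memory", "4g") // per-executor (one executor per node here)
val sc = new SparkContext(conf)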
Re: Starting with spark
Hi Sameer, I think it is much easier to start using Spark in standalone mode on a single machine. Last time I tried Cloudera Manager to deploy Spark, it wasn't very straightforward and I hit a couple of obstacles along the way. However, standalone mode is very easy to start exploring Spark with. Best Regards, Jerry Sent from my iPad On Jul 24, 2014, at 6:53 AM, Sameer Sayyed sam.sayyed...@gmail.com wrote: Hello All, I am a new user of Spark. I am using cloudera-quickstart-vm-5.0.0-0-vmware to execute the sample examples of Spark. I am very sorry for the silly and basic question, but I am not able to deploy and execute the sample examples. Please suggest how to start with Spark. Thanks in advance. Regards, Sam
Re: akka 2.3.x?
We are also eagerly waiting for akka 2.3.4 support as we use Akka (and Spray) directly in addition to Spark. Yardena -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10597.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Starting with spark
Hi Sam, I tried Spark on Cloudera a couple of months ago, and there were a lot of issues… Fortunately, I was able to switch to Hortonworks and everything works perfectly. In general, you can try two modes: standalone and via YARN. Personally, I found using Spark via YARN more comfortable, especially for administration. You can read about my experience with standalone mode here: http://simpletoad.blogspot.com/2014/04/spark-on-hdp2.html On Jul 24, 2014, at 8:12 PM, Jerry chiling...@gmail.com wrote: Hi Sameer, I think it is much easier to start using Spark in standalone mode on a single machine. Last time I tried Cloudera Manager to deploy Spark, it wasn't very straightforward and I hit a couple of obstacles along the way. However, standalone mode is very easy to start exploring Spark with. Best Regards, Jerry Sent from my iPad On Jul 24, 2014, at 6:53 AM, Sameer Sayyed sam.sayyed...@gmail.com wrote: Hello All, I am a new user of Spark. I am using cloudera-quickstart-vm-5.0.0-0-vmware to execute the sample examples of Spark. I am very sorry for the silly and basic question, but I am not able to deploy and execute the sample examples. Please suggest how to start with Spark. Thanks in advance. Regards, Sam
rdd.saveAsTextFile blows up
I'm trying to run a simple pipeline using PySpark, version 1.0.1. I've created an RDD over a parquetFile and am mapping the contents with a transformer function, and now wish to write the data out to HDFS. All of the executors fail with the same stack trace (below). I do get a directory on HDFS, but it's empty except for a file named _temporary. Any ideas? java.io.IOException (java.io.IOException: Filesystem closed) org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629) org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:735) org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793) java.io.DataInputStream.readFully(DataInputStream.java:195) java.io.DataInputStream.readFully(DataInputStream.java:169) parquet.hadoop.ParquetFileReader$Chunk.init(ParquetFileReader.java:369) parquet.hadoop.ParquetFileReader$Chunk.init(ParquetFileReader.java:362) parquet.hadoop.ParquetFileReader.readColumnChunkPages(ParquetFileReader.java:411) parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349) parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100) parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966) scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:293) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
Re: akka 2.3.x?
This is being tracked here: https://issues.apache.org/jira/browse/SPARK-1812, since it will also be needed for cross-building with Scala 2.11. Maybe we can do it before that. Probably too late for 1.1, but you should open an issue for 1.2. In that JIRA I linked, there's a pull request from a month ago that adds Akka 2.3, so that's worth looking at. Matei On Jul 24, 2014, at 10:19 AM, yardena ymeym...@gmail.com wrote: We are also eagerly waiting for akka 2.3.4 support as we use Akka (and Spray) directly in addition to Spark. Yardena -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10597.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
GraphX canonical conflation issues
Hi there, This issue has been mentioned in: http://apache-spark-user-list.1001560.n3.nabble.com/Java-IO-Stream-Corrupted-Invalid-Type-AC-td6925.html However I'm starting a new thread since the issue is distinct from the above topic's designated subject. I'm test-running canonical conflation on a ~100 MB graph (with hopes to scale to 10 GB or more). I'm deploying on 5 r3.xlarge machines on AWS EMR and using default configurations, with the exception of setting spark.serializer as org.apache.spark.serializer.KryoSerializer. The full stack-trace from canonical conflation is pasted below; it evidently fails at: Failed to run reduce at VertexRDD.scala:100. (The same app ran just fine on very small input locally.) Has there been any progress in identifying the underlying issues? Thanks! 14/07/24 16:29:37 INFO mapred.FileInputFormat: Total input paths to process : 1 * About to run connected components * 14/07/24 16:29:37 INFO spark.SparkContext: Starting job: reduce at VertexRDD.scala:100 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 5 (mapPartitions at VertexRDD.scala:423) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 18 (mapPartitions at VertexRDD.scala:318) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 22 (mapPartitions at VertexRDD.scala:318) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 26 (mapPartitions at GraphImpl.scala:184) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Got job 0 (reduce at VertexRDD.scala:100) with 1 output partitions (allowLocal=false) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at VertexRDD.scala:100) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1, Stage 2) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1, Stage 2) 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting Stage 1 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:423), which has no missing parents 14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:423) 14/07/24 16:29:37 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-5-147-209.ec2.internal:60530/user/Executor#-2098248966] with ID 0 14/07/24 16:29:39 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: ip-10-5-147-209.ec2.internal (PROCESS_LOCAL) 14/07/24 16:29:39 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2300 bytes in 3 ms 14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-167-166-70.ec2.internal:53470/user/Executor#-1954387250] with ID 3 14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-169-50-78.ec2.internal:37584/user/Executor#-247338355] with ID 2 14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-95-161-133.ec2.internal:55718/user/Executor#-2120787048] with ID 1 14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-167-166-70.ec2.internal:52351 with 294.9 MB RAM 14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-5-147-209.ec2.internal:34712 with 294.9 MB RAM 
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-169-50-78.ec2.internal:35244 with 294.9 MB RAM 14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-95-161-133.ec2.internal:44976 with 294.9 MB RAM 14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor 0 disconnected, so removing it 14/07/24 16:30:09 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/0 is now EXITED (Command exited with code 52) 14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140724162937-0004/0 removed: Command exited with code 52 14/07/24 16:30:09 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on ip-10-5-147-209.ec2.internal: remote Akka client disassociated 14/07/24 16:30:09 INFO scheduler.TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0 14/07/24 16:30:10 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0) 14/07/24 16:30:10 INFO client.AppClient$ClientActor: Executor added: app-20140724162937-0004/4 on worker-20140724151003-ip-10-5-147-209.ec2.internal-55958 (ip-10-5-147-209.ec2.internal:55958) with 4 cores 14/07/24 16:30:10 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 1 on executor 1: ip-10-95-161-133.ec2.internal (PROCESS_LOCAL) 14/07/24 16:30:10 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140724162937-0004/4 on hostPort
continuing processing when errors occur
Our system works with RDDs generated from Hadoop files. It processes each record in a Hadoop file and for a subset of those records generates output that is written to an external system via RDD.foreach. There are no dependencies between the records that are processed. If writing to the external system fails (due to a detail of what is being written) and throws an exception, I see the following behavior: 1. Spark retries the entire partition (thus wasting time and effort), reaches the problem record and fails again. 2. It repeats step 1 up to the default 4 tries and then gives up. As a result, the rest of records from that Hadoop file are not processed. 3. The executor where the 4th failure occurred is marked as failed and told to shut down and thus I lose a core for processing the remaining Hadoop files, thus slowing down the entire process. For this particular problem, I know how to prevent the underlying exception, but I'd still like to get a handle on error handling for future situations that I haven't yet encountered. My goal is this: Retry the problem record only (rather than starting over at the beginning of the partition) up to N times, then give up and move on to process the rest of the partition. As far as I can tell, I need to supply my own retry behavior and if I want to process records after the problem record I have to swallow exceptions inside the foreach block. My 2 questions are: 1. Is there anything I can do to prevent the executor from being shut down when a failure occurs? 2. Are there ways Spark can help me get closer to my goal of retrying only the problem record without writing my own re-try code and swallowing exceptions? Regards, Art
Getting the number of slaves
Hi, Is there a way to get the number of slaves/workers during runtime? I searched online but didn't find anything :/ The application I'm working on will run on different clusters corresponding to different deployment stages (beta - prod). It would be great to get the number of slaves currently in use, in order to set the level of parallelism and RDD partitions based on that number. Thanks! Nicolas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Simple record matching using Spark SQL
Hi Sarath, I will try to reproduce the problem. Thanks, Yin On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Sorry for the delayed response. I'm using Spark 1.0.1 (pre-built version for Hadoop 1). I'm running Spark programs on a standalone Spark cluster using 2 nodes. One node works as both master and worker, while the other node is just a worker. I didn't quite get what you meant when you asked for a jstack of the driver and executor, so I'm attaching the log files generated in $SPARK_HOME/logs and the stdout and stderr files for this job in $SPARK_HOME/work from both nodes. Also attaching the program which I executed. If I uncomment lines 36 and 37, the program works fine; otherwise it just keeps running forever. ~Sarath. On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust mich...@databricks.com wrote: What version are you running? Could you provide a jstack of the driver and executor when it is hanging? On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Added the below 2 lines just before the sql query line - ... file1_schema.count; file2_schema.count; ... and it started working. But I couldn't figure out the reason. Can someone please explain what was happening earlier and what happens with the addition of these 2 lines? ~Sarath On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: No Sonal, I'm not doing any explicit call to stop the context. If you see my previous post to Michael, the commented portion of the code is my requirement. When I run this over the standalone Spark cluster, the execution keeps running with no output or error. After waiting for several minutes I kill it by pressing Ctrl+C in the terminal. But the same code runs perfectly when executed from the spark shell. ~Sarath On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi Sarath, Are you explicitly stopping the context? sc.stop() Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Soumya, Can you please check and let me know what is the issue? What am I missing? Let me know if you need any logs to analyze. ~Sarath On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Tried it. It's correctly printing the line counts of both the files. 
Here's what I tried -

Code:

package test

object Test4 {
  case class Test(fld1: String,
                  fld2: String,
                  fld3: String,
                  fld4: String,
                  fld5: String,
                  fld6: Double,
                  fld7: String);

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("SQLTest")
      .setSparkHome(args(1))
      .set("spark.executor.memory", "2g");
    val sc = new SparkContext(conf);
    sc.addJar("test1-0.1.jar");
    val file1 = sc.textFile(args(2));
    println(file1.count());
    val file2 = sc.textFile(args(3));
    println(file2.count());
    //val sq = new SQLContext(sc);
    //import sq._
    //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
    //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
    //val file1_schema = sq.createSchemaRDD(file1_recs);
    //val file2_schema = sq.createSchemaRDD(file2_recs);
    //file1_schema.registerAsTable("file1_tab");
    //file2_schema.registerAsTable("file2_tab");
    //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
    //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
    //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
    //  "l.fld6=s.fld6");
    //matched.collect().foreach(println);
  }
}

Execution:

export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar
java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 hdfs://master:54310/user/hduser/file1.csv hdfs://master:54310/user/hduser/file2.csv

~Sarath On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust mich...@databricks.com wrote: What if you just run something like: sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count() On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes Soumya, I did it. First I tried with the example available in the documentation (the example using a people table and finding teenagers). After successfully running it, I moved on to this one, which is the starting point for a bigger requirement for which I'm evaluating Spark SQL. On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta
Spark Training at Scala By the Bay with Databricks, Fast Track to Scala
Scala By the Bay (www.scalabythebay.org) is happy to confirm that our Spark training on August 11-12 will be run by Databricks and By the Bay together. It will be focused on Scala, and is the first Spark training at a major Scala venue. Spark is written in Scala, with the unified data pipeline, interactive REPL, and streaming all taking advantage of powerful Scala DSL capabilities. Type safety ensures that your program will not have run-time errors due to typos, and JVM performance wins for machine learning and compute-intensive applications. Type inference keeps code clear and concise in Scala while keeping all strong typing guarantees, and immutability enables concurrency on multicores as well as across the cluster. If you're serious about performance and enterprise maintainability, you should use Spark with Scala. We're offering the Typesafe-certified Fast Track to Scala (FTTS) course at Scala By the Bay. To celebrate our becoming a Typesafe Certified Training Partner, we're happy to extend to Spark users a $300 discount for the FTTS -- use code SPARKSCALABYTHEBAY. FTTS is taught on August 6-7 by Brendan W. McAdams. Brendan is an Akka contributor and the author of MongoDB Scala drivers, including reactive ones, and one of the best Scala teachers in the world. His notes on the course are here: http://www.scalabythebay.org/files/scala_training_deck.pdf. This is the best intensive ramp-up Scala course you could possibly take. If your company is choosing Scala, there's no better way to get up to speed on the whole Scala ecosystem than by attending the training and then the conference (and possibly Spark training afterwards). As you dive deeper into Spark, you will want to use it in Scala. Our trainings are fairly intimate affairs, limited to 25 people each. It means you will get individual attention for your skill level. Please share this course information with your colleagues who are planning to learn Scala and Spark. Full training info: http://www.scalabythebay.org/training.html#training. Group discounts are available on 5 or more registrations; contact registrat...@scalabythebay.org for more information about those or if you have any other questions. Cheers, A+ (Alexy) and Scala By the Bay
Re: Getting the number of slaves
Try sc.getExecutorStorageStatus.length. SparkContext's getExecutorMemoryStatus or getExecutorStorageStatus will give you back an object per executor - the StorageStatus objects are what drive a lot of the Spark Web UI. https://spark.apache.org/docs/1.0.1/api/scala/index.html#org.apache.spark.SparkContext On Thu, Jul 24, 2014 at 11:16 AM, Nicolas Mai nicolas@gmail.com wrote: Hi, Is there a way to get the number of slaves/workers during runtime? I searched online but didn't find anything :/ The application I'm working on will run on different clusters corresponding to different deployment stages (beta -> prod). It would be great to get the number of slaves currently in use, in order to set the level of parallelism and RDD partitions based on that number. Thanks! Nicolas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
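For anyone landing here later, a minimal sketch of the approach (assuming the Spark 1.x standalone API; coresPerWorker is an assumed, user-supplied value, not a Spark API):

// One StorageStatus per executor; the driver also appears in the list,
// so subtract one to count only the workers.
val numWorkers = sc.getExecutorStorageStatus.length - 1
// Derive a parallelism hint from it (coresPerWorker is assumed here).
val coresPerWorker = 4
val suggestedPartitions = numWorkers * coresPerWorker * 2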
Emacs Setup Anyone?
Anyone out there have a good configuration for emacs? Scala-mode sort of works, but I'd love to see a fully-supported spark-mode with an inferior shell. Searching didn't turn up much of anything. Any emacs users out there? What setup are you using? Cheers, - SteveN
Kmeans: set initial centers explicitly
Hi, The mllib.clustering.kmeans implementation supports a random or parallel initialization mode to pick the initial centers. Is there a way to specify the initial centers explicitly? It would be useful to have a setCenters() method where we can explicitly specify the initial centers. (R, for example, allows us to specify the initial centers.) thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-set-initial-centers-explicitly-tp10609.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
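MLlib's KMeans has no setCenters() in the 1.0.x API, but one workaround is to run Lloyd's iterations yourself on plain RDDs with whatever initial centers you like. A minimal sketch, assuming dense Array[Double] features (the function and helper names here are mine, not MLlib API):

import org.apache.spark.SparkContext._  // pair RDD functions in Spark 1.x
import org.apache.spark.rdd.RDD

def squaredDist(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

def kmeansWithCenters(data: RDD[Array[Double]],
                      initialCenters: Array[Array[Double]],
                      iterations: Int): Array[Array[Double]] = {
  var centers = initialCenters
  for (_ <- 1 to iterations) {
    val bc = data.sparkContext.broadcast(centers)
    // Assign each point to its closest center, then average per cluster.
    val sums = data.map { p =>
      val cs = bc.value
      val closest = cs.indices.minBy(i => squaredDist(cs(i), p))
      (closest, (p, 1L))
    }.reduceByKey { case ((p1, n1), (p2, n2)) =>
      (p1.zip(p2).map { case (x, y) => x + y }, n1 + n2)
    }.collectAsMap()
    // Empty clusters keep their previous center.
    centers = centers.indices.map { i =>
      sums.get(i).map { case (sum, n) => sum.map(_ / n) }.getOrElse(centers(i))
    }.toArray
  }
  centers
}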
Re: continuing processing when errors occur
Hi Art, I have some advice that isn't Spark-specific at all, so it doesn't *exactly* address your questions, but you might still find it helpful. I think using an implicit to add your retrying behavior might be useful. I can think of two options: 1. enriching RDD itself, e.g. to add a .retryForeach, which would have the desired behavior. 2. enriching Function to create a variant with retry behavior. I prefer option 2, because it could be useful outside of Spark, and even within Spark, you might realize you want to do something similar for more than just foreach. Here's an example. (Probably there is a more functional way to do this, to avoid the while loop, but my brain isn't working and that's not the point of this anyway.) Let's say we have this function:

def tenDiv(x: Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
        at .tenDiv(<console>:7)

We can enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}

We activate this behavior by calling .retryable(nTries) on our method. Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5

You could do the same thing on closures you pass to RDD.foreach. I should add that I'm often very hesitant to use implicits because it can make it harder to follow what's going on in the code. I think this version is OK, though, b/c somebody coming along later and looking at the code can at least see the call to retryable as a clue. (I really dislike implicit conversions that happen without any hints in the actual code.) Hopefully that's enough of a hint for others to figure out what is going on. E.g., IntelliJ will know where that method came from and jump to it, and also if you make the name unique enough, you can probably find it with plain text search / c-tags. But it's definitely worth considering for yourself. hope this helps, Imran On Thu, Jul 24, 2014 at 1:12 PM, Art Peel found...@gmail.com wrote: Our system works with RDDs generated from Hadoop files. It processes each record in a Hadoop file and for a subset of those records generates output that is written to an external system via RDD.foreach. There are no dependencies between the records that are processed. If writing to the external system fails (due to a detail of what is being written) and throws an exception, I see the following behavior: 1. Spark retries the entire partition (thus wasting time and effort), reaches the problem record and fails again. 2. It repeats step 1 up to the default 4 tries and then gives up. As a result, the rest of the records from that Hadoop file are not processed. 3. 
The executor where the 4th failure occurred is marked as failed and told to shut down and thus I lose a core for processing the remaining Hadoop files, thus slowing down the entire process. For this particular problem, I know how to prevent the underlying exception, but I'd still like to get a handle on error handling for future situations that I haven't yet encountered. My goal is this: Retry the problem record only (rather than starting over at the beginning of the partition) up to N times, then give up and move on to process the rest of the partition. As far as I can tell, I need to supply my own retry behavior and if I want to process records after the problem record I have to swallow exceptions inside the foreach block. My 2 questions are: 1. Is there anything I can do to prevent the executor from being shut down when a failure occurs? 2. Are there ways Spark can help me get closer to my goal of retrying only the problem record without writing my own re-try code and swallowing exceptions? Regards, Art
Re: Configuring Spark Memory
So this is good information for standalone mode, but how is memory distributed within Mesos? There's coarse-grained mode, where the executor stays active, and there's fine-grained mode, where it appears each task is its own process in Mesos. How do memory allocations work in these cases? Thanks! On Thu, Jul 24, 2014 at 12:14 PM, Martin Goodson mar...@skimlinks.com wrote: Great - thanks for the clarification Aaron. The offer stands for me to write some documentation and an example that covers this without leaving *any* room for ambiguity. -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson ilike...@gmail.com wrote: Whoops, I was mistaken in my original post last year. By default, there is one executor per node per Spark Context, as you said. spark.executor.memory is the amount of memory that the application requests for each of its executors. SPARK_WORKER_MEMORY is the amount of memory a Spark Worker is willing to allocate in executors. So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster, and spark.executor.memory to 4g, you would be able to run 2 simultaneous Spark Contexts that each get 4g per node. Similarly, if spark.executor.memory were 8g, you could only run 1 Spark Context at a time on the cluster, but it would get all the cluster's memory. On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com wrote: Thank you Nishkam, I have read your code. So, for the sake of my understanding, it seems that for each Spark context there is one executor per node? Can anyone confirm this? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote: See if this helps: https://github.com/nishkamravi2/SparkAutoConfig/ It's a very simple tool for auto-configuring default parameters in Spark. Takes as input high-level parameters (like number of nodes, cores per node, memory per node, etc.) and spits out default configuration, user advice, and a command line. Compile (javac SparkConfigure.java) and run (java SparkConfigure). Also cc'ing dev in case others are interested in helping evolve this over time (by refining the heuristics and adding more parameters). On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com wrote: Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.) Am I reading this incorrectly? Anyway, our configuration is 21 machines (one master and 20 slaves), each with 60GB. We would like to use 4 cores per machine. This is PySpark, so we want to leave, say, 16GB on each machine for Python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. 
As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times:

http://spark.apache.org/docs/latest/configuration.html
http://spark.apache.org/docs/latest/spark-standalone.html
http://spark.apache.org/docs/latest/tuning.html
http://spark.apache.org/docs/latest/cluster-overview.html

The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores:
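To make the standalone-mode settings discussed in this thread concrete, here is a sketch; the values are illustrative only, not recommendations, and assume the 1.0.x standalone deployment:

# conf/spark-env.sh on each worker: the total memory this Worker
# is willing to hand out to executors.
export SPARK_WORKER_MEMORY=8g

Then per application, the executor JVM size requested on each node; with the value above, two applications at 4g each can share one worker:

// "MemoryDemo" is an illustrative name.
val conf = new SparkConf()
  .setAppName("MemoryDemo")
  .set("spark.executor.memory", "4g")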
Re: continuing processing when errors occur
Whoops! Just realized I was retrying the function even on success. Didn't pay enough attention to the output from my calls. Slightly updated definitions:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}

def tenDiv(x: Int) = println(x + " --- " + (10 / x))

and example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 --- -5
-1 --- -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 --- 10
2 --- 5

On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid im...@therashids.com wrote: Hi Art, I have some advice that isn't Spark-specific at all, so it doesn't *exactly* address your questions, but you might still find it helpful. I think using an implicit to add your retrying behavior might be useful. I can think of two options: 1. enriching RDD itself, e.g. to add a .retryForeach, which would have the desired behavior. 2. enriching Function to create a variant with retry behavior. I prefer option 2, because it could be useful outside of Spark, and even within Spark, you might realize you want to do something similar for more than just foreach. Here's an example. (Probably there is a more functional way to do this, to avoid the while loop, but my brain isn't working and that's not the point of this anyway.) Let's say we have this function:

def tenDiv(x: Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
        at .tenDiv(<console>:7)

We can enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}

We activate this behavior by calling .retryable(nTries) on our method. Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5

You could do the same thing on closures you pass to RDD.foreach. I should add that I'm often very hesitant to use implicits because it can make it harder to follow what's going on in the code. I think this version is OK, though, b/c somebody coming along later and looking at the code can at least see the call to retryable as a clue. (I really dislike implicit conversions that happen without any hints in the actual code.) Hopefully that's enough of a hint for others to figure out what is going on. 
E.g., IntelliJ will know where that method came from and jump to it, and also if you make the name unique enough, you can probably find it with plain text search / c-tags. But it's definitely worth considering for yourself. hope this helps, Imran On Thu, Jul 24, 2014 at 1:12 PM, Art Peel found...@gmail.com wrote: Our system works with RDDs generated from Hadoop files. It processes each record in a Hadoop file and for a subset of those records generates output that is written to an external system via RDD.foreach. There are no dependencies between the records that are processed. If writing to the external system fails (due to a detail of what is being written) and throws an exception, I see the following behavior: 1. Spark retries the entire partition (thus wasting time and effort), reaches the problem record and fails again. 2. It repeats step 1 up to the default 4 tries and then gives up. As a result, the rest of the records from that Hadoop file are not processed. 3. The executor where the 4th failure occurred is marked as failed and told to shut down, and thus I lose a core for processing the remaining Hadoop files, slowing down the entire process. For this particular problem, I know how to prevent the underlying
KMeans: expensiveness of large vectors
As a source, I have a text file with n rows that each contain m comma-separated integers. Each row is then converted into a feature vector with m features. I've noticed that, given the same total file size and number of features, a larger number of columns is much more expensive for training a KMeans model than a larger number of rows. To give an example: 10k rows x 1k columns took 21 seconds on my cluster, whereas 1k rows x 10k columns took 1min47s. Both files had a size of 238M. Can someone explain what in the implementation of KMeans causes large vectors to be so much more expensive than having many of these vectors? A pointer to the exact part of the source would be fantastic, but even a general explanation would help me. Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
cache changes precision
Hi, I'm doing the following:

def main(args: Array[String]) = {
  val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  val conf = new Configuration()
  val job = new Job(conf)
  val path = new Path("/tmp/a.avro")
  val schema = AvroUtils.getSchema(conf, path)
  AvroJob.setInputKeySchema(job, schema)
  val rdd = sc.newAPIHadoopFile(
    path.toString(),
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    conf).map(x => x._1.datum())
  val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
  val avg = sum / rdd.count()
  println(s"Sum = $sum")
  println(s"Avg = $avg")
}

If I run this, it works as expected. When I add .cache() to

val rdd = sc.newAPIHadoopFile(
  path.toString(),
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  conf).map(x => x._1.datum()).cache()

then the command rounds up the average. Any idea why this works this way? Any tips on how to fix this? Thanks, Ron
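One possible explanation (an assumption on my part, not confirmed in this thread): Hadoop input formats reuse the same record object for every row, so caching the mapped RDD can end up caching many references to one mutable Avro record, which corrupts any aggregate computed afterwards. A sketch of the usual workaround is to deep-copy each datum before caching:

import org.apache.avro.generic.{GenericData, GenericRecord}

// Copy each datum out of the reused AvroKey before it goes into the cache.
val rdd = sc.newAPIHadoopFile(
    path.toString(),
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    conf)
  .map { case (k, _) =>
    val d = k.datum()
    GenericData.get().deepCopy(d.getSchema, d)
  }
  .cache()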
mapToPair vs flatMapToPair vs flatMap function usage.
Can anyone help me understand the key differences between the mapToPair, flatMapToPair, and flatMap functions, and when to apply each of them? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
You can set the Java option -Dsun.io.serialization.extendedDebugInfo=true to have more information about the object printed. It will help you trace down how the SparkContext is getting included in some kind of closure. TD On Thu, Jul 24, 2014 at 9:48 AM, lihu lihu...@gmail.com wrote: Which code did you use? Is the problem caused by your own code or by something in Spark itself? On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com hsy...@gmail.com wrote: I have the same problem On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote: Hi, Everyone. I have a piece of the following code. When I run it, I get the error below; it seems that the SparkContext is not serializable, but I do not use the SparkContext except for the broadcast. [In fact, this code is in MLlib; I just try to broadcast the centerArrays.] It succeeds in the reduceByKey operation but fails at the collect operation, which confuses me.

INFO DAGScheduler: Failed to run collect at KMeans.scala:235
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)

private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
  @transient val sc = data.sparkContext // I tried to add the @transient annotation here, but it doesn't work
  // Initialize each run's center to a random point
  val seed = new XORShiftRandom().nextInt()
  val sample = data.takeSample(true, runs, seed).toSeq
  val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
  // On each step, sample 2 * k points on average for each run with probability proportional
  // to their squared distance from that run's current centers
  for (step <- 0 until initializationSteps) {
    val centerArrays = sc.broadcast(centers.map(_.toArray))
    val sumCosts = data.flatMap { point =>
      for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
    }.reduceByKey(_ + _).collectAsMap() // can pass at this point
    val chosen = data.mapPartitionsWithIndex { (index, points) =>
      val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
      for {
        p <- points
        r <- 0 until runs
        if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
      } yield (r, p)
    }.collect() // failed at this point
    for ((r, p) <- chosen) {
      centers(r) += p
    }
  }

--
Best Wishes!
Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
Tsinghua University, China
Email: lihu...@gmail.com
Tel: +86 15120081920
Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
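For what it's worth, a common way this exception arises (an assumption about this particular case, since the enclosing class isn't shown) is a closure that references instance fields, which silently captures `this` and everything it holds, including a SparkContext. Copying the fields into local vals keeps the enclosing object out of the closure. A minimal sketch:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class Clusterer(@transient val sc: SparkContext, val runs: Int) extends Serializable {
  def costsPerRun(data: RDD[Array[Double]]): Long = {
    // Referencing the field `runs` inside the closure would capture `this`,
    // and with it the (non-serializable) SparkContext. A local copy avoids that.
    val localRuns = runs
    data.flatMap { point =>
      (0 until localRuns).map(r => (r, point.length))
    }.count()
  }
}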
Re: Getting the number of slaves
Thanks, this is what I needed :) I should have searched more... Something I noticed though: after the SparkContext is initialized, I had to wait a few seconds before sc.getExecutorStorageStatus.length returned the correct number of workers in my cluster (otherwise it returns 1, counting only the driver)... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: streaming sequence files?
I have the streaming program writing sequence files. I can find one of the files and load it in the shell using:

scala> val rdd = sc.sequenceFile[String, Int]("tachyon://localhost:19998/files/WordCounts/20140724-213930")
14/07/24 21:47:50 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=309225062
14/07/24 21:47:50 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.1 KB, free 294.9 MB)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1] at sequenceFile at <console>:12

So I got some type information, which seems good. It took a while to research, but I got the following streaming code to compile and run:

val wordCounts = ssc.fileStream[String, Int, SequenceFileInputFormat[String, Int]](args(0))

It works now, and I offer this for reference to anybody else who may be curious about saving sequence files and then streaming them back in. Question: when running both streaming programs at the same time using spark-submit, I noticed that only one app would really run. To get the one app to continue I had to stop the other app. Is there a way to get these running simultaneously? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557p10620.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
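On the simultaneous-apps question, one likely cause (an assumed diagnosis, not verified in this thread) is that in standalone mode an application grabs all available cores by default, so the second application waits for resources. Capping each app's cores would let them run side by side; a sketch:

// Cap this app's cores so the standalone scheduler has capacity left
// for a second application ("SequenceFileStreamer" is an illustrative name).
val conf = new SparkConf()
  .setAppName("SequenceFileStreamer")
  .set("spark.cores.max", "2")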
Re: NotSerializableException in Spark Streaming
Yep, here goes! Here are my environment vitals:

- Spark 1.0.0
- EC2 cluster with 1 slave spun up using spark-ec2
- twitter4j 3.0.3
- spark-shell called with --jars argument to load spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j jars.

Now, while I'm in the Spark shell, I enter the following:

import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

def getAuth(): Option[Authorization] = {
  System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
  System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
  System.setProperty("twitter4j.oauth.accessToken", accessToken)
  System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
  Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
}

def noop(a: Any): Any = {
  a
}

val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)

liveTweets.map(t => noop(t)).print()

ssc.start()

So basically, I'm just printing Tweets as-is, but first I'm mapping them to themselves via noop(). The Tweets will start to flow just fine for a minute or so, and then, this:

14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 140624361 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The time-to-first-error is variable. This is the simplest repro I can show at this time. 
Doing more complex things with liveTweets that involve a KMeansModel, for example, will be interrupted sooner by this java.io.NotSerializableException. I don't know if the root cause is the same, but the error certainly is. By the way, trying to reproduce this on 1.0.1 doesn't raise the same error, but I can't dig deeper to make sure this is really resolved (e.g. by trying more complex things that need data) due to SPARK-2471 https://issues.apache.org/jira/browse/SPARK-2471. I see that that issue has been resolved, so I'll try this whole process again using the latest from master and see how it goes. Nick On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am very curious though. Can you post a concise code example which we can run to reproduce this problem? TD On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am not entirely sure off the top of my head. But a possible workaround (it usually works) is to define the function as a val instead of a def. For example,

def func(i: Int): Boolean = { true }

can be written as

val func = (i: Int) => { true }

Hope this helps for now. TD On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hey Diana, Did you ever figure this out? I'm running into the same exception, except in my case the function I'm
Re: Simple record matching using Spark SQL
Hi Sarath, Have you tried the current branch-1.0? If not, can you give it a try and see if the problem can be resolved? Thanks, Yin On Thu, Jul 24, 2014 at 11:17 AM, Yin Huai yh...@databricks.com wrote: Hi Sarath, I will try to reproduce the problem. Thanks, Yin On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Sorry for the delayed response. I'm using Spark 1.0.1 (pre-built version for Hadoop 1). I'm running Spark programs on a standalone Spark cluster using 2 nodes. One node works as both master and worker, while the other node is just a worker. I didn't quite get what you meant when you asked for a jstack of the driver and executor, so I'm attaching the log files generated in $SPARK_HOME/logs and the stdout and stderr files for this job in the $SPARK_HOME/work folder from both the nodes. Also attaching the program which I executed. If I uncomment lines 36 and 37 the program works fine, otherwise it just keeps running forever. ~Sarath. On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust mich...@databricks.com wrote: What version are you running? Could you provide a jstack of the driver and executor when it is hanging? On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Added the below 2 lines just before the SQL query line:

...
file1_schema.count;
file2_schema.count;
...

and it started working. But I couldn't figure out the reason. Can someone please explain this to me? What was happening earlier, and what is happening with the addition of these 2 lines? ~Sarath On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: No Sonal, I'm not doing any explicit call to stop the context. If you see my previous post to Michael, the commented portion of the code is my requirement. When I run this over a standalone Spark cluster, the execution keeps running with no output or error. After waiting for several minutes I'm killing it by pressing Ctrl+C in the terminal. But the same code runs perfectly when executed from the Spark shell. ~Sarath On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi Sarath, Are you explicitly stopping the context? sc.stop() Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Soumya, Can you please check and let me know what is the issue? What am I missing? Let me know if you need any logs to analyze. ~Sarath On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Tried it. It's correctly printing the line counts of both the files. 
Here's what I tried -

Code:

package test

object Test4 {
  case class Test(fld1: String, fld2: String, fld3: String, fld4: String,
                  fld5: String, fld6: Double, fld7: String)

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("SQLTest")
      .setSparkHome(args(1))
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    sc.addJar("test1-0.1.jar")
    val file1 = sc.textFile(args(2))
    println(file1.count())
    val file2 = sc.textFile(args(3))
    println(file2.count())
    //val sq = new SQLContext(sc)
    //import sq._
    //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
    //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
    //val file1_schema = sq.createSchemaRDD(file1_recs)
    //val file2_schema = sq.createSchemaRDD(file2_recs)
    //file1_schema.registerAsTable("file1_tab")
    //file2_schema.registerAsTable("file2_tab")
    //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
    //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
    //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
    //  "l.fld6=s.fld6")
    //matched.collect().foreach(println)
  }
}

Execution:

export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar
java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 hdfs://master:54310/user/hduser/file1.csv hdfs://master:54310/user/hduser/file2.csv

~Sarath

On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust mich...@databricks.com wrote: What if you just run something like: sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count() On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes Soumya, I did it. First I tried with the example available in the documentation (example using people
Re: mapToPair vs flatMapToPair vs flatMap function usage.
The Pair ones return a JavaPairRDD, which has additional operations on key-value pairs. Take a look at http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs for details. Matei On Jul 24, 2014, at 3:41 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: Can any one help me understand the key difference between mapToPair vs flatMapToPair vs flatMap functions and also when to apply these functions in particular. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
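For comparison, here is what the same pipeline looks like in the Scala API (a side note, not the Java API the question asks about): there are no separate *ToPair methods, because an RDD of tuples implicitly picks up the key-value operations. A minimal sketch:

import org.apache.spark.SparkContext._  // enables reduceByKey on tuple RDDs in Spark 1.x

// flatMap: one line in, many words out; map: one word in, one (word, 1) pair out.
val lines = sc.parallelize(Seq("a b a", "c"))
val pairs = lines.flatMap(_.split(" ")).map(w => (w, 1)) // flatMap + mapToPair in Java terms
val counts = pairs.reduceByKey(_ + _)                    // key-value op, as on a JavaPairRDD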
Re: Spark Function setup and cleanup
You can refer to this topic: http://www.mapr.com/developercentral/code/loading-hbase-tables-spark 2014-07-24 22:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com: In my case I want to reach HBase. For every record with a userId I want to get some extra information about the user and add it to the result record for further processing On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang yanboha...@gmail.com wrote: If you want to connect to a DB in your program, you can use JdbcRDD ( https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ) 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com: Hi, I am using the Java API of Spark. I wanted to know if there is a way to run some code in a manner that is like the setup() and cleanup() methods of Hadoop Map/Reduce. The reason I need it is because I want to read something from the DB according to each record I scan in my Function, and I would like to open the DB connection only once (and close it only once). Thanks
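A common Spark-side pattern for setup/cleanup semantics is mapPartitions: do the setup once per partition, process the partition's iterator, then clean up. A sketch in Scala (`records`, `createConnection`, and `lookupUser` are hypothetical placeholders for your own RDD and HBase helpers):

// Runs once per partition: one connection open, N records enriched, one close.
val enriched = records.mapPartitions { iter =>
  val conn = createConnection()              // "setup", once per partition
  // Materialize the partition before closing the connection; with a lazy
  // iterator the close would run before any record was actually read.
  val out = iter.map(rec => lookupUser(conn, rec)).toList
  conn.close()                               // "cleanup", once per partition
  out.iterator
}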
Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode
I can successfully run my code in local mode using spark-submit (--master local[4]), but I get ExceptionInInitializerError errors in yarn-client mode. Any hints as to what the problem is? Is it a closure serialization problem? How can I debug it? Your answers would be very helpful.

14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
        at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
        at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
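For context, ExceptionInInitializerError means a static or object initializer threw. A typical shape of the problem (an assumption here, since the failing class isn't shown; HBaseConfigHolder is a hypothetical name) is an object whose body only works on the driver:

object HBaseConfigHolder {
  // This body runs once per JVM, the first time the object is referenced.
  // In yarn-client mode that first reference happens on the executor, where
  // a driver-local file or env var may be missing, and the resulting throw
  // surfaces as ExceptionInInitializerError at the point of use.
  val confDir: String = sys.env("HBASE_CONF_DIR")
}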
Re: GraphX Pragel implementation
On Thu, Jul 24, 2014 at 9:52 AM, Arun Kumar toga...@gmail.com wrote: While using the Pregel API for iterations, how can I figure out which superstep the iteration is currently in? The Pregel API doesn't currently expose this, but it's very straightforward to modify Pregel.scala https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala#L112 to do so. Let me know if you'd like help doing this. Ankur http://www.ankurdave.com/
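If you'd rather not patch Pregel.scala, one lighter-weight alternative is to carry the superstep counter in the vertex state and bump it in vprog. A shortest-paths-style skeleton (`graph` is a hypothetical Graph[Double, Int]; sendMsg is elided for brevity):

import org.apache.spark.graphx._

// Vertex attribute = (value, vprogCalls); each vprog invocation bumps the
// counter (the initial activation with initialMsg counts as the first call).
val withStep: Graph[(Double, Int), Int] = graph.mapVertices((_, v) => (v, 0))
val result = withStep.pregel(Double.PositiveInfinity)(
  (id, attr, msg) => (math.min(attr._1, msg), attr._2 + 1), // vprog: bump the counter
  triplet => Iterator.empty,                                // sendMsg elided for brevity
  (a, b) => math.min(a, b)                                  // mergeMsg
)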