Error when I use spark-streaming
Hi all, when I run the Spark Streaming NetworkWordCount example, it always throws this exception. I don't understand why it can't connect; I haven't restricted any ports. 14/04/11 15:38:56 ERROR SocketReceiver: Error receiving data in receiver 0 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.init(Socket.java:425) at java.net.Socket.init(Socket.java:208) at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:57) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:147) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$9.apply(NetworkInputTracker.scala:201) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$9.apply(NetworkInputTracker.scala:197) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1042) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1042) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:52) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/04/11 15:38:56 ERROR NetworkInputTracker: Deregistered receiver for network stream 0 with message: Thanks
Re: Error when I use spark-streaming
I found it. I need to run nc -lk on the listening port first, and then run NetworkWordCount. Thanks On Fri, Apr 11, 2014 at 4:13 PM, Schein, Sagi sagi.sch...@hp.com wrote: I would check the DNS setting. Akka seems to pick configuration from FQDN on my system Sagi *From:* Hahn Jiang [mailto:hahn.jiang@gmail.com] *Sent:* Friday, April 11, 2014 10:56 AM *To:* user *Subject:* Error when I use spark-streaming hi all, When I run spark-streaming use NetworkWordCount in example, it always throw this Exception. I don't understand why it can't connect and I don't restrict port. 14/04/11 15:38:56 ERROR SocketReceiver: Error receiving data in receiver 0 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.init(Socket.java:425) at java.net.Socket.init(Socket.java:208) at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:57) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:147) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$9.apply(NetworkInputTracker.scala:201) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$9.apply(NetworkInputTracker.scala:197) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1042) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1042) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:52) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/04/11 15:38:56 ERROR NetworkInputTracker: Deregistered receiver for network stream 0 with message: Thanks
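For anyone hitting the same thing: the socket receiver only connects to an already-listening server, so netcat has to be up before the example is submitted. A minimal sketch, assuming a Spark 0.9.x layout and port 9999 (substitute your own host and port):

# terminal 1: start a server for the receiver to connect to
nc -lk 9999

# terminal 2: run the streaming example against that host/port
./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999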
Re: Behaviour of caching when dataset does not fit into memory
Hi Pierre, 1. cache() takes time to carry data from disk into memory, so please do not use cache() if your job is not an iterative one. 2. If your dataset is larger than the available memory, a replacement strategy will exchange data between memory and disk. 2014-04-11 0:07 GMT+08:00 Pierre Borckmans pierre.borckm...@realimpactanalytics.com: Hi there, Just playing around in the Spark shell, I am now a bit confused by the performance I observe when the dataset does not fit into memory: - I load a dataset with roughly 500 million rows - I do a count; it takes about 20 seconds - now if I cache the RDD and do a count again (which will try to cache the data again), it takes roughly 90 seconds (the fraction cached is only 25%). => Is this expected? To be roughly 5 times slower when caching and not enough RAM is available? - the subsequent calls to count are also really slow: about 90 seconds as well. => I can see that the first 25% of tasks are fast (the ones dealing with data in memory), but then it gets really slow… Am I missing something? I thought performance would decrease kind of linearly with the amount of data that fits into memory… Thanks for your help! Cheers *Pierre Borckmans* *Real**Impact* Analytics *| *Brussels Office www.realimpactanalytics.com *| *pierre.borckm...@realimpactanalytics.com thierry.lib...@realimpactanalytics.com *FR *+32 485 91 87 31 *| **Skype* pierre.borckmans -- Best Regards --- Xusen Yin 尹绪森 Intel Labs China Homepage: *http://yinxusen.github.io/ http://yinxusen.github.io/*
[GraphX] Cast error when comparing a vertex attribute after its type has changed
Hi, Testing in mapTriplets whether a vertex attribute, which is defined as Integer in the first VertexRDD but has since been changed to Double by mapVertices, is greater than a number throws java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double. If the second elements of the vertex attributes don't contain a zero, there is no error. Replacing vertices: RDD[(Long, (Int, Int))] with vertices: RDD[(Long, (Int, Double))] in the code below solves the problem. I am not sure if it's a lineage management issue or if this is expected behaviour. I am using Spark 0.9.1. Thanks for your help, Pierre-Alexandre import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.graphx._ val vertices: RDD[(Long, (Int, Integer))] = sc.parallelize(Array( (1L, (4, 0)), (2L, (0, 0)), (3L, (7, 0)) )) val edges = sc.parallelize(Array( Edge(1L, 2L, 0), Edge(2L, 3L, 2), Edge(3L, 1L, 5) )) val graph0 = Graph(vertices, edges) val graph1 = graph0.mapVertices { case (vid, (n, _)) => (n, n.toDouble/3) } val graph2 = graph1.mapTriplets(t => { if (t.srcAttr._2 > 0) 1 else 2 }) graph2.edges.foreach(println(_)) // ERROR ERROR Executor: Exception in task ID 7 java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119) at scala.Tuple2._2$mcD$sp(Tuple2.scala:19) at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:27) at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:27) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.graphx.impl.EdgePartition.map(EdgePartition.scala:96) at org.apache.spark.graphx.impl.GraphImpl$$anonfun$10.apply(GraphImpl.scala:148) at org.apache.spark.graphx.impl.GraphImpl$$anonfun$10.apply(GraphImpl.scala:133) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:85) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:48) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Cast-error-when-comparing-a-vertex-attribute-after-its-type-has-changed-tp4119.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
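For readers landing here, a minimal sketch of the workaround described above (declare the second attribute element as Double up front; assumes the Spark shell, where sc is in scope, and the 0.9.x GraphX API):

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

// Declaring the attribute as Double from the start keeps the declared type and
// the type produced by mapVertices consistent, so the comparison no longer
// trips over Integer/Double boxing.
val vertices: RDD[(Long, (Int, Double))] = sc.parallelize(Array(
  (1L, (4, 0.0)), (2L, (0, 0.0)), (3L, (7, 0.0))))
val edges = sc.parallelize(Array(
  Edge(1L, 2L, 0), Edge(2L, 3L, 2), Edge(3L, 1L, 5)))

val graph0 = Graph(vertices, edges)
val graph1 = graph0.mapVertices { case (vid, (n, _)) => (n, n.toDouble / 3) }
val graph2 = graph1.mapTriplets(t => if (t.srcAttr._2 > 0) 1 else 2)
graph2.edges.foreach(println(_))   // no ClassCastException with this declaration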
Hybrid GPU CPU computation
Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
Re: Hybrid GPU CPU computation
This is a bit crazy :) I suppose you would have to run Java code on the GPU! I heard there are some funny projects to do that... Pascal On Fri, Apr 11, 2014 at 2:38 PM, Jaonary Rabarisoa jaon...@gmail.comwrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
Re: Hybrid GPU CPU computation
There is a Scala implementation for GPGPUs (NVIDIA CUDA, to be precise), but you would also need to port Mesos for GPUs, and I am not sure about Mesos. Also, the current Scala GPU version is not stable enough to be used commercially. Hope this helps. Thanks, Saurabh. *Saurabh Jha* Intl. Exchange Student School of Computing Engineering Nanyang Technological University, Singapore Web: http://profile.saurabhjha.in Mob: +65 94663172 On Fri, Apr 11, 2014 at 8:40 PM, Pascal Voitot Dev pascal.voitot@gmail.com wrote: This is a bit crazy :) I suppose you would have to run Java code on the GPU! I heard there are some funny projects to do that... Pascal On Fri, Apr 11, 2014 at 2:38 PM, Jaonary Rabarisoa jaon...@gmail.comwrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
Re: Behaviour of caching when dataset does not fit into memory
Hi Xusen, I was convinced the cache() method would involve in-memory only operations and has nothing to do with disks as the underlying default cache strategy is MEMORY_ONLY. Am I missing something? 2014-04-11 11:44 GMT+02:00 尹绪森 yinxu...@gmail.com: Hi Pierre, 1. cache() would cost time to carry stuffs from disk to memory, so pls do not use cache() if your job is not an iterative one. 2. If your dataset is larger than memory amount, then there will be a replacement strategy to exchange data between memory and disk. 2014-04-11 0:07 GMT+08:00 Pierre Borckmans pierre.borckm...@realimpactanalytics.com: Hi there, Just playing around in the Spark shell, I am now a bit confused by the performance I observe when the dataset does not fit into memory : - i load a dataset with roughly 500 million rows - i do a count, it takes about 20 seconds - now if I cache the RDD and do a count again (which will try cache the data again), it takes roughly 90 seconds (the fraction cached is only 25%). = is this expected? to be roughly 5 times slower when caching and not enough RAM is available? - the subsequent calls to count are also really slow : about 90 seconds as well. = I can see that the first 25% tasks are fast (the ones dealing with data in memory), but then it gets really slow… Am I missing something? I thought performance would decrease kind of linearly with the amour of data fit into memory… Thanks for your help! Cheers *Pierre Borckmans* *Real**Impact* Analytics *| *Brussels Office www.realimpactanalytics.com *| *pierre.borckm...@realimpactanalytics.comthierry.lib...@realimpactanalytics.com *FR *+32 485 91 87 31 *| **Skype* pierre.borckmans -- Best Regards --- Xusen Yin尹绪森 Intel Labs China Homepage: *http://yinxusen.github.io/ http://yinxusen.github.io/*
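A small sketch of the two storage levels being discussed here, for reference (Scala, run in the Spark shell; the input path is a placeholder):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///path/to/large/dataset")   // placeholder path

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): partitions that
// don't fit in memory are simply not stored and get recomputed from the
// lineage on every later action.
rdd.persist(StorageLevel.MEMORY_ONLY)

// MEMORY_AND_DISK would instead spill the partitions that don't fit to local
// disk, trading recomputation for disk I/O (an RDD's level can only be set once,
// hence the commented-out alternative).
// rdd.persist(StorageLevel.MEMORY_AND_DISK)

rdd.count()   // first action materializes and stores whatever fits
rdd.count()   // later actions reuse the fraction that was actually cached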
Re: Spark 0.9.1 PySpark ImportError
Matei, thanks. So including the PYTHONPATH in spark-env.sh seemed to work. I am faced with this issue now. I am doing a large GroupBy in pyspark and the process fails (at the driver it seems). There is not much of a stack trace here to see where the issue is happening. This process works locally. 14/04/11 12:59:11 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 14/04/11 12:59:11 INFO scheduler.DAGScheduler: Failed to run foreach at load/load_etl.py:150 Traceback (most recent call last): File load/load_etl.py, line 164, in module generateImplVolSeries(dirName=vodimo/data/month/, symbols=symbols, outputFilePath=vodimo/data/series/output) File load/load_etl.py, line 150, in generateImplVolSeries rdd = rdd.foreach(generateATMImplVols) File /root/spark/python/pyspark/rdd.py, line 462, in foreach self.mapPartitions(processPartition).collect() # Force evaluation File /root/spark/python/pyspark/rdd.py, line 469, in collect bytesInJava = self._jrdd.collect().iterator() File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o55.collect. : org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times (most recent failure: unknown) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/04/11 12:59:11 INFO scheduler.DAGScheduler: Executor lost: 3 (epoch 4) 14/04/11 12:59:11 INFO storage.BlockManagerMasterActor: Trying to remove executor 3 from BlockManagerMaster. 14/04/11 12:59:11 INFO storage.BlockManagerMaster: Removed 3 successfully in removeExecutor - CEO / Velos (velos.io) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-PySpark-ImportError-tp4068p4125.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Using Spark for Divide-and-Conquer Algorithms
There is a handy parallelize method for running independent computations. The examples page (http://spark.apache.org/examples.html) on the website uses it to estimate Pi. You can join the results at the end of the parallel calculations. On Fri, Apr 11, 2014 at 7:52 AM, Yanzhe Chen yanzhe...@gmail.com wrote: Hi all, Is Spark suitable for applications like Convex Hull algorithm, which has some classic divide-and-conquer approaches like QuickHull? More generally, Is there a way to express divide-and-conquer algorithms in Spark? Thanks! -- Yanzhe Chen Institute of Parallel and Distributed Systems Shanghai Jiao Tong University Email: yanzhe...@gmail.com Sent with Sparrow http://www.sparrowmailapp.com/?sig -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
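The Pi estimation from that examples page shows the scatter/gather shape: parallelize the independent sub-problems, compute them with a map, and combine the partial results with a reduce (Scala; NUM_SAMPLES is whatever sample budget you pick):

// "Divide": distribute independent trials across the cluster, then
// "conquer": combine the partial results with a reduce at the end.
val NUM_SAMPLES = 1000000
val count = sc.parallelize(1 to NUM_SAMPLES).map { _ =>
  val x = math.random
  val y = math.random
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)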
Too many tasks in reduceByKey() when do PageRank iteration
Hi all, I am implementing a simple PageRank. Unlike the PageRank example in Spark, I divided the matrix into blocks and the rank vector into slices. Here is my code: https://github.com/gowithqi/PageRankOnSpark/blob/master/src/PageRank/PageRank.java I assumed that the complexity of each iteration would be the same. However, I found that the reduceByKey() (line 162) has 6 tasks during the first iteration, 18 during the second, 54 during the third, 162 during the fourth... and by the sixth iteration it has 1458 tasks, which costs more than 2 hours to complete. I don't know why this happens; I expected every iteration to cost about the same time. Thank you for your help. -- 张志齐 (Zhang Zhiqi), Computer Science and Technology, Shanghai Jiao Tong University
Re: Hybrid GPU CPU computation
I've thought about this idea, although I haven't tried it. I think the right approach is to pick your granularity boundary and use Spark + the JVM for the large-scale parts of the algorithm, then use the GPGPU API for number crunching large chunks at a time. No need to run the JVM and Spark on the GPU, which would make no sense anyway. Here's another approach: http://www.cakesolutions.net/teamblogs/2013/02/13/akka-and-cuda/ dean On Fri, Apr 11, 2014 at 7:49 AM, Saurabh Jha saurabh.jha.2...@gmail.comwrote: There is a scala implementation for gpgus (nvidia cuda to be precise). but you also need to port mesos for gpu's. I am not sure about mesos. Also, the current scala gpu version is not stable to be used commercially. Hope this helps. Thanks saurabh. *Saurabh Jha* Intl. Exchange Student School of Computing Engineering Nanyang Technological University, Singapore Web: http://profile.saurabhjha.in Mob: +65 94663172 On Fri, Apr 11, 2014 at 8:40 PM, Pascal Voitot Dev pascal.voitot@gmail.com wrote: This is a bit crazy :) I suppose you would have to run Java code on the GPU! I heard there are some funny projects to do that... Pascal On Fri, Apr 11, 2014 at 2:38 PM, Jaonary Rabarisoa jaon...@gmail.comwrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
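One way to sketch that split in Scala: batch a whole partition per native call so the host-to-device transfer is amortized. The gpuKernel below is only a CPU stand-in, not a real binding; an actual version would be a JNI/JCuda wrapper installed on every worker node (all names here are placeholders):

// Stand-in for a native GPU kernel; in practice this would be a JNI/JCuda
// call that ships the whole array to the device in one launch.
def gpuKernel(batch: Array[Double]): Array[Double] = batch.map(x => x * x)

val data = sc.parallelize(1 to 1000000).map(_.toDouble)

// Coarse granularity: one "kernel launch" per partition rather than per
// element, so the JVM <-> device transfer cost is amortized over a big chunk.
val result = data.mapPartitions(iter => gpuKernel(iter.toArray).iterator)
println(result.reduce(_ + _))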
Re: Hybrid GPU CPU computation
On Fri, Apr 11, 2014 at 3:34 PM, Dean Wampler deanwamp...@gmail.com wrote: I've thought about this idea, although I haven't tried it, but I think the right approach is to pick your granularity boundary and use Spark + JVM for large-scale parts of the algorithm, then use the gpgus API for number crunching large chunks at a time. No need to run the JVM and Spark on the GPU, which would make no sense anyway. I find that would be crazy to be able to run the JVM on a GPU even if it's a bit non-sense XD Anyway, you're right, the approach by delegating just some parts of the code to the GPU is interesting but it also means you have to pre-install this code on all cluster nodes... Here's another approach: http://www.cakesolutions.net/teamblogs/2013/02/13/akka-and-cuda/ dean On Fri, Apr 11, 2014 at 7:49 AM, Saurabh Jha saurabh.jha.2...@gmail.comwrote: There is a scala implementation for gpgus (nvidia cuda to be precise). but you also need to port mesos for gpu's. I am not sure about mesos. Also, the current scala gpu version is not stable to be used commercially. Hope this helps. Thanks saurabh. *Saurabh Jha* Intl. Exchange Student School of Computing Engineering Nanyang Technological University, Singapore Web: http://profile.saurabhjha.in Mob: +65 94663172 On Fri, Apr 11, 2014 at 8:40 PM, Pascal Voitot Dev pascal.voitot@gmail.com wrote: This is a bit crazy :) I suppose you would have to run Java code on the GPU! I heard there are some funny projects to do that... Pascal On Fri, Apr 11, 2014 at 2:38 PM, Jaonary Rabarisoa jaon...@gmail.comwrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
Re: Hybrid GPU CPU computation
I've actually done it using PySpark and python libraries which call cuda code, though I've never done it from scala directly. The only major challenge I've hit is assigning tasks to gpus on multiple gpu machines. Sent from my iPhone On Apr 11, 2014, at 8:38 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
Re: Hybrid GPU CPU computation
In fact, the idea is to run some parts of the code on the GPU, as Patrick described, and to extend the RDD structure so that it can also be distributed across GPUs. The following article http://www.wired.com/2013/06/andrew_ng/ describes a hybrid GPU/CPU implementation (with MPI) that outperforms a 16,000-core cluster. On Fri, Apr 11, 2014 at 3:53 PM, Patrick Grinaway pgrina...@gmail.comwrote: I've actually done it using PySpark and python libraries which call cuda code, though I've never done it from scala directly. The only major challenge I've hit is assigning tasks to gpus on multiple gpu machines. Sent from my iPhone On Apr 11, 2014, at 8:38 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
Re: shuffle memory requirements
Turns out that my ulimit settings were too low. I bumped up and the job successfully completes. Here's what I have now: $ ulimit -u // for max user processes 81920 $ ulimit -n // for open files 81920 I was thrown off by the OutOfMemoryError into thinking it is Spark running out of memory in the shuffle stage. My previous settings were 1024 for both, and while that worked for shuffle on small jobs (10s of gigs), it'd choke on the large ones. It would be good to document these in the tuning / configuration section. Something like section 2.5 ulimit and nproc of https://hbase.apache.org/book.html 14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:657) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488) 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms 14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3) 14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756 java.io.IOException: DFSOutputStream is closed at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265) at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715) at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99) at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240) at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300) at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189) at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176) at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466) at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:416) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:679) Thanks, Ameet On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini ameetk...@gmail.com wrote: val hrdd = sc.hadoopRDD(..) val res = hrdd.partitionBy(myCustomPartitioner).reduceKey(..).mapPartitionsWithIndex( some code to save those partitions ) I'm getting OutOfMemoryErrors on the read side of partitionBy shuffle. My custom partitioner generates over 20,000 partitions, so there are 20,000 tasks reading the shuffle files. On problems with low partitions (~ 1000), the job completes successfully. On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6 cores per executor and brought it down to 3, and I still get OutOfMemoryErrors at 20,000 partitions. I have spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since I am not caching any RDDs. Do those config params look reasonable for my shuffle size ? I'm not sure what to increase - shuffle.memoryFraction or the memory that the reduce tasks get. The latter I am guessing is whatever is left after giving storage.memoryFraction and shuffle.memoryFraction. Thanks, Ameet
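For anyone tuning the same thing, the per-session ulimit bump can be made permanent on each worker via /etc/security/limits.conf; a sketch with the values quoted above (the user name is a placeholder for whatever account runs the Spark worker):

# /etc/security/limits.conf on each worker node, then re-login / restart the worker
sparkuser  soft  nofile  81920
sparkuser  hard  nofile  81920
sparkuser  soft  nproc   81920
sparkuser  hard  nproc   81920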
Re: shuffle memory requirements
A typo - I mean't section 2.1.2.5 ulimit and nproc of https://hbase.apache.org/book.html Ameet On Fri, Apr 11, 2014 at 10:32 AM, Ameet Kini ameetk...@gmail.com wrote: Turns out that my ulimit settings were too low. I bumped up and the job successfully completes. Here's what I have now: $ ulimit -u // for max user processes 81920 $ ulimit -n // for open files 81920 I was thrown off by the OutOfMemoryError into thinking it is Spark running out of memory in the shuffle stage. My previous settings were 1024 for both, and while that worked for shuffle on small jobs (10s of gigs), it'd choke on the large ones. It would be good to document these in the tuning / configuration section. Something like section 2.5 ulimit and nproc of https://hbase.apache.org/book.html 14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:657) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488) 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms 14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3) 14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756 java.io.IOException: DFSOutputStream is closed at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265) at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715) at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99) at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240) at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300) at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189) at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176) at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466) at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:416) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:679) Thanks, Ameet On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini ameetk...@gmail.com wrote: val hrdd = sc.hadoopRDD(..) val res = hrdd.partitionBy(myCustomPartitioner).reduceKey(..).mapPartitionsWithIndex( some code to save those partitions ) I'm getting OutOfMemoryErrors on the read side of partitionBy shuffle. My custom partitioner generates over 20,000 partitions, so there are 20,000 tasks reading the shuffle files. On problems with low partitions (~ 1000), the job completes successfully. On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6 cores per executor and brought it down to 3, and I still get OutOfMemoryErrors at 20,000 partitions. I have spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since I am not caching any RDDs. Do those config params look reasonable for my shuffle size ? I'm not sure what to
Re: Spark - ready for prime time?
It’s not a new API, it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention “spill” in http://spark.incubator.apache.org/docs/latest/configuration.html. Matei On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matei, Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious but I don't see it in the api docs. -Suren On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote: To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we’re also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren’t there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task. Overall, expect to see more work to both explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We’re doing a lot of this based on user feedback, so that’s definitely appreciated. Matei On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote: The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs. My take on it -- Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts: (1) in my work i don't need sort in 99% of the cases, i only need group which absolutely doesn't need the spill which makes things slow down to a crawl. (2) if that's an aggregate (such as group count), use combine(), not groupByKey -- this will do tons of good on memory use. (3) if you really need groups that don't fit into memory, that is always because you want to do something that is other than aggregation, with them. E,g build an index of that grouped data. we actually had a case just like that. In this case your friend is really not groupBy, but rather PartitionBy. I.e. what happens there you build a quick count sketch, perhaps on downsampled data, to figure which keys have sufficiently big count -- and then you build a partitioner that redirects large groups to a dedicated map(). assuming this map doesn't try to load things in memory but rather do something like streaming BTree build, that should be fine. In certain cituations such processing may require splitting super large group even into smaller sub groups (e.g. partitioned BTree structure), at which point you should be fine even from uniform load point of view. 
It takes a little of jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise do this all for you in the groupBy contract. I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. Andrew On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com wrote: I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly). On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: I. Is it too much magic? Lots of things just work right in Spark and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built in behavior is not quite right for us? Are we to expect hard to track down issues originating from the black box behind the magic? I think is goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in
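For reference, a minimal sketch of wiring the spill-related settings Matei mentions into a job programmatically (values are illustrative, not recommendations; both keys exist in 0.9.x):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("spill-example")                 // illustrative name
  .set("spark.shuffle.spill", "true")          // already the default in 0.9+
  .set("spark.shuffle.memoryFraction", "0.3")  // in-memory budget before spilling to disk
val sc = new SparkContext(conf)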
Re: Spark - ready for prime time?
Excellent, thanks you. On Fri, Apr 11, 2014 at 12:09 PM, Matei Zaharia matei.zaha...@gmail.comwrote: It's not a new API, it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention spill in http://spark.incubator.apache.org/docs/latest/configuration.html. Matei On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matei, Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious but I don't see it in the api docs. -Suren On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia matei.zaha...@gmail.comwrote: To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we're also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren't there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task. Overall, expect to see more work to both explain how things execute ( http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We're doing a lot of this based on user feedback, so that's definitely appreciated. Matei On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote: The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs. My take on it -- Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts: (1) in my work i don't need sort in 99% of the cases, i only need group which absolutely doesn't need the spill which makes things slow down to a crawl. (2) if that's an aggregate (such as group count), use combine(), not groupByKey -- this will do tons of good on memory use. (3) if you really need groups that don't fit into memory, that is always because you want to do something that is other than aggregation, with them. E,g build an index of that grouped data. we actually had a case just like that. In this case your friend is really not groupBy, but rather PartitionBy. I.e. what happens there you build a quick count sketch, perhaps on downsampled data, to figure which keys have sufficiently big count -- and then you build a partitioner that redirects large groups to a dedicated map(). assuming this map doesn't try to load things in memory but rather do something like streaming BTree build, that should be fine. In certain cituations such processing may require splitting super large group even into smaller sub groups (e.g. 
partitioned BTree structure), at which point you should be fine even from uniform load point of view. It takes a little of jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise do this all for you in the groupBy contract. I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. Andrew On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.comwrote: I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly). On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: I. Is it too much magic? Lots of things just work right in Spark and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built in behavior is not quite right for us? Are we to expect hard to track down issues originating from the black box behind the magic? I think is goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to
GraphX
Hi, I was wondering if there is an implementation of the breadth-first search algorithm in GraphX? Cheers, Ghufran
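There is no packaged BFS as far as I know, but an unweighted BFS falls out of the Pregel operator in a few lines. A sketch against the 0.9.x GraphX API, run in the Spark shell (the tiny edge list and the source vertex are made-up placeholders):

import org.apache.spark.graphx._

// A tiny example graph: edge list only, default vertex attribute 0.
val graph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (2L, 4L))), 0)

val sourceId: Long = 1L   // placeholder start vertex

// Vertex attribute = current BFS depth; unreached vertices stay at infinity.
val initial = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val bfs = initial.pregel(Double.PositiveInfinity)(
  (id, depth, newDepth) => math.min(depth, newDepth),   // keep the smallest depth seen
  triplet =>                                            // relax along each edge
    if (triplet.srcAttr + 1 < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    else Iterator.empty,
  (a, b) => math.min(a, b)                              // merge incoming messages
)
bfs.vertices.collect.foreach(println)   // (vertexId, depth) pairs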
Spark behaviour when executor JVM crashes
Hi, I am calling a C++ library from Spark using JNI. Occasionally the C++ library causes the JVM to crash. The task terminates on the master, but the driver does not return. I am not sure why the driver does not terminate. I also notice that after such an occurrence, I lose some workers permanently. I have a few questions 1) Why does the driver not terminate? Is this because some JVMs are still in a zombie or inconsistent state? 2) Can anything be done to prevent this? 3) Is there a mode in Spark where I can ignore failures and still collect results from the successful tasks? This would be a hugely useful feature as I am using Spark to run regression tests on this native library. Just collecting the successful results would be of huge benefit. Deenar I see the following messages in the driver 1) Initial Errors 14/04/11 18:13:21 INFO AppClient$ClientActor: Executor updated: app-20140411180619-0011/14 is now FAILED (Command exited with code 134) 14/04/11 18:13:21 INFO SparkDeploySchedulerBackend: Executor app-20140411180619-0011/14 removed: Command exited with code 134 14/04/11 18:13:21 INFO SparkDeploySchedulerBackend: Executor 14 disconnected, so removing it 14/04/11 18:13:21 ERROR TaskSchedulerImpl: Lost executor 14 on lonpldpuappu5.uk.db.com: Unknown executor exit code (134) (died from signal 6?) 14/04/11 18:13:21 INFO TaskSetManager: Re-queueing tasks for 14 from TaskSet 3.0 14/04/11 18:13:21 WARN TaskSetManager: Lost TID 320 (task 3.0:306) 14/04/11 18:13:21 INFO AppClient$ClientActor: Executor added: app-20140411180619-0011/55 on worker-20140409143755-lonpldpuappu5.uk.db.com-58926 (lonpldpuappu5.uk.db.com:58926) with 1 cores 14/04/11 18:13:21 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140411180619-0011/55 on hostPort lonpldpuappu5.uk.db.com:58926 with 1 cores, 12.0 GB RAM 14/04/11 18:13:21 INFO AppClient$ClientActor: Executor updated: app-20140411180619-0011/55 is now RUNNING 14/04/11 18:13:21 INFO TaskSetManager: Starting task 3.0:306 as TID 352 on executor 4: lonpldpuappu5.uk.db.com (NODE_LOCAL) 2) Application stopped 14/04/11 18:13:37 ERROR AppClient$ClientActor: Master removed our application: FAILED; stopping client 14/04/11 18:13:37 WARN SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/04/11 18:13:37 INFO TaskSetManager: Starting task 3.0:386 as TID 433 on executor 58: lonpldpuappu5.uk.db.com (NODE_LOCAL) 14/04/11 18:13:37 INFO TaskSetManager: Serialized task 3.0:386 as 18244 bytes in 0 ms 14/04/11 18:13:37 INFO TaskSetManager: Starting task 3.0:409 as TID 434 on executor 39: lonpldpuappu5.uk.db.com (NODE_LOCAL) 14/04/11 18:13:37 INFO TaskSetManager: Serialized task 3.0:409 as 18244 bytes in 0 ms 14/04/11 18:13:37 WARN TaskSetManager: Lost TID 425 (task 3.0:400) 14/04/11 18:13:37 WARN TaskSetManager: Loss was due to java.io.IOException java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:735) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:601) at java.io.DataInputStream.readByte(DataInputStream.java:265) at org.apache.hadoop.io.SequenceFile$Reader.sync(SequenceFile.java:2624) at org.apache.hadoop.mapred.SequenceFileRecordReader.init(SequenceFileRecordReader.java:54) at org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64) at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:156) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at
Re: Setting properties in core-site.xml for Spark and Hadoop to access
Digging up this thread to ask a follow-up question: What is the intended use for /root/spark/conf/core-site.xml? It seems that both /root/spark/bin/pyspark and /root/ ephemeral-hdfs/bin/hadoop point to /root/ephemeral-hdfs/conf/core-site.xml. If I specify S3 access keys in spark/conf, Spark doesn't seem to pick them up. Nick On Fri, Mar 7, 2014 at 4:10 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Mayur, So looking at the section on environment variables herehttp://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables, are you saying to set these options via SPARK_JAVA_OPTS -D? On a related note, in looking around I just discovered this command line tool for modifying XML files called XMLStarlethttp://xmlstar.sourceforge.net/overview.php. Perhaps I should instead set these S3 keys directly in the right core-site.xml using XMLStarlet. Devs/Everyone, On a related note, I discovered that Spark (on EC2) reads Hadoop options from /root/ephemeral-hdfs/conf/core-site.xml. This is surprising given the variety of copies of core-site.xml on the EC2 cluster that gets built by spark-ec2. A quick search yields the following relevant results (snipped): find / -name core-site.xml 2 /dev/null /root/mapreduce/conf/core-site.xml /root/persistent-hdfs/conf/core-site.xml /root/ephemeral-hdfs/conf/core-site.xml /root/spark/conf/core-site.xml It looks like both pyspark and ephemeral-hdfs/bin/hadoop read configs from the ephemeral-hdfs core-site.xml file. The latter is expected; the former is not. Is this intended behavior? I expected pyspark to read configs from the spark core-site.xml file. The moment I remove my AWS credentials from the ephemeral-hdfs config file, pyspark cannot open files in S3 without me providing the credentials in-line. I also guessed that the config file under /root/mapreduce might be a kind of base config file that both Spark and Hadoop would read from first, and then override with configs from the other files. The path to the config suggests that, but it doesn't appear to be the case. Adding my AWS keys to that file seemed to affect neither Spark nor ephemeral-hdfs/bin/hadoop. Nick On Fri, Mar 7, 2014 at 2:07 PM, Mayur Rustagi mayur.rust...@gmail.comwrote: Set them as environment variable at boot configure both stacks to call on that.. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 9:32 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On spinning up a Spark cluster in EC2, I'd like to set a few configs that will allow me to access files in S3 without having to specify my AWS access and secret keys over and over, as described herehttp://stackoverflow.com/a/3033403/877069 . The properties are fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey. Is there a way to set these properties programmatically so that Spark (via the shell) and Hadoop (via distcp) are both aware of and use the values? I don't think SparkConf does what I need because I want Hadoop to also be aware of my AWS keys. When I set those properties using conf.set() in pyspark, distcp didn't appear to be aware of them. Nick -- View this message in context: Setting properties in core-site.xml for Spark and Hadoop to accesshttp://apache-spark-user-list.1001560.n3.nabble.com/Setting-properties-in-core-site-xml-for-Spark-and-Hadoop-to-access-tp2402.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
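For reference, the properties being discussed look like this when placed in a core-site.xml (whichever copy the tool you run actually reads; the key values are placeholders):

<!-- e.g. /root/ephemeral-hdfs/conf/core-site.xml on a spark-ec2 cluster -->
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>YOUR_SECRET_ACCESS_KEY</value>
</property>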
0.9 wont start cluster on ec2, SSH connection refused?
I run the following command and it correctly starts one master and one slave, but then it fails because it can't log onto the master (head node) with the SSH key. The weird thing is that I can log onto it with that same key. (ssh -i myamazonkey.pem r...@ec2-54-86-3-208.compute-1.amazonaws.com) Thanks in advance! $ spark-0.9.1-bin-hadoop2/ec2/spark-ec2 -k myamazonkey -i ~/myamazonkey.pem -s 1 launch spark-test-cluster Setting up security groups... Searching for existing cluster spark-test-cluster... Spark AMI: ami-5bb18832 Launching instances... Launched 1 slaves in us-east-1c, regid = r-8b73b4a8 Launched master in us-east-1c, regid = r-ea76b1c9 Waiting for instances to start up... Waiting 120 more seconds... Generating cluster's SSH key on master... ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error: Failed to SSH to remote host ec2-54-86-3-208.compute-1.amazonaws.com. Please check that you have provided the correct --identity-file and --key-pair parameters and try again.
Re: 0.9 wont start cluster on ec2, SSH connection refused?
is the machine booted up reachable? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Apr 11, 2014 at 12:37 PM, Alton Alexander alexanderal...@gmail.comwrote: I run the follwoing command and it correctly starts one head and one master but then it fails because it can't log onto the head with the ssh key. The wierd thing is that I can log onto the head with that same public key. (ssh -i myamazonkey.pem r...@ec2-54-86-3-208.compute-1.amazonaws.com) Thanks in advance! $ spark-0.9.1-bin-hadoop2/ec2/spark-ec2 -k myamazonkey -i ~/myamazonkey.pem -s 1 launch spark-test-cluster Setting up security groups... Searching for existing cluster spark-test-cluster... Spark AMI: ami-5bb18832 Launching instances... Launched 1 slaves in us-east-1c, regid = r-8b73b4a8 Launched master in us-east-1c, regid = r-ea76b1c9 Waiting for instances to start up... Waiting 120 more seconds... Generating cluster's SSH key on master... ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error: Failed to SSH to remote host ec2-54-86-3-208.compute-1.amazonaws.com. Please check that you have provided the correct --identity-file and --key-pair parameters and try again.
Re: 0.9 wont start cluster on ec2, SSH connection refused?
No not anymore but it was at the time. Thanks but I also just found a thread from two days ago discussing the root and es2-user workaround. For now I'll just go back to using the AMI provided. Thanks! On Fri, Apr 11, 2014 at 1:39 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: is the machine booted up reachable? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Fri, Apr 11, 2014 at 12:37 PM, Alton Alexander alexanderal...@gmail.com wrote: I run the follwoing command and it correctly starts one head and one master but then it fails because it can't log onto the head with the ssh key. The wierd thing is that I can log onto the head with that same public key. (ssh -i myamazonkey.pem r...@ec2-54-86-3-208.compute-1.amazonaws.com) Thanks in advance! $ spark-0.9.1-bin-hadoop2/ec2/spark-ec2 -k myamazonkey -i ~/myamazonkey.pem -s 1 launch spark-test-cluster Setting up security groups... Searching for existing cluster spark-test-cluster... Spark AMI: ami-5bb18832 Launching instances... Launched 1 slaves in us-east-1c, regid = r-8b73b4a8 Launched master in us-east-1c, regid = r-ea76b1c9 Waiting for instances to start up... Waiting 120 more seconds... Generating cluster's SSH key on master... ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/ec2-user/myamazonkey.pem', '-t', '-t', u'r...@ec2-54-86-3-208.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-54-86-3-208.compute-1.amazonaws.com port 22: Connection refused Error: Failed to SSH to remote host ec2-54-86-3-208.compute-1.amazonaws.com. Please check that you have provided the correct --identity-file and --key-pair parameters and try again.
Shutdown with streaming driver running in cluster broke master web UI permanently
I had a cluster running with a streaming driver deployed into it. I shut down the cluster using sbin/stop-all.sh. Upon restarting (and restarting, and restarting), the master web UI cannot respond to requests. The cluster seems to be otherwise functional. Below is the master's log, showing stack traces. pmogren@streamproc01:~/streamproc/spark-0.9.1-bin-hadoop2$ cat /home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/sbin/../logs/spark-pmogren-org.apache.spark.deploy.master.Master-1-streamproc01.out Spark Command: /usr/lib/jvm/java-8-oracle-amd64/bin/java -cp :/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/conf:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m -Dspark.streaming.unpersist=true -Djava.net.preferIPv4Stack=true -Dsun.io.serialization.extendedDebugInfo=true -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=pubsub01:2181 org.apache.spark.deploy.master.Master --ip 10.10.41.19 --port 7077 --webui-port 8080 log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/04/11 16:07:55 INFO Master: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/04/11 16:07:55 INFO Master: Starting Spark master at spark://10.10.41.19:7077 14/04/11 16:07:55 INFO MasterWebUI: Started Master web UI at http://10.10.41.19:8080 14/04/11 16:07:55 INFO Master: Persisting recovery state to ZooKeeper 14/04/11 16:07:55 INFO ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT 14/04/11 16:07:55 INFO ZooKeeper: Client environment:host.name=streamproc01.nexus.commercehub.com 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.version=1.8.0 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.home=/usr/lib/jvm/jdk1.8.0/jre 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.class.path=:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/conf:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.library.path= 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.compiler=NA 14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.name=Linux 14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.arch=amd64 14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.version=3.5.0-23-generic 14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.name=pmogren 14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.home=/home/pmogren 14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.dir=/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2 14/04/11 16:07:55 INFO ZooKeeper: Initiating client connection, connectString=pubsub01:2181 sessionTimeout=3 watcher=org.apache.spark.deploy.master.SparkZooKeeperSession$ZooKeeperWatcher@744bfbb6 14/04/11 16:07:55 INFO ZooKeeperLeaderElectionAgent: Starting ZooKeeper LeaderElection agent 14/04/11 16:07:55 INFO ZooKeeper: Initiating client connection, connectString=pubsub01:2181 sessionTimeout=3 watcher=org.apache.spark.deploy.master.SparkZooKeeperSession$ZooKeeperWatcher@7f7e6043 14/04/11 16:07:55 INFO
ClientCnxn: Opening socket connection to server pubsub01.nexus.commercehub.com/10.10.40.39:2181. Will not attempt to authenticate using SASL (unknown error) 14/04/11 16:07:55 INFO ClientCnxn: Socket connection established to pubsub01.nexus.commercehub.com/10.10.40.39:2181, initiating session 14/04/11 16:07:55 INFO ClientCnxn: Opening socket connection to server pubsub01.nexus.commercehub.com/10.10.40.39:2181. Will not attempt to authenticate using SASL (unknown error) 14/04/11 16:07:55 WARN ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable 14/04/11 16:07:55 INFO ClientCnxn: Session establishment complete on server pubsub01.nexus.commercehub.com/10.10.40.39:2181, sessionid = 0x14515d9a11300ce, negotiated timeout = 3 14/04/11 16:07:55 INFO ClientCnxn: Socket connection established to pubsub01.nexus.commercehub.com/10.10.40.39:2181, initiating session 14/04/11 16:07:55 WARN ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable 14/04/11 16:07:55 INFO ClientCnxn: Session establishment complete on server pubsub01.nexus.commercehub.com/10.10.40.39:2181, sessionid = 0x14515d9a11300cf, negotiated timeout = 3 14/04/11 16:07:55 WARN ZooKeeperLeaderElectionAgent: Cleaning up old ZK master election file that points to this master. 14/04/11 16:07:55 INFO ZooKeeperLeaderElectionAgent: Leader
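As an aside, the launch command in that log shows the standalone-master HA setup via ZooKeeper. A minimal sketch of how those properties are normally passed to the master daemons, using the values visible above (the SPARK_DAEMON_JAVA_OPTS variable name is from memory of the 0.9 standalone HA docs, so please verify against your version):

# conf/spark-env.sh on each master node
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=pubsub01:2181"

With this in place, every restarted master re-reads the recovery state from ZooKeeper, which is what the "Persisting recovery state to ZooKeeper" line above refers to.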
Re: Spark on YARN performance
To reiterate what Tom was saying - the code that runs inside Spark on YARN is exactly the same code that runs in any deployment mode. There shouldn't be any performance difference once your application starts (assuming you are comparing apples to apples in terms of hardware). The difference is just that before your application runs, Spark allocates resources from YARN. This will probably take more time than launching an application against a standalone cluster, because YARN's launching mechanism is slower. On Fri, Apr 11, 2014 at 8:43 AM, Tom Graves tgraves...@yahoo.com wrote: I haven't run on Mesos before, but I do run on YARN. The performance differences are going to be in how long it takes to get the executors allocated. On YARN that is going to depend on the cluster setup. If you have dedicated resources for the queue where you are running your Spark job, the overhead is pretty minimal. Now, if your cluster is multi-tenant, is really busy, and you allow other queues to use your capacity, it could take some time. It is also possible to run into the situation where the memory of the NodeManagers gets fragmented and there are no slots big enough for you, so you have to wait for other applications to finish. Again, this mostly depends on the setup, how big the containers you need for Spark are, etc. Tom On Thursday, April 10, 2014 11:12 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Thank you for the reply Mayur, it would be nice to have a comparison on that. I hope one day it will be available, or that I'll have the time to test it myself :) So you're using Mesos for the moment, right? What are the main differences in your experience? YARN seems to be more flexible and interoperable with other frameworks... am I wrong? Best, Flavio On Thu, Apr 10, 2014 at 5:55 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: I've had better luck with standalone in terms of speed and latency. I think there is an impact, but not a very high one. The bigger impact is on being able to manage resources and share the cluster. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Apr 9, 2014 at 12:10 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi everybody, I'm new to Spark and I'd like to know if running Spark on top of YARN or Mesos could affect its performance (and by how much). Is there any doc about this? Best, Flavio
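To make the queue point concrete, this is roughly what a yarn-standalone submission looked like in 0.9.x, including pinning the job to a dedicated capacity-scheduler queue. The flag names are recalled from the 0.9 "Launching Spark on YARN" docs and the application jar path is a placeholder, so treat this as a sketch rather than a copy-paste recipe:

SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar \
  ./bin/spark-class org.apache.spark.deploy.yarn.Client \
    --jar /path/to/your-app.jar \
    --class your.app.MainClass \
    --num-workers 4 \
    --worker-memory 2g \
    --worker-cores 2 \
    --master-memory 1g \
    --queue my-dedicated-queue

If my-dedicated-queue has guaranteed capacity, the container allocation Tom describes is usually quick; on a busy shared queue the same request can sit pending for a while before the executors come up.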
Huge matrix
Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some of the users' attributes. For each user, I need to get the top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
The naive way would be to put all the users and their attributes into an RDD, then take the cartesian product of that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)), and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs, though, so it might be worth looking at the literature on recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some of the users' attributes. For each user, I need to get the top k most similar users. What is the best way to implement this? Thanks.
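For completeness, a minimal sketch of that naive approach in Scala (toy data, a made-up cosine helper, and names invented purely for illustration; as noted above, the cartesian product itself is what will not scale to 1T pairs):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object NaiveTopK {
  // toy similarity: cosine between dense attribute vectors
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val norm = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
    if (norm == 0.0) 0.0 else dot / norm
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "NaiveTopK")
    // (userId, attribute vector) -- a small stand-in for the real one-million-user dataset
    val users: RDD[(Long, Array[Double])] = sc.parallelize(
      (1L to 1000L).map(id => (id, Array.fill(10)(scala.util.Random.nextDouble()))))
    val k = 10
    val topK = users.cartesian(users)
      .filter { case ((id1, _), (id2, _)) => id1 != id2 }
      .map { case ((id1, v1), (id2, v2)) => (id1, (cosine(v1, v2), id2)) }
      .groupByKey()                                  // this is what blows up at 1T pairs
      .mapValues(_.toSeq.sortBy(-_._1).take(k))
    topK.take(5).foreach(println)
    sc.stop()
  }
}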
Re: Behaviour of caching when dataset does not fit into memory
One reason could be that Spark uses scratch disk space for intermediate calculations, so as you perform calculations that data needs to be flushed before you can leverage memory for other operations. A second issue could be that large intermediate data may push more of the RDD onto disk (something I see in warehouse use cases a lot). Can you check in the storage tab how much of the RDD is in memory on each subsequent count, and how much intermediate data is generated each time? On Apr 11, 2014 9:22 AM, Pierre Borckmans pierre.borckm...@realimpactanalytics.com wrote: Hi Matei, Could you enlighten us on this please? Thanks Pierre On 11 Apr 2014, at 14:49, Jérémy Subtil jeremy.sub...@gmail.com wrote: Hi Xusen, I was convinced the cache() method would involve in-memory operations only and have nothing to do with disk, as the underlying default cache strategy is MEMORY_ONLY. Am I missing something? 2014-04-11 11:44 GMT+02:00 尹绪森 yinxu...@gmail.com: Hi Pierre, 1. cache() costs time to carry data from disk to memory, so please do not use cache() if your job is not an iterative one. 2. If your dataset is larger than the amount of memory, then there will be a replacement strategy to exchange data between memory and disk. 2014-04-11 0:07 GMT+08:00 Pierre Borckmans pierre.borckm...@realimpactanalytics.com: Hi there, Just playing around in the Spark shell, I am now a bit confused by the performance I observe when the dataset does not fit into memory: - I load a dataset with roughly 500 million rows - I do a count, it takes about 20 seconds - now if I cache the RDD and do a count again (which will try to cache the data again), it takes roughly 90 seconds (the fraction cached is only 25%). => is this expected? to be roughly 5 times slower when caching and not enough RAM is available? - the subsequent calls to count are also really slow: about 90 seconds as well. => I can see that the first 25% of tasks are fast (the ones dealing with data in memory), but then it gets really slow… Am I missing something? I thought performance would degrade roughly linearly with the amount of data that fits into memory… Thanks for your help! Cheers Pierre Borckmans | RealImpact Analytics | Brussels Office | www.realimpactanalytics.com | pierre.borckm...@realimpactanalytics.com | FR +32 485 91 87 31 | Skype pierre.borckmans -- Best Regards --- Xusen Yin (尹绪森) Intel Labs China Homepage: http://yinxusen.github.io/
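For anyone following along, here is a small sketch of the knobs being discussed, run from the shell against a placeholder path; cache() is just persist(StorageLevel.MEMORY_ONLY), and MEMORY_AND_DISK / MEMORY_ONLY_SER are the usual alternatives to try when the data does not fit:

import org.apache.spark.storage.StorageLevel

// placeholder path -- substitute your ~500M-row dataset
val data = sc.textFile("hdfs:///path/to/big/dataset")

// cache() == persist(StorageLevel.MEMORY_ONLY): partitions that don't fit are
// simply not cached and are recomputed from the source on every action
val memOnly = data.persist(StorageLevel.MEMORY_ONLY)
memOnly.count()   // first count pays the caching cost
memOnly.count()   // only the fraction that fit (e.g. ~25%) is served from memory

// an RDD keeps a single storage level, so use a separate lineage to compare levels:
// MEMORY_AND_DISK spills partitions that don't fit instead of recomputing them
val memAndDisk = sc.textFile("hdfs:///path/to/big/dataset").persist(StorageLevel.MEMORY_AND_DISK)
memAndDisk.count()

// MEMORY_ONLY_SER stores serialized bytes: slower to read back, but often fits
// several times more data in the same heap
val serialized = sc.textFile("hdfs:///path/to/big/dataset").persist(StorageLevel.MEMORY_ONLY_SER)
serialized.count()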
SVD under spark/mllib/linalg
Hi all, the code under https://github.com/apache/spark/tree/master/mllib/src/main/scala/org/apache/spark/mllib/linalg has changed. The previous matrix classes, like MatrixEntry and MatrixSVD, are all removed; instead, Breeze matrix definitions appear. Do we move to Breeze linear algebra when writing linear algebra algorithms? Another question: is there any optimized matrix multiplication code in Spark? I only see the outer product method in the removed SVD.scala: // Compute A^T A, assuming rows are sparse enough to fit in memory val rows = data.map(entry => (entry.i, (entry.j, entry.mval))).groupByKey() val emits = rows.flatMap{ case (rowind, cols) => cols.flatMap{ case (colind1, mval1) => cols.map{ case (colind2, mval2) => ((colind1, colind2), mval1*mval2) } }//colind1: col index, colind2: row index }.reduceByKey(_ + _) thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SVD-under-spark-mllib-linalg-tp4156.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SVD under spark/mllib/linalg
It was moved to mllib.linalg.distributed.RowMatrix. With RowMatrix, you can compute column summary statistics, the Gram matrix, covariance, SVD, and PCA. We will provide multiplication for distributed matrices, but not in v1.0. -Xiangrui On Fri, Apr 11, 2014 at 9:12 PM, wxhsdp wxh...@gmail.com wrote: Hi all, the code under https://github.com/apache/spark/tree/master/mllib/src/main/scala/org/apache/spark/mllib/linalg has changed. The previous matrix classes, like MatrixEntry and MatrixSVD, are all removed; instead, Breeze matrix definitions appear. Do we move to Breeze linear algebra when writing linear algebra algorithms? Another question: is there any optimized matrix multiplication code in Spark? I only see the outer product method in the removed SVD.scala: // Compute A^T A, assuming rows are sparse enough to fit in memory val rows = data.map(entry => (entry.i, (entry.j, entry.mval))).groupByKey() val emits = rows.flatMap{ case (rowind, cols) => cols.flatMap{ case (colind1, mval1) => cols.map{ case (colind2, mval2) => ((colind1, colind2), mval1*mval2) } }//colind1: col index, colind2: row index }.reduceByKey(_ + _) thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SVD-under-spark-mllib-linalg-tp4156.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
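A small sketch of the RowMatrix API Xiangrui describes, as it appears to look in the 1.0 branch (method names recalled from mllib.linalg.distributed, so worth double-checking against your build); assumes a SparkContext sc as in the shell:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// rows of the matrix as an RDD[Vector] (tiny toy matrix here)
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(0.0, 3.0, 4.0),
  Vectors.dense(5.0, 6.0, 0.0)))
val mat = new RowMatrix(rows)

val svd = mat.computeSVD(2, computeU = true)  // top-2 singular triplets
println(svd.s)                                // singular values, a local vector
println(svd.V)                                // right singular vectors, a local matrix
// svd.U is itself a distributed RowMatrix when computeU = true

val summary = mat.computeColumnSummaryStatistics()  // per-column mean, variance, ...
val gram = mat.computeGramianMatrix()               // A^T A as a local matrix
val pca = mat.computePrincipalComponents(2)         // top-2 principal components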
Re: Huge matrix
Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G of memory. The program just stalled at one stage for several hours without any further information. Maybe I need to find a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then take the cartesian product of that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)), and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs, though, so it might be worth looking at the literature on recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some of the users' attributes. For each user, I need to get the top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to the matrix interface in MLlib. You may implement the sampling scheme for your own app, since it's not much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G of memory. The program just stalled at one stage for several hours without any further information. Maybe I need to find a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then take the cartesian product of that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)), and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs, though, so it might be worth looking at the literature on recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some of the users' attributes. For each user, I need to get the top k most similar users. What is the best way to implement this? Thanks.
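Until that PR lands, one common way to cut down the number of pairs (this is not the DIMSUM sampling scheme from the paper, just the usual inverted-index trick) is to score only pairs of users that share at least one attribute, which a self-join on attribute gives you. A rough Scala sketch with made-up field names; it assumes sparse categorical attributes and a SparkContext sc:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// (userId, attribute ids) -- made-up stand-in for the real data
val userAttrs: RDD[(Long, Seq[Int])] = sc.parallelize(Seq(
  (1L, Seq(10, 11)), (2L, Seq(10, 12)), (3L, Seq(13))))

// invert to (attribute, user); a self-join then only pairs up users that
// actually share an attribute, instead of all 1M x 1M combinations
val byAttr = userAttrs.flatMap { case (u, attrs) => attrs.map(a => (a, u)) }

// number of shared attributes per user pair as a cheap similarity score
// note: very common attributes still create big joins, so filter those out first in practice
val coCounts = byAttr.join(byAttr)
  .values
  .filter { case (u1, u2) => u1 != u2 }
  .map(pair => (pair, 1))
  .reduceByKey(_ + _)

// top-k most similar users per user
val k = 10
val topK = coCounts
  .map { case ((u1, u2), c) => (u1, (c, u2)) }
  .groupByKey()
  .mapValues(_.toSeq.sortBy(-_._1).take(k))
topK.take(5).foreach(println)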