Re: Lost an executor error - Jobs fail
Hmm, interesting. I created https://issues.apache.org/jira/browse/SPARK-1499 to track the issue of Workers continuously spewing bad executors, but the real issue seems to be a combination of that and some other bug in Shark or Spark which fails to handle the situation properly. Please let us know if you can reproduce it (especially if deterministic!), or if you can provide any more details about exceptions thrown. A preliminary search didn't bring up much about the error code 101...

On Mon, Apr 14, 2014 at 10:03 PM, Praveen R prav...@sigmoidanalytics.com wrote:

Unfortunately queries kept failing with SparkTask 101 errors, and they worked after removing the troublesome node.

FAILED: Execution Error, return code -101 from shark.execution.SparkTask

I wish it were easy to reproduce. I shall try hard-removing write permissions on one node to see if the same error happens.

On Tue, Apr 15, 2014 at 9:17 AM, Aaron Davidson ilike...@gmail.com wrote:

Cool! It's pretty rare to actually get logs from a wild hardware failure. The problem is, as you said, that the executor keeps failing, but the worker doesn't get the hint, so it keeps creating new, bad executors. However, this issue should not have caused your cluster to fail to start up. In the linked logs, for instance, the shark shell started up just fine (though the shark prompt was lost in some of the log messages). Queries should have been able to execute just fine. Was this not the case?

On Mon, Apr 14, 2014 at 7:38 AM, Praveen R prav...@sigmoidanalytics.com wrote:

Configuration comes from the spark-ec2 setup script, which sets spark.local.dir to use /mnt/spark, /mnt2/spark. The setup actually worked for quite some time, and then one of the nodes had disk errors like:

mv: cannot remove `/mnt2/spark/spark-local-20140409182103-c775/09/shuffle_1_248_0': Read-only file system
mv: cannot remove `/mnt2/spark/spark-local-20140409182103-c775/24/shuffle_1_260_0': Read-only file system
mv: cannot remove `/mnt2/spark/spark-local-20140409182103-c775/24/shuffle_2_658_0': Read-only file system

I understand the issue is at the hardware level, but thought it would be great if Spark could handle it and avoid the cluster going down.

On Mon, Apr 14, 2014 at 7:58 PM, giive chen thegi...@gmail.com wrote:

Hi Praveen

What is your config for spark.local.dir? Do all your workers have this dir, and do all workers have the right permissions on it? I think this is the reason for your error.

Wisely Chen

On Mon, Apr 14, 2014 at 9:29 PM, Praveen R prav...@sigmoidanalytics.com wrote:

Had the error below while running Shark queries on a 30-node cluster, and was not able to start the Shark server or run any jobs.

14/04/11 19:06:52 ERROR scheduler.TaskSchedulerImpl: Lost an executor 4 (already removed): Failed to create local directory (bad spark.local.dir?)

Full log: https://gist.github.com/praveenr019/10647049

After spending quite some time, found it was due to disk read errors on one node, and had the cluster working after removing the node.

Wanted to know if there is any configuration (like akkaTimeout) which can handle this, or does Mesos help? Shouldn't the worker be marked dead in such a scenario, instead of making the cluster unusable, so the debugging can be done at leisure?

Thanks, Praveen R
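For reference, a minimal sketch of pointing spark.local.dir at specific disks in the 0.9 era (the paths are the ones from this thread; on a spark-ec2 cluster the setup script normally does this for you):

    // Set before the SparkContext is created; Spark 0.9 reads spark.* system properties.
    System.setProperty("spark.local.dir", "/mnt/spark,/mnt2/spark")
    val sc = new SparkContext("spark://master:7077", "MyApp")  // master URL is a placeholder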
Re: storage.MemoryStore estimated size 7 times larger than real
Hey, I was talking about something more like:

val size = 1024 * 1024
val numSlices = 8
val arr = Array.fill[Array[Int]](numSlices) { new Array[Int](size / numSlices) }
val rdd = sc.parallelize(arr, numSlices).cache()
val size2 = rdd.map(_.length).sum()
assert(size2 == size)

If I do this, I see 8 blocks are put into MemoryStore, each with a size of 512.1 KB, which adds up to almost exactly 4 MB as expected.

Regarding your other questions: Non-cached RDDs are not written back to disk; their results are simply not stored anywhere. If the results are needed again, the RDD will be recomputed. I'm not sure I understand your distinction between JVM and Spark memory -- both arrays and cached RDDs are stored in the JVM heap. Shuffle operations are unique in that they store intermediate output to local disk immediately, in order to avoid overly expensive recomputation. This shuffle data is always written to disk, whether or not the input RDD(s) are cached, and the final output of the shuffle (the groupBy in your example) will *not* be cached in memory unless explicitly requested.

On Mon, Apr 14, 2014 at 8:48 PM, wxhsdp wxh...@gmail.com wrote:

thanks for your help, Davidson! i modified

val a: RDD[Int] = sc.parallelize(array).cache()

to keep val a an RDD of Int, but got the same result.

another question: JVM and Spark memory locate at different parts of system memory, the Spark code is executed in JVM memory, and allocations like

val e = new Array[Int](2 * size) /* 8MB */

use JVM memory. if not cached, generated RDDs are written back to disk; if cached, RDDs are copied to Spark memory for further use. is that right?

val RDD_1 = RDD_0.groupByKey{...}

shuffles separate stages. can anyone tell me the memory/disk usage of the shuffle input RDD and shuffle output RDD, under the condition that RDD_0 and RDD_1 are cached or not?
groupByKey returns a single partition in a RDD?
I want to apply the following transformations to 60 GB of data on 7 nodes with 10 GB of memory each. I am wondering if the groupByKey() function returns an RDD with a single partition for each key. If so, what will happen if the size of the partition doesn't fit into that particular node?

rdd = sc.textFile("hdfs://...").map(parserFunc).groupByKey()
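For what it's worth, a sketch of the API in question (parserFunc and the path are placeholders from the question, and parserFunc is assumed to emit key-value pairs): groupByKey does not give each key its own partition; it hashes keys across a configurable number of partitions, but all values for one key do land in the single partition that holds that key.

    val pairs = sc.textFile("hdfs://...").map(parserFunc)   // RDD of (key, value)
    // The argument controls how many partitions the grouped RDD has; each
    // partition holds the complete value lists for many keys.
    val grouped = pairs.groupByKey(56)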
Re: Comparing GraphX and GraphLab
Hi Debasish, I found PageRank on LiveJournal cost less than 100 seconds for GraphX on your EC2 setup. But when I ran the example (LiveJournalPageRank) you provided on my machines with the same LiveJournal dataset, it took more than 10 minutes. Following are some details:

Environment: 8 machines, each with 2x Intel Xeon E5-2650 CPUs, 256GB memory, 6TB hard disk + 480GB SSD, Infiniband, Debian Wheezy OS.

I use this command:

./bin/run-example org.apache.spark.examples.graphx.LiveJournalPageRank local hdfs://10.1.1.33:9000/dataset/LiveJournal.txt

Should I set more params to get a faster result? Moreover, I want to know the default allocation of computing resources, as run-example may not allow me to allocate them myself.

Regards~ Qi Song
Re: How to stop system info output in spark shell
The solution: Edit /opt/spark-0.9.0-incubating-bin-hadoop2/conf/log4j.properties, changing Spark's output to WARN. Done!

Refer to: https://github.com/amplab-extras/SparkR-pkg/blob/master/pkg/src/src/main/resources/log4j.properties#L8

eduardocalfaia wrote:

Have you already tried in conf/log4j.properties? log4j.rootCategory=OFF

On 4/3/14, 13:46, weida xu wrote:

Hi, all

When I start Spark in the shell, it automatically outputs some system info every minute, see below. Can I stop or block the output of this info? I tried the :silent command, but the automatic output remains.

14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER
14/04/03 19:34:30 INFO BlockManager: Dropping non broadcast blocks older than 1396524270698
14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
14/04/03 19:34:30 INFO BlockManager: Dropping broadcast blocks older than 1396524270701
14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for HTTP_BROADCAST
14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for MAP_OUTPUT_TRACKER
14/04/03 19:34:31 INFO MetadataCleaner: Ran metadata cleaner for SPARK_CONTEXT
14/04/03 19:34:31 INFO DAGScheduler: shuffleToMapStage 0 --> 0
14/04/03 19:34:31 INFO DAGScheduler: stageIdToStage 0 --> 0
14/04/03 19:34:31 INFO DAGScheduler: stageIdToJobIds 0 --> 0
14/04/03 19:34:31 INFO DAGScheduler: pendingTasks 0 --> 0
14/04/03 19:34:31 INFO DAGScheduler: jobIdToStageIds 0 --> 0
14/04/03 19:34:31 INFO DAGScheduler: stageToInfos 0 --> 0
14/04/03 19:34:31 INFO MetadataCleaner: Ran metadata cleaner for DAG_SCHEDULER
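Concretely, the relevant line in conf/log4j.properties looks like this (a sketch based on Spark's shipped template, which defaults to INFO):

    # Silence Spark's INFO chatter; WARN and ERROR still get through.
    log4j.rootCategory=WARN, console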
Spark program throws OutOfMemoryError
Hi, all

My Spark program always gives me the error java.lang.OutOfMemoryError: Java heap space in my standalone cluster. Here is my code (string quotes and the =>, <- and > operators were garbled in the archive and have been restored):

object SimCalcuTotal {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://192.168.2.184:7077", "Sim Calcu Total",
      "/usr/local/spark-0.9.0-incubating-bin-hadoop2",
      Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    // val sc = new SparkContext("local", "Score Calcu Total")

    val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
    val newRDD = jsonRDD.map(arg => {
      // 0.5 for test
      var score = 0.5
      arg.put("score", score)
      arg
    })
    val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct

    // the program crashes at this line of code
    val bcResources = sc.broadcast(resourcesRDD.collect.toList)

    val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong,
      (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()
    val resouceScores = sc.broadcast(resourceScoresRDD.collect.toMap)

    def calSim(item1: Long, item2: Long) = {
      val iv1 = resouceScores.value(item1)
      val iv2 = resouceScores.value(item2)
      // 0.5 for test
      var distance = 0.5
      if (distance > 0.05) {  // comparison operator lost in the archive; ">" assumed
        var json = new JSONObject()
        json.put("_id", item1.toString + item2.toString)
        json.put("rid1", item1)
        json.put("rid2", item2)
        json.put("sim", distance)
        json
      }
      else null
    }

    // val saveRDD = newRDD.map(arg => arg.toString)
    // newRDD.saveAsTextFile(args(1).toString)
    val similarityRDD = resourcesRDD.flatMap(resource => {
      for (other <- bcResources.value if resource > other) yield calSim(resource, other)
    }).filter(arg => arg != null)
    similarityRDD.saveAsTextFile("/home/deployer/sim")
  }
}

The data file "/home/deployer/uris.dat" is 2G, with lines like this:

{ "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

And here is my spark-env.sh:

export SCALA_HOME=/usr/local/scala-2.10.3
export SPARK_MASTER_IP=192.168.2.184
export SPARK_MASTER_PORT=7077
export SPARK_LOCAL_IP=192.168.2.182
export SPARK_WORKER_MEMORY=20g
export SPARK_MEM=10g
export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

There are two processes on my server when the Spark program is running (before it crashes):

java -cp :/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar -Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@192.168.2.183:51339/user/CoarseGrainedScheduler 0 192.168.2.182 16 akka.tcp://sparkWorker@192.168.2.182:45588/user/Worker app-20140415172433-0001

java -cp :/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.2.184:7077

Is there anybody who can help me? Thanks very much!!
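As an aside, and only a sketch (not from the thread): collecting resourcesRDD to the driver and broadcasting it back is one of the memory pressure points here, and the pairing can instead stay distributed with cartesian, reusing the resourcesRDD and calSim definitions above.

    val similarityRDD = resourcesRDD.cartesian(resourcesRDD)
      .filter { case (a, b) => a > b }          // keep each unordered pair once
      .map { case (a, b) => calSim(a, b) }
      .filter(_ != null)

Note that calSim still reads the broadcast score map, so that map must still fit in memory on each executor.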
standalone vs YARN
Hi all, I am evaluating Spark for use here at my work. We have an existing Hadoop 1.x install which I am planning to upgrade to Hadoop 2.3. I am trying to work out whether I should install YARN or simply just set up a Spark standalone cluster. We already use ZooKeeper, so it isn't a problem to set up HA.

I am puzzled, however, as to how the Spark nodes can coordinate on data locality - i.e., assuming I install the nodes on the same machines as the DFS data nodes, I don't understand how Spark can work out which nodes should get which splits of the jobs.

Anyway, my bigger question remains: YARN or standalone? Which is the more stable option currently? Which is the more future-proof option?

Thanks, Ishaaq
Re: standalone vs YARN
Prashant, In another email thread several weeks ago, it was mentioned that YARN support is considered beta until Spark 1.0. Is that not the case? -Suren

On Tue, Apr 15, 2014 at 8:38 AM, Prashant Sharma scrapco...@gmail.com wrote:

Hi Ishaaq, answers inline from what I know; I'd like to be corrected though.

On Tue, Apr 15, 2014 at 5:58 PM, ishaaq ish...@gmail.com wrote:

Hi all, I am evaluating Spark to use here at my work. We have an existing Hadoop 1.x install which I am planning to upgrade to Hadoop 2.3.

This is not really a requirement for Spark; if you are doing it for some other reason, great!

I am trying to work out whether I should install YARN or simply just set up a Spark standalone cluster. We already use ZooKeeper so it isn't a problem to set up HA. I am puzzled however as to how the Spark nodes can coordinate on data locality - i.e., assuming I install the nodes on the same machines as the DFS data nodes, I don't understand how Spark can work out which nodes should get which splits of the jobs?

This happens exactly the same way Hadoop's MapReduce figures out data locality: Spark supports Hadoop's InputFormats, which also carry the information on how the data is partitioned. So having Spark workers share the same nodes as your DFS is a good idea.

Anyway, my bigger question remains: YARN or standalone? Which is the more stable option currently? Which is the more future-proof option?

Well, I think standalone is stable enough for all purposes, and Spark's YARN support has been keeping up with the latest Hadoop versions too. If you are already using YARN and don't want the hassle of setting up another cluster manager, you may well prefer YARN.

Thanks, Ishaaq

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: Scala vs Python performance differences
This would be super useful. Thanks.

On 4/15/14, 1:30 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hi Andrew, I'm putting together some benchmarks for PySpark vs Scala. I'm focusing on ML algorithms, as I'm particularly curious about the relative performance of MLlib in Scala vs the Python MLlib API vs pure Python implementations. Will share real results as soon as I have them, but roughly, in our hands, that 40% number is ballpark correct, at least for some basic operations (e.g. textFile, count, reduce).

-- Jeremy

Jeremy Freeman, PhD
Neuroscientist
@thefreemanlab
partitioning of small data sets
I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb. Given the size, and that it is a single file, I assumed it would only be in a single partition. But when I cache it, I can see in the Spark App UI that it actually splits it into two partitions:

[inline screenshot: sparkdev_2014-04-11.png]

Is this correct behavior? How does Spark decide how big a partition should be, or how many partitions to create for an RDD?

If it matters, I have only a single worker in my cluster, so both partitions are stored on the same worker. The file was on HDFS and was only a single block.

Thanks for any insight. Diana
Re: Spark resilience
Thanks Aaron, this is useful! - Manoj

On Mon, Apr 14, 2014 at 8:12 PM, Aaron Davidson ilike...@gmail.com wrote:

Launching drivers inside the cluster was a feature added in 0.9, for standalone cluster mode: http://spark.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster

Note the supervise flag, which will cause the driver to be restarted if it fails. This is a rather low-level mechanism which by default will just cause the whole job to rerun from the beginning. Special recovery would have to be implemented by hand, via some sort of state checkpointing into a globally visible storage system (e.g., HDFS), which, for example, Spark Streaming already does. Currently, this feature is not supported in YARN or Mesos fine-grained mode.

On Mon, Apr 14, 2014 at 2:08 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Could you please elaborate how drivers can be restarted automatically? Thanks,

On Mon, Apr 14, 2014 at 10:30 AM, Aaron Davidson ilike...@gmail.com wrote:

Master and slave are somewhat overloaded terms in the Spark ecosystem (see the glossary: http://spark.apache.org/docs/latest/cluster-overview.html#glossary). Are you actually asking about the Spark driver and executors, or the standalone cluster Master and Workers? To briefly answer for either possibility:

(1) Drivers are not fault tolerant but can be restarted automatically; Executors may be removed at any point without failing the job (though losing an Executor may slow the job significantly); and Executors may be added at any point and will be immediately used.

(2) Standalone cluster Masters are fault tolerant, and a failure will only temporarily stall new jobs from starting or getting new resources, but does not affect currently-running jobs. Workers can fail and will simply cause jobs to lose their current Executors. New Workers can be added at any point.

On Mon, Apr 14, 2014 at 11:00 AM, Ian Ferreira ianferre...@hotmail.com wrote:

Folks, I was wondering what the failure support modes were for Spark while running jobs:

1. What happens when a master fails?
2. What happens when a slave fails?
3. Can you add and remove slaves mid-job?

Regarding the install on Mesos: if I understand correctly, the Spark master is behind a ZooKeeper quorum, so that isolates the slaves from a master failure, but what about the masters behind the quorum?

Cheers - Ian
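Per the 0.9 standalone docs linked above, launching a supervised driver inside the cluster looks roughly like this (URLs, jar path and class name are placeholders):

    ./bin/spark-class org.apache.spark.deploy.Client launch \
      --supervise \
      spark://master:7077 hdfs://path/to/app.jar com.example.Main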
Re: storage.MemoryStore estimated size 7 times larger than real
Ah, I think I can see where your issue may be coming from. In spark-shell, the MASTER is "local[*]", which just means it uses a pre-set number of cores. This distinction only matters because the default number of slices created from sc.parallelize() is based on the number of cores. So when you run from sbt, you probably use a SparkContext with a "local" master, which sets the number of cores to 1, meaning you are doing sc.parallelize(array, 1), while in spark-shell you are doing sc.parallelize(array, 6ish?).

The difference between the two is just that the array is broken up into more parts in the latter, so you will store blocks for rdd_0_0, rdd_0_1, ..., rdd_0_5 rather than just one (large) block. In both cases, though, I suspect that the total size is around the same, at around 28 MB. In my case, where I have an RDD[Array[Int]], I have 8 partitions (a number I just chose randomly), and each one is 512 KB, so the total size is actually 4 MB. You could do the same test with numSlices = 1, and you'd just have a single 4 MB block.

The reason our two solutions produced different total memory values is Java primitive boxing [1]. In your case, your RDD[Int] is converted into an Array[Any] right before being stored into memory, which causes it to be effectively an Array[java.lang.Integer] [2]. In my case, the actual values inside the RDD are primitive arrays, so they cannot be broken up. Spark still converts my RDD[Array[Int]] into an Array[Any], but an Array[Int] is already an Any, so there's no memory impact here.

[1] http://docs.oracle.com/javase/tutorial/java/data/autoboxing.html
[2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L90

On Tue, Apr 15, 2014 at 3:58 AM, wxhsdp wxh...@gmail.com wrote:

sorry, Davidson, i don't catch the point. what's the essential difference between our code?

/* my code */
val array = new Array[Int](size)
val a = sc.parallelize(array).cache() /* 4MB */

/* your code */
val numSlices = 8
val arr = Array.fill[Array[Int]](numSlices) { new Array[Int](size / numSlices) }
val rdd = sc.parallelize(arr, numSlices).cache()

i'm in local mode, with only one partition. it's just an RDD of one partition with the type RDD[Int]; your RDD has 8 partitions with the type RDD[Array[Int]]. does that matter? my question is why the memory usage is 7x in sbt, but right in spark shell?

as to the following question, i made a mistake, sorry
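To see the boxing effect concretely, a tiny sketch (the exact inflation factor depends on the JVM, but per-element object headers plus references are what produce the several-fold blowup reported in this thread):

    val prim = new Array[Int](1024 * 1024)        // packed primitives: ~4 MB
    // The same values as Array[Any] box each Int into a java.lang.Integer,
    // so each element now costs an object header plus a reference.
    val boxed: Array[Any] = prim.map(i => i: Any)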
Re: partitioning of small data sets
Take a look at the minSplits argument for SparkContext#textFile [1] -- the default value is 2. You can simply set this to 1 if you'd prefer not to split your data.

[1] http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext

On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote:

I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb. Given the size, and that it is a single file, I assumed it would only be in a single partition. But when I cache it, I can see in the Spark App UI that it actually splits it into two partitions. Is this correct behavior? How does Spark decide how big a partition should be, or how many partitions to create for an RDD? If it matters, I have only a single worker in my cluster, so both partitions are stored on the same worker. The file was on HDFS and was only a single block. Thanks for any insight. Diana
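A minimal usage sketch (the path is a placeholder):

    // Ask for a single split instead of the default minimum of 2.
    val tiny = sc.textFile("hdfs://namenode/tiny.txt", 1)
    assert(tiny.partitions.size == 1)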
Why these operations are slower than the equivalent on Hadoop?
Hi all, As in a previous thread, I am asking how to implement a divide-and-conquer algorithm (skyline) in Spark. Here is my current solution:

val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val result = data.mapPartitions(points => skyline(points.toArray).iterator)
  .coalesce(1, true)
  .mapPartitions(points => skyline(points.toArray).iterator)
  .collect()

where skyline is a local algorithm to compute the results:

def skyline(points: Array[Point]): Array[Point]

Basically, I find this implementation runs slower than the corresponding Hadoop version (the identity map phase plus local skyline for both combine and reduce phases). Below are my questions:

1. Why is this implementation much slower than the Hadoop one? I can find two possible reasons: one is the shuffle overhead in coalesce, another is calling toArray and iterator repeatedly when invoking the local skyline algorithm. But I am not sure which one is true.

2. One observation is that while the Hadoop version almost used up all the CPU resources during execution, the CPU seems not that hot on Spark. Is that a clue to prove that the shuffling might be the real bottleneck?

3. Is there any difference between coalesce(1, true) and repartition? It seems that both operations need to shuffle data. What are the proper situations for using the coalesce method?

4. More generally, I am trying to implement some core geometry computation operators on Spark (like skyline, convex hull etc). In my understanding, since Spark is more capable of handling iterative computations on a dataset, the above solution apparently doesn't exploit what Spark is good at. Any comments on how to do geometry computations on Spark (if it is possible)?

Thanks for any insight. Yanzhe
Streaming job having Cassandra query : OutOfMemoryError
Hi All, I am desperately looking for some help. My cluster has 6 nodes, each with dual cores and 8GB RAM. The Spark version running on the cluster is spark-0.9.0-incubating-bin-cdh4. I am getting an OutOfMemoryError when running a Spark Streaming job (the non-streaming version works fine) which queries a Cassandra table (a simple query returning 3-4 rows) by connecting to the Spark standalone cluster master.

java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedByteArray(WritableUtils.java:38)
    at org.apache.hadoop.io.WritableUtils.readCompressedString(WritableUtils.java:87)
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:185)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
Apr 15, 2014 6:53:39 PM org.apache.spark.Logging$class logInfo

The Spark job dependencies are (the XML tags were stripped in the archive and have been restored):

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.0-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>0.9.0-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.cassandra</groupId>
  <artifactId>cassandra-all</artifactId>
  <version>2.0.6</version>
</dependency>
<dependency>
  <groupId>com.tuplejump</groupId>
  <artifactId>calliope_2.10</artifactId>
  <version>0.9.0-U1-C2-EA</version>
</dependency>

The various memory variables are configured as below:

spark.executor.memory = 4g
SPARK_MEM = 2g
SPARK_WORKER_MEMORY = 4g

Can you please let me know where I am going wrong.

Thanks, Sony
Re: partitioning of small data sets
Yup, one reason it's 2 actually is to give people a similar experience to working with large files, in case their code doesn't deal well with the file being partitioned.

Matei

On Apr 15, 2014, at 9:53 AM, Aaron Davidson ilike...@gmail.com wrote:

Take a look at the minSplits argument for SparkContext#textFile [1] -- the default value is 2. You can simply set this to 1 if you'd prefer not to split your data.

[1] http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext

On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote:

I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb. Given the size, and that it is a single file, I assumed it would only be in a single partition. But when I cache it, I can see in the Spark App UI that it actually splits it into two partitions. Is this correct behavior? How does Spark decide how big a partition should be, or how many partitions to create for an RDD? If it matters, I have only a single worker in my cluster, so both partitions are stored on the same worker. The file was on HDFS and was only a single block. Thanks for any insight. Diana
scheduler question
Hi Folks, I have some questions about how the Spark scheduler works:

- How does Spark know how many resources a job might need?
- How does it fairly share resources between multiple jobs?
- Does it know about data and partition sizes and use that information for scheduling?

Mohit.
Re: Why these operations are slower than the equivalent on Hadoop?
Your Spark solution first reduces partial results into a single partition, computes the final result, and then collects to the driver side. This involves a shuffle and two waves of network traffic. Instead, you can directly collect partial results to the driver and then compute the final result on the driver side:

val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
val results = skyline(partialResults)

On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com wrote (answers inline):

1. Why is this implementation much slower than the Hadoop one? I can find two possible reasons: one is the shuffle overhead in coalesce, another is calling toArray and iterator repeatedly when invoking the local skyline algorithm. But I am not sure which one is true.

I haven't seen your Hadoop version. But if this assumption is right, the above version should help.

2. One observation is that while the Hadoop version almost used up all the CPU resources during execution, the CPU seems not that hot on Spark. Is that a clue to prove that the shuffling might be the real bottleneck?

How many parallel tasks are there when running your Spark code? I doubt tasks are queued and run sequentially.

3. Is there any difference between coalesce(1, true) and repartition? It seems that both operations need to shuffle data. What are the proper situations for using the coalesce method?

repartition(n) is just an alias of coalesce(n, true), so yes, they both involve data shuffling. coalesce can be used to shrink the partition number when the dataset size shrinks dramatically after operations like filter. Say you have an RDD containing 1TB of data in 100 partitions; after a .filter(...) call only 20GB is left, so you may want to coalesce to 2 partitions rather than 100.

4. More generally, I am trying to implement some core geometry computation operators on Spark (like skyline, convex hull etc). In my understanding, since Spark is more capable of handling iterative computations on a dataset, the above solution apparently doesn't exploit what Spark is good at. Any comments on how to do geometry computations on Spark (if it is possible)?

Although Spark is good at iterative algorithms, it also performs better in batch computing due to much lower scheduling overhead and thread-level parallelism. Theoretically, you can always accelerate your MapReduce job by rewriting it in Spark.
Re: How to cogroup/join pair RDDs with different key types?
Andrew, Thank you very much for your feedback. Unfortunately, the ranges are not of predictable size, but you gave me an idea of how to handle it. Here's what I'm thinking:

1. Choose a number of partitions, n, over the IP space
2. Preprocess the IPRanges, splitting any of them that cross partition boundaries
3. Partition ipToUrl and the new ipRangeToZip according to the partitioning scheme from step 1
4. Join matching partitions of these two RDDs

I still don't know how to do step 4, though. I see that RDDs have a mapPartitions() operation to let you do whatever you want with a partition. What I need is a way to get my hands on two partitions at once, each from a different RDD. Any ideas?

Thanks, Roger

On Mon, Apr 14, 2014 at 5:45 PM, Andrew Ash and...@andrewash.com wrote:

Are your IPRanges all on nice, even CIDR-format ranges? E.g. 192.168.0.0/16 or 10.0.0.0/8? If the range is always an even subnet mask and not split across subnets, I'd recommend flatMapping the ipToUrl RDD to (IPRange, String) and then joining the two RDDs. The expansion would be at most 32x if all your ranges can be expressed in CIDR notation, and in practice would be much smaller than that (typically you don't need things bigger than a /8 and often not smaller than a /24). Hopefully you can use your knowledge of the IP ranges to make this feasible. Otherwise, you could additionally flatMap the ipRangeToZip out to a list of CIDR notations and do the join then, but you're starting to have the cartesian product work against you on scale at that point.

Andrew

On Tue, Apr 15, 2014 at 1:07 AM, Roger Hoover roger.hoo...@gmail.com wrote:

Hi, I'm trying to figure out how to join two RDDs with different key types and appreciate any suggestions. Say I have two RDDs:

ipToUrl of type (IP, String)
ipRangeToZip of type (IPRange, String)

How can I join/cogroup these two RDDs together to produce a new RDD of type (IP, (String, String)), where IP is the key and the values are the urls and zipcodes? Say I have a method on the IPRange class called matches(ip: IP); I want the joined records to match when ipRange.matches(ip).

Thanks, Roger
Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1
I've received the same error with Spark built using Maven. It turns out that mesos-0.13.0 depends on protobuf-2.4.1, which is causing the clash at runtime. The protobuf included by Akka is shaded and doesn't cause any problems. The solution is to update the mesos dependency to 0.18.0 in Spark's pom.xml. Rebuilding the JAR with this configuration solves the issue. -Anant
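For reference, the change described would look roughly like this in Spark's pom.xml (a sketch; check the exact coordinates against the build you are patching):

    <dependency>
      <groupId>org.apache.mesos</groupId>
      <artifactId>mesos</artifactId>
      <version>0.18.0</version>
    </dependency>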
Re: How to cogroup/join pair RDDs with different key types?
I'm thinking of creating a union type for the key so that IPRange and IP types can be joined.

On Tue, Apr 15, 2014 at 10:44 AM, Roger Hoover roger.hoo...@gmail.com wrote:

Andrew, Thank you very much for your feedback. Unfortunately, the ranges are not of predictable size, but you gave me an idea of how to handle it. Here's what I'm thinking:

1. Choose a number of partitions, n, over the IP space
2. Preprocess the IPRanges, splitting any of them that cross partition boundaries
3. Partition ipToUrl and the new ipRangeToZip according to the partitioning scheme from step 1
4. Join matching partitions of these two RDDs

I still don't know how to do step 4, though. I see that RDDs have a mapPartitions() operation to let you do whatever you want with a partition. What I need is a way to get my hands on two partitions at once, each from a different RDD. Any ideas?

Thanks, Roger
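For step 4 of the plan above, one candidate is RDD.zipPartitions, which hands you matching partitions of two RDDs at once. A sketch, assuming both RDDs were given the same number of partitions in step 3 and using the thread's hypothetical IP/IPRange types:

    val joined = ipToUrl.zipPartitions(ipRangeToZip) { (ips, ranges) =>
      val rangeList = ranges.toList              // assumed small per partition
      ips.flatMap { case (ip, url) =>
        rangeList.collect { case (range, zip) if range.matches(ip) => (ip, (url, zip)) }
      }
    }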
Re: can't sc.paralellize in Spark 0.7.3 spark-shell
Actually, altering the classpath in the REPL causes the provided SparkContext to disappear:

scala> sc.parallelize(List(1,2,3))
res0: spark.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:13

scala> :cp /root
Added '/root'. Your new classpath is:
:/root/jars/aspectjrt.jar:/root/jars/aspectjweaver.jar:/root/jars/aws-java-sdk-1.4.5.jar:/root/jars/aws-java-sdk-1.4.5-javadoc.jar:/root/jars/aws-java-sdk-1.4.5-sources.jar:/root/jars/aws-java-sdk-flow-build-tools-1.4.5.jar:/root/jars/commons-codec-1.3.jar:/root/jars/commons-logging-1.1.1.jar:/root/jars/freemarker-2.3.18.jar:/root/jars/httpclient-4.1.1.jar:/root/jars/httpcore-4.1.jar:/root/jars/jackson-core-asl-1.8.7.jar:/root/jars/mail-1.4.3.jar:/root/jars/spring-beans-3.0.7.jar:/root/jars/spring-context-3.0.7.jar:/root/jars/spring-core-3.0.7.jar:/root/jars/stax-1.2.0.jar:/root/jars/stax-api-1.0.1.jar:/root/spark/conf:/root/spark/core/target/scala-2.9.3/classes:/root/spark/core/src/main/resources:/root/spark/repl/target/scala-2.9.3/classes:/root/spark/examples/target/scala-2.9.3/classes:/root/spark/streaming/target/scala-2.9.3/classes:/root/spark/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/*:/root/spark/lib_managed/jars/*:/root/spark/lib_managed/bundles/*:/root/spark/repl/lib/*:/root/spark/bagel/target/scala-2.9.3/classes:/root/spark/python/lib/py4j0.7.jar:/root
14/04/15 18:19:37 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 18:19:37 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48978
Replaying: sc.parallelize(List(1,2,3))
<console>:8: error: not found: value sc
       sc.parallelize(List(1,2,3))

On Mon, Apr 14, 2014 at 7:51 PM, Walrus theCat walrusthe...@gmail.com wrote:

Nevermind -- I'm like 90% sure the problem is that I'm importing stuff that declares a SparkContext as sc. If it's not, I'll report back.

On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat walrusthe...@gmail.com wrote:

Hi, Using the spark-shell, I can't sc.parallelize to get an RDD. Looks like a bug.

scala> sc.parallelize(Array("a","s","d"))
java.lang.NullPointerException
    at <init>(<console>:17)
    at <init>(<console>:22)
    at <init>(<console>:24)
    at <init>(<console>:26)
    at <init>(<console>:28)
    at <init>(<console>:30)
    at <init>(<console>:32)
    at <init>(<console>:34)
    at <init>(<console>:36)
    at .<init>(<console>:40)
    at .<clinit>(<console>)
    at .<init>(<console>:11)
    at .<clinit>(<console>)
    at $export(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
    at spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890)
    at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
    at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
    at java.lang.Thread.run(Thread.java:744)
Can't run a simple spark application with 0.9.1
Hello, Currently I deployed Spark 0.9.1 using a new way of starting it up:

exec start-stop-daemon --start --pidfile /var/run/spark.pid --make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME} --exec /usr/bin/java -- -cp ${CLASSPATH} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10111 -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS}

where the classpath points to the Spark jar that we compile with sbt. When I try to run a job I receive the following warning:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

My first question is: do I need the entire Spark project on disk in order to run jobs? Or what else am I doing wrong?
Re: Why these operations are slower than the equivalent on Hadoop?
It depends on your algorithm, but I guess you probably should use reduce (the code probably doesn't compile, but it shows the idea):

val result = data.reduce { case (left, right) => skyline(left ++ right) }

Or, in case you want to merge the result of one partition with another, you could do:

val result = data.mapPartitions { points =>
  // Turn the whole partition into a single element. This may incur some other
  // problems, especially if you use Kryo serialization...
  Iterator(skyline(points.toArray))
}.reduce { case (left, right) => skyline(left ++ right) }

2014-04-15 19:37 GMT+02:00 Cheng Lian lian.cs@gmail.com:

Your Spark solution first reduces partial results into a single partition, computes the final result, and then collects to the driver side. This involves a shuffle and two waves of network traffic. Instead, you can directly collect partial results to the driver and then compute the final result on the driver side:

val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
val results = skyline(partialResults)

...
Re: can't sc.paralellize in Spark 0.7.3 spark-shell
This is probably related to the Scala bug that :cp does not work: https://issues.scala-lang.org/browse/SI-6502

On Tue, Apr 15, 2014 at 11:21 AM, Walrus theCat walrusthe...@gmail.com wrote:

Actually, altering the classpath in the REPL causes the provided SparkContext to disappear: ...
Re: can't sc.paralellize in Spark 0.7.3 spark-shell
Dankeschön!

On Tue, Apr 15, 2014 at 11:29 AM, Aaron Davidson ilike...@gmail.com wrote:

This is probably related to the Scala bug that :cp does not work: https://issues.scala-lang.org/browse/SI-6502 ...
Shark: class java.io.IOException: Cannot run program /bin/java
Hi, after starting the shark-shell via

/opt/shark/shark-0.9.1/bin/shark-withinfo -skipRddReload

I receive lots of output, including the exception that /bin/java cannot be executed. But it is linked to /usr/bin/java ?!?!

root# ls -al /bin/java
lrwxrwxrwx 1 root root 13 15. Apr 21:45 /bin/java -> /usr/bin/java
root# /bin/java -version
java version "1.7.0_51"
OpenJDK Runtime Environment (rhel-2.4.4.1.el6_5-x86_64 u51-b02)
OpenJDK 64-Bit Server VM (build 24.45-b08, mixed mode)

Starting the shark shell:

[root@hadoop-pg-5 bin]# /opt/shark/shark-0.9.1/bin/shark-withinfo -skipRddReload -hiveconf hive.root.logger=INFO,console -skipRddReload
Starting the Shark Command Line Client
14/04/15 21:45:57 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
14/04/15 21:45:58 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
Logging initialized using configuration in jar:file:/opt/shark/shark-0.9.1/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
14/04/15 21:45:58 INFO SessionState: Logging initialized using configuration in jar:file:/opt/shark/shark-0.9.1/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
Hive history file=/tmp/root/hive_job_log_root_22574@hadoop-pg-5.cluster_201404152145_159664609.txt
14/04/15 21:45:58 INFO exec.HiveHistory: Hive history file=/tmp/root/hive_job_log_root_22574@hadoop-pg-5.cluster_201404152145_159664609.txt
14/04/15 21:45:58 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
14/04/15 21:45:59 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
14/04/15 21:46:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/15 21:46:00 INFO Remoting: Starting remoting
14/04/15 21:46:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@hadoop-pg-5.cluster:38835]
14/04/15 21:46:00 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hadoop-pg-5.cluster:38835]
14/04/15 21:46:00 INFO spark.SparkEnv: Registering BlockManagerMaster
5,108: [GC 262656K->26899K(1005568K), 0,0409080 secs]
14/04/15 21:46:00 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140415214600-9537
14/04/15 21:46:00 INFO storage.MemoryStore: MemoryStore started with capacity 589.2 MB.
14/04/15 21:46:00 INFO network.ConnectionManager: Bound socket to port 51889 with id = ConnectionManagerId(hadoop-pg-5.cluster,51889)
14/04/15 21:46:00 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/04/15 21:46:00 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop-pg-5.cluster:51889 with 589.2 MB RAM
14/04/15 21:46:00 INFO storage.BlockManagerMaster: Registered BlockManager
14/04/15 21:46:00 INFO spark.HttpServer: Starting HTTP Server
14/04/15 21:46:00 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 21:46:00 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:59414
14/04/15 21:46:00 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.147.210.5:59414
14/04/15 21:46:01 INFO spark.SparkEnv: Registering MapOutputTracker
14/04/15 21:46:01 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-cf56ada9-d950-4abc-a1c3-76fecdc4faa3
14/04/15 21:46:01 INFO spark.HttpServer: Starting HTTP Server
14/04/15 21:46:01 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 21:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:45689
14/04/15 21:46:01 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/04/15
Problem with KryoSerializer
Hi, I have a problem when I want to use the Spark KryoSerializer, extending KryoRegistrator to register custom classes in order to create objects. I am getting the following exception when I run the program below. Please let me know what could be the problem...

(run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: main.scala.Utilities

Registering classes (objects):

package main.scala

import org.apache.spark.serializer.KryoRegistrator
import com.esotericsoftware.kryo._

class MykryoRegistrar extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[main.scala.Meter_data])
    kryo.register(classOf[main.scala.Utilities])
  }
}

MeterData_PerDay (main class):

import org.apache.spark.SparkContext

object MeterData_PerDay {
  def main(args: Array[String]) {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "main.scala.MykryoRegistrar")
    var utilclass: Utilities = new Utilities()
    val sc = new SparkContext("local", "Simple App", utilclass.spark_home,
      List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
    val file = sc.textFile(utilclass.data_home)
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-KryoSerializer-tp4295.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
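For context, the usual reading of this error (my interpretation, not something stated in the thread): registering a class with Kryo only changes how RDD data is serialized. An object captured by a closure or passed to the SparkContext constructor, like the Utilities instance above, still goes through Java serialization, so the class itself must be Serializable. A minimal sketch of two common fixes, with made-up field values:

// Fix 1: make the captured class Java-serializable.
class Utilities extends Serializable {
  val spark_home = "/opt/spark"          // hypothetical values
  val data_home = "hdfs:///data/meters"
}

// Fix 2: avoid capturing the instance at all -- copy the needed field into a
// local val (a plain String is always serializable) before using it in a closure.
val utilclass = new Utilities()
val dataHome = utilclass.data_home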
StackOverflow Error when run ALS with 100 iterations
Hi, I am testing ALS on 7 nodes. Each node has 4 cores and 8G memory. The ALS program cannot run even with a very small training set (about 91 lines) due to a StackOverflow error when I set the number of iterations to 100. I think the problem may be caused by the updateFeatures method, which updates the products RDD iteratively by joining the previous products RDD. I am writing a program which has a similar update process to ALS. This problem also appeared when I iterate too many times (more than 80). The iterative part of my code is as follows:

solution = outlinks.join(solution).map { ... }

Has anyone had a similar problem? Thanks. Xiaoli
Re: partitioning of small data sets
Looking at the Python version of textFile() (http://spark.apache.org/docs/latest/api/pyspark/pyspark.context-pysrc.html#SparkContext.textFile), shouldn't it be *max*(self.defaultParallelism, 2)? If the default parallelism is, say, 4, wouldn't we want to use that for minSplits instead of 2? On Tue, Apr 15, 2014 at 1:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yup, one reason it's 2 actually is to give people a similar experience to working with large files, in case their code doesn't deal well with the file being partitioned. Matei On Apr 15, 2014, at 9:53 AM, Aaron Davidson ilike...@gmail.com wrote: Take a look at the minSplits argument for SparkContext#textFile [1] -- the default value is 2. You can simply set this to 1 if you'd prefer not to split your data. [1] http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote: I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb. Given the size, and that it is a single file, I assumed it would only be in a single partition. But when I cache it, I can see in the Spark App UI that it actually splits it into two partitions: (attached image: sparkdev_2014-04-11.png) Is this correct behavior? How does Spark decide how big a partition should be, or how many partitions to create for an RDD? If it matters, I have only a single worker in my cluster, so both partitions are stored on the same worker. The file was on HDFS and was only a single block. Thanks for any insight. Diana
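A small sketch of the knob being discussed, in Scala (the path is made up; in 0.9.x the parameter is named minSplits):

val tiny = sc.textFile("hdfs:///user/someuser/tiny.txt", 1) // ask for a single partition
println(tiny.partitions.length) // 1 for a single-block file, instead of the default minimum of 2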
Multi-tenant?
What is the support for multi-tenancy in Spark? I assume more than one driver can share the same cluster, but can a driver run two jobs in parallel?
java.net.SocketException: Network is unreachable while connecting to HBase
I am getting a java.net.SocketException: Network is unreachable whenever I do a count on one of my tables. If I just do a take(1), I see the task status as killed on the master UI, but I get back the results. My driver runs on my local system, which is accessible over the public internet, and connects to a remote cluster. This is the code I am trying out:

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.quorum", "xx.xx.xx.xx,xx.xx.xx.xx,xx.xx.xx.xx");
hbaseConf.set(TableInputFormat.INPUT_TABLE, table);
JavaPairRDD<ImmutableBytesWritable, Result> rdd = sparkContext.newAPIHadoopRDD(hbaseConf,
    TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
System.out.println("Count=" + rdd.count());

Please suggest what I am missing and how to fix this issue. Thanks a lot.

14/04/15 22:39:22 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: x (PROCESS_LOCAL)
14/04/15 22:39:22 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1731 bytes in 22 ms
14/04/15 22:39:24 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/04/15 22:39:24 WARN scheduler.TaskSetManager: Loss was due to java.net.SocketException
java.net.SocketException: Network is unreachable
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:378)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:473)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:203)
at sun.net.www.http.HttpClient.New(HttpClient.java:290)
at sun.net.www.http.HttpClient.New(HttpClient.java:306)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:995)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:931)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:849)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1299)
at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:156)
at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1872)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1872)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
Re: Can't run a simple spark application with 0.9.1
I am a dork, please disregard this issue. I did not have the slaves correctly configured. This error is very misleading. On Tue, Apr 15, 2014 at 11:21 AM, Paul Schooss paulmscho...@gmail.com wrote: Hello, Currently I deployed 0.9.1 Spark using a new way of starting up Spark:

exec start-stop-daemon --start --pidfile /var/run/spark.pid --make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME} --exec /usr/bin/java -- -cp ${CLASSPATH} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10111 -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS}

where the classpath points to the Spark jar that we compile with sbt. When I try to run a job I receive the following warning:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

My first question is: do I need the entire Spark project on disk in order to run jobs? Or what else am I doing wrong?
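Since the root cause here turned out to be slave configuration, a minimal sketch of the standalone conf/slaves file that the start scripts read (the hostnames are hypothetical, and this assumes the stock start scripts rather than the custom start-stop-daemon launcher above):

# conf/slaves -- one worker hostname per line
worker1.example.com
worker2.example.com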
Re: Multi-tenant?
Yes, both things can happen. Take a look at http://spark.apache.org/docs/latest/job-scheduling.html, which includes scheduling concurrent jobs within the same driver. Matei On Apr 15, 2014, at 4:08 PM, Ian Ferreira ianferre...@hotmail.com wrote: What is the support for multi-tenancy in Spark. I assume more than one driver can share the same cluster, but can a driver run two jobs in parallel?
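A rough sketch of the within-driver case from the linked docs (the RDDs are placeholders; setting spark.scheduler.mode=FAIR is optional, the default being FIFO): jobs submitted from separate threads of one SparkContext run concurrently.

val rddA = sc.parallelize(1 to 1000000)
val rddB = sc.parallelize(1 to 1000000)
new Thread(new Runnable { def run() { println(rddA.count()) } }).start() // job 1
new Thread(new Runnable { def run() { println(rddB.count()) } }).start() // job 2, runs concurrently
// Across drivers, a standalone cluster can be shared by capping each application,
// e.g. System.setProperty("spark.cores.max", "4") before creating the SparkContext.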
RE: Multi-tenant?
Thanks Matei! Sent from my Windows Phone

From: Matei Zaharia (matei.zaha...@gmail.com)
Sent: 4/15/2014 7:14 PM
To: user@spark.apache.org
Subject: Re: Multi-tenant?

Yes, both things can happen. Take a look at http://spark.apache.org/docs/latest/job-scheduling.html, which includes scheduling concurrent jobs within the same driver. Matei On Apr 15, 2014, at 4:08 PM, Ian Ferreira ianferre...@hotmail.com wrote: What is the support for multi-tenancy in Spark. I assume more than one driver can share the same cluster, but can a driver run two jobs in parallel?
Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1
Hi Prasad, sorry for missing your reply. Here it is: https://gist.github.com/thegiive/10791823 Wisely Chen On Fri, Apr 4, 2014 at 11:57 PM, Prasad ramachandran.pra...@gmail.com wrote: Hi Wisely, Could you please post your pom.xml here? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p3770.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: storage.MemoryStore estimated size 7 times larger than real
Thank you so much, Davidson! Yes, you are right: in both sbt and the Spark shell, the result of my code is 28MB, and it's irrelevant to numSlices. Yesterday I got a result of 4.2MB in the Spark shell because I had removed the array initialization out of laziness :)

for (i <- 0 until size) { array(i) = i }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/storage-MemoryStore-estimated-size-7-times-larger-than-real-tp4251p4306.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: StackOverflow Error when run ALS with 100 iterations
Probably this JIRA issue (https://spark-project.atlassian.net/browse/SPARK-1006) solves your problem. When running with a large iteration number, the lineage DAG of ALS becomes very deep, and both the DAGScheduler and the Java serializer may overflow because they are implemented recursively. You may resort to checkpointing as a workaround. On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi, I am testing ALS on 7 nodes. Each node has 4 cores and 8G memory. The ALS program cannot run even with a very small training set (about 91 lines) due to a StackOverflow error when I set the number of iterations to 100. I think the problem may be caused by the updateFeatures method, which updates the products RDD iteratively by joining the previous products RDD. I am writing a program which has a similar update process to ALS. This problem also appeared when I iterate too many times (more than 80). The iterative part of my code is as follows: solution = outlinks.join(solution).map { ... } Has anyone had a similar problem? Thanks. Xiaoli
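A minimal sketch of that checkpointing workaround applied to a loop like the one quoted above (the checkpoint directory, the interval, and the update function are all made up; this is not the actual ALS internals):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")            // hypothetical path
val outlinks = sc.parallelize(Seq((1L, 0.5), (2L, 0.3))).cache()
var solution = sc.parallelize(Seq((1L, 1.0), (2L, 1.0)))
for (i <- 1 to 100) {
  solution = outlinks.join(solution).mapValues { case (w, v) => w * v } // stand-in update
  if (i % 20 == 0) {        // every N iterations, truncate the lineage
    solution.cache()
    solution.checkpoint()   // cuts the DAG so recursion depth stays bounded
    solution.count()        // force materialization so the checkpoint actually happens
  }
}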
JMX with Spark
Has anyone got this working? I have enabled the properties for it in the metrics.conf file and ensured that it is placed under Spark's home directory. Any ideas why I don't see Spark beans?
Re: JMX with Spark
Home directory or $home/conf directory? Works for me with metrics.properties hosted under the conf dir. On Tue, Apr 15, 2014 at 6:08 PM, Paul Schooss paulmscho...@gmail.com wrote: Has anyone got this working? I have enabled the properties for it in the metrics.conf file and ensured that it is placed under Spark's home directory. Any ideas why I don't see Spark beans?
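For reference, a minimal conf/metrics.properties entry that enables the JMX sink; the leading wildcard applies it to every instance (master, worker, driver, executor):

# conf/metrics.properties
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink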
Re: java.net.SocketException: Network is unreachable while connecting to HBase
In the worker logs I can see:

14/04/16 01:02:47 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@xx:10548] -> [akka.tcp://sparkExecutor@xx:16041]: Error [Association failed with [akka.tcp://sparkExecutor@xx:16041]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@xx:16041]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: xx/xx.xx.xx.xx:16041
]

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-net-SocketException-Network-is-unreachable-while-connecting-to-HBase-tp4301p4310.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: StackOverflow Error when run ALS with 100 iterations
Thanks a lot for your information. It really helps me. On Tue, Apr 15, 2014 at 7:57 PM, Cheng Lian lian.cs@gmail.com wrote: Probably this JIRA issue (https://spark-project.atlassian.net/browse/SPARK-1006) solves your problem. When running with a large iteration number, the lineage DAG of ALS becomes very deep, and both the DAGScheduler and the Java serializer may overflow because they are implemented recursively. You may resort to checkpointing as a workaround. On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi, I am testing ALS on 7 nodes. Each node has 4 cores and 8G memory. The ALS program cannot run even with a very small training set (about 91 lines) due to a StackOverflow error when I set the number of iterations to 100. I think the problem may be caused by the updateFeatures method, which updates the products RDD iteratively by joining the previous products RDD. I am writing a program which has a similar update process to ALS. This problem also appeared when I iterate too many times (more than 80). The iterative part of my code is as follows: solution = outlinks.join(solution).map { ... } Has anyone had a similar problem? Thanks. Xiaoli
Re: Why these operations are slower than the equivalent on Hadoop?
Eugen, thanks for your tip, and I do want to merge the result of one partition with another, but I am still not quite clear how to do it. Say the original data RDD has 32 partitions; since mapPartitions won't change the number of partitions, it will remain 32 partitions, each containing the partial skyline of the points in that partition. Now I want to merge those 32 partitions to generate a new skyline. It would be better if I could use reduce to merge them two at a time (rather than just collecting them into one), but I think simply calling the reduce method on the RDD won't work, because it reduces the data at the granularity of points rather than of the partition results (which are collections of points). So is there a way to reduce the data at the granularity of partitions? Thanks, Yanzhe

On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote: It depends on your algorithm, but I guess that you probably should use reduce (the code probably doesn't compile, but it shows you the idea):

val result = data.reduce { case (left, right) => skyline(left ++ right) }

Or in the case you want to merge the result of a partition with another one, you could do:

val result = data.mapPartitions { points =>
  // transforms the whole partition into a single element, but this may incur some
  // other problems, especially if you use Kryo serialization...
  Iterator(skyline(points.toArray))
}.reduce { case (left, right) => skyline(left ++ right) }

2014-04-15 19:37 GMT+02:00 Cheng Lian lian.cs@gmail.com: Your Spark solution first reduces partial results into a single partition, computes the final result, and then collects to the driver side. This involves a shuffle and two waves of network traffic. Instead, you can directly collect partial results to the driver and then compute the final result on the driver side:

val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
val results = skyline(partialResults)

On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com wrote: Hi all, As in a previous thread, I am asking how to implement a divide-and-conquer algorithm (skyline) in Spark. Here is my current solution:

val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val result = data.mapPartitions(points => skyline(points.toArray).iterator)
  .coalesce(1, true)
  .mapPartitions(points => skyline(points.toArray).iterator)
  .collect()

where skyline is a local algorithm to compute the results:

def skyline(points: Array[Point]): Array[Point]

Basically, I find this implementation runs slower than the corresponding Hadoop version (the identity map phase plus local skyline for both combine and reduce phases). Below are my questions:

1. Why is this implementation much slower than the Hadoop one? I can find two possible reasons: one is the shuffle overhead in coalesce, the other is calling toArray and iterator repeatedly when invoking the local skyline algorithm. But I am not sure which one is true.

I haven't seen your Hadoop version. But if this assumption is right, the above version should help.

2. One observation is that while the Hadoop version used up almost all the CPU resources during execution, the CPU doesn't seem that busy with Spark. Is that a clue that shuffling might be the real bottleneck?

How many parallel tasks are there when running your Spark code? I suspect tasks are queued and run sequentially.

3. Is there any difference between coalesce(1, true) and repartition? It seems that both operations need to shuffle data. What are the proper situations for using the coalesce method?

repartition(n) is just an alias of coalesce(n, true), so yes, they both involve data shuffling. coalesce can be used to shrink the partition number when the dataset size shrinks dramatically after operations like filter. Say you have an RDD containing 1TB of data in 100 partitions; after a .filter(...) call only 20GB is left, so you may want to coalesce to 2 partitions rather than 100.

4. More generally, I am trying to implement some core geometry computation operators on Spark (like skyline, convex hull, etc.). In my understanding, since Spark is more capable of handling iterative computations on a dataset, the above solution apparently doesn't exploit what Spark is good at. Any comments on
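Putting the thread's suggestion together, a compilable sketch of the partition-granularity reduce (Point and the skyline body are stand-ins matching the signature quoted above; data is assumed to be an RDD[Point] as in the thread):

type Point = Array[Double]
def skyline(points: Array[Point]): Array[Point] = points // placeholder body

val partial = data.mapPartitions { points =>
  Iterator(skyline(points.toArray)) // one element per partition: its partial skyline
}
// reduce now merges whole partial skylines, i.e. it works at partition granularity,
// because each element of `partial` is an Array[Point], not a single point
val result = partial.reduce { case (left, right) => skyline(left ++ right) }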
what is the difference between element and partition?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-difference-between-element-and-partition-tp4317.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
groupByKey(None) returns partitions according to the keys?
I was wondering if groupByKey returns 2 partitions in the example below?

x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupByKey-None-returns-partitions-according-to-the-keys-tp4318.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
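For what it's worth, a Scala sketch of how to check (my understanding, not something stated in the thread: the partition count comes from the numPartitions argument, or from the default partitioning when it is omitted, never from the number of distinct keys):

import org.apache.spark.SparkContext._

val x = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 4)
println(x.groupByKey().partitions.length)  // typically 4, inherited from the parent RDD
println(x.groupByKey(2).partitions.length) // 2, because we asked for 2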