Block
What is the concept of Block and BlockManager in Spark? How is a Block related to a Partition of an RDD?
pyspark broadcast error
Hi All, When I run the program shown below, I receive the error shown below. I am running the current version of branch-0.9 from GitHub. Note that I do not receive the error when I replace 2 ** 29 with 2 ** X, where X < 29. More interestingly, I do not receive the error when X = 30, and when X > 30 the code either crashes with MemoryError or "Py4JNetworkError: An error occurred while trying to connect to the Java server". I am aware that there are some bugs (https://spark-project.atlassian.net/browse/SPARK-1065) related to memory consumption with pyspark and broadcasting, but the behavior with X = 29 seemed different and I was wondering if anybody had any insight. -Brad

*Program*

  from pyspark import SparkContext

  SparkContext.setSystemProperty('spark.executor.memory', '25g')
  sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'FeatureExtraction')
  meg_512 = range((2 ** 29) / 8)
  tmp_broad = sc.broadcast(meg_512)

*Error*

  ---------------------------------------------------------------------------
  Py4JError                                 Traceback (most recent call last)
  <ipython-input-1-db8033dee301> in <module>()
        3 sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'FeatureExtraction')
        4 meg_1024 = range((2 ** 29) / 8)
  ----> 5 tmp_broad = sc.broadcast(meg_1024)

  /home/spark/spark-branch-0.9/python/pyspark/context.py in broadcast(self, value)
      277         pickleSer = PickleSerializer()
      278         pickled = pickleSer.dumps(value)
  --> 279         jbroadcast = self._jsc.broadcast(bytearray(pickled))
      280         return Broadcast(jbroadcast.id(), value, jbroadcast,
      281                          self._pickled_broadcast_vars)

  /home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
      535         answer = self.gateway_client.send_command(command)
      536         return_value = get_return_value(answer, self.gateway_client,
  --> 537                 self.target_id, self.name)
      538
      539         for temp_arg in temp_args:

  /home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
      302             raise Py4JError(
      303                 'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'.
  --> 304                 format(target_id, '.', name, value))
      305         else:
      306             raise Py4JError(

  Py4JError: An error occurred while calling o7.broadcast. Trace:
  java.lang.NegativeArraySizeException
      at py4j.Base64.decode(Base64.java:292)
      at py4j.Protocol.getBytes(Protocol.java:167)
      at py4j.Protocol.getObject(Protocol.java:276)
      at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
      at py4j.commands.CallCommand.execute(CallCommand.java:77)
      at py4j.GatewayConnection.run(GatewayConnection.java:207)
      at java.lang.Thread.run(Thread.java:701)
building spark over proxy
Can someone help me with how to build Spark behind a proxy? -- REGARDS ASHUTOSH JAIN IIT-BHU VARANASI
Re: building spark over proxy
http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3ccaaqhkj48japuzqc476es67c+rrfime87uprambdoofhcl0k...@mail.gmail.com%3E

On Tue, Mar 11, 2014 at 11:44 AM, hades dark hades.o...@gmail.com wrote: [...]

-- Bharath Vissapragada http://www.cloudera.com
Reading sequencefile
Hi all, I'm trying to read a SequenceFile that represents a set of JPEG images, generated using this tool: http://stuartsierra.com/2008/04/24/a-million-little-files. According to the documentation: "Each key is the name of a file (a Hadoop “Text”), the value is the binary contents of the file (a BytesWritable)." How do I load the generated file inside Spark? Cheers, Jaonary
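For reference, a minimal Scala sketch of one way to load such a file, assuming the (Text, BytesWritable) layout described above and the sequenceFile API of Spark 0.9. Note that Hadoop reuses Writable instances across records, so the bytes are copied out immediately; the path and master URL are placeholders:

  import org.apache.hadoop.io.{BytesWritable, Text}
  import org.apache.spark.SparkContext

  val sc = new SparkContext("local[2]", "ReadImages")
  // Build (filename, jpegBytes) pairs; copyBytes() materializes a fresh array
  // (Hadoop 2.x; on 1.x use java.util.Arrays.copyOf(b.getBytes, b.getLength)).
  val images = sc
    .sequenceFile("hdfs:///path/to/images.seq", classOf[Text], classOf[BytesWritable])
    .map { case (name, bytes) => (name.toString, bytes.copyBytes()) }
  println(images.first()._1)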
Spark stand alone cluster mode
Hi, I am new to Spark. I would like to run jobs in Spark standalone cluster mode, with no cluster manager other than Spark itself (https://spark.apache.org/docs/0.9.0/spark-standalone.html). I have tried wordcount from the Spark shell and as a standalone Scala app. The code reads input from HDFS and writes the results to HDFS, using 2 worker nodes. In spark-shell the wordcount is successful; however my efforts to run standalone programs have been in vain.

My environment:
- Ubuntu 12.04 - 32 bit
- Java 1.7.0_51
- Spark installed at $HOME/Downloads/spark-0.9.0-incubating
- Hadoop 2.2.0 installed as a separate hduser, with permission given to other users
- Scala 2.10.3
- sbt 0.13.1

The Spark master acts as the HDFS master. I have one master and 2 worker nodes, and HDFS is accessible from all nodes. I downloaded an example project and modified it to use my Spark cluster. I started the Spark cluster at spark://192.168.0.138:7077 and hdfs://master:9000/. When I run the project as SPARK_HADOOP_VERSION=2.2.0 sbt run, I get the following error:

  gino@master:~/Test/spark-example-project$ SPARK_HADOOP_VERSION=2.2.0 sbt run
  [info] Loading project definition from /home/gino/Test/spark-example-project/project
  [info] Set current project to spark-example-project (in build file:/home/gino/Test/spark-example-project/)
  [info] Running com.Thinkpalm.spark.WordCountHDFS
  [error] (run-main-0) java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
  java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
      at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
      at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
  Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkContext
      at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
      at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
      at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
  [trace] Stack trace suppressed: run last compile:run for the full output.
  java.lang.RuntimeException: Nonzero exit code: 1
      at scala.sys.package$.error(package.scala:27)
  [trace] Stack trace suppressed: run last compile:run for the full output.
  [error] (compile:run) Nonzero exit code: 1
  [error] Total time: 0 s, completed Mar 11, 2014 2:54:54 PM

Could anyone give some pointers... I have attached the project for reference. Thanks and regards, Gino Mathews

Attachment: spark-example-project.tgz
Re: Spark stand alone cluster mode
Does "sbt show full-classpath" show spark-core on the classpath? I am still pretty new to Scala, but it seems like you have:

  val sparkCore = "org.apache.spark" %% "spark-core" % V.spark % "provided"

I believe the "provided" part means sbt compiles against spark-core but assumes it is already on the classpath at runtime, so "sbt run" won't include it. The spark-shell script sets up a lot of that stuff for you.

On Tue, Mar 11, 2014 at 9:02 AM, Gino Mathews gin...@thinkpalm.com wrote: [...]
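If the "provided" scope is indeed the culprit, one plausible fix is to let sbt put spark-core on the runtime classpath. A sketch of the relevant build.sbt lines, with versions taken from the poster's setup:

  scalaVersion := "2.10.3"

  // Drop the trailing % "provided" so that "sbt run" can see SparkContext.
  libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"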
Pyspark Memory Woes
Dear Sparkians, We are working on a system to do relational modeling on top of Spark, all done in pyspark. While we've been learning a lot about Spark internals so far, we're currently running into memory issues and wondering how best to profile to fix them. Here are our symptoms:

- We're operating on data sets up to 80 GB of uncompressed JSON, 66 million records in the largest one.
- Sometimes we're joining those large data sets, but cardinality never exceeds 66 million (unless we've got a bug somewhere).
- We're seeing various OOM problems: sometimes Python takes all available memory, sometimes we OOM with no heap space left, and occasionally OOM with "GC overhead limit exceeded".
- Sometimes we also see what looks like a single huge message sent over the wire that exceeds the wire format limitations.
- Java heap space is 1.5 GB per worker, 24 or 32 cores across 46 nodes. It seems like we should have more than enough to do this comfortably.
- We're trying to isolate specific steps now, but every time it errors, we're partitioning (i.e. partitionByKey is involved somewhere).

We've been instrumenting according to the monitoring and tuning docs, but are a bit at a loss for where we're going wrong. We suspect poor/wrong partitioning on our part somehow. With that in mind, some questions:

- How exactly is partitioning information propagated? It looks like within a pipelined RDD the parent partitioning is preserved throughout unless we either specifically repartition or go through a reduce. We're splitting as much as we can on maps and letting reduces happen normally. Is that good practice? (See the sketch after this message.)
- When doing e.g. partitionByKey, does an entire partition get sent to one worker process?
- When does Spark stream data? Are there easy ways to sabotage the streaming? Are there any knobs for us to twiddle here?
- Is there any way to specify the number of shuffles for a given reduce step?
- How can we get better insight into what our workers are doing, specifically around moving data in and out of Python land?

I realise it's hard to troubleshoot in the absence of code, but any test case we have would be contrived. We're collecting more metrics and trying to reason about what might be happening, but any guidance at this point would be most helpful. Thanks! -- Aaron Olson Data Engineer, Shopify
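On the partitioning-propagation question, the rule in the Scala core API (which PySpark sits on top of) is that a partitioner survives only through transformations that cannot change keys. A quick illustration, assuming an existing SparkContext named sc:

  import org.apache.spark.HashPartitioner
  import org.apache.spark.SparkContext._   // pair-RDD implicits in 0.9

  val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
    .partitionBy(new HashPartitioner(64))

  // mapValues cannot alter keys, so the HashPartitioner is propagated:
  println(pairs.mapValues(_ + 1).partitioner)                    // Some(...)
  // map could produce arbitrary new keys, so the partitioner is dropped:
  println(pairs.map { case (k, v) => (k, v + 1) }.partitioner)   // None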
Spark usage patterns and questions
Hi, I have some questions regarding usage patterns and debugging in Spark/Spark Streaming.

1. What are some common design patterns for using broadcast variables? In my application I created some, and also created a scheduled task which periodically refreshes the variables. I want to know how people generally achieve this efficiently and in a modular way.

2. Sometimes an uncaught exception in the driver program/worker does not get traced anywhere. How can we debug this?

3. In our use case we read from Kafka, do some mapping, and finally persist data to Cassandra as well as pushing the data over a remote actor for real-time updates in a dashboard. I tried the approaches below (a sketch of the third follows this message):
- First I tried a very naive way, like stream.map(...).foreachRDD(pushes to actor). It did not work and the stage failed with an Akka exception.
- Second I tried the akka.serialization.JavaSerializer.withSystem(system){...} approach. It did not work and the stage failed, BUT without any trace anywhere in the logs.
- Finally I did rdd.collect to collect the output into the driver and then pushed to the actor. It worked.
I would like to know whether there is any efficient way of achieving this sort of use case.

4. Sometimes I see failed stages, but when I open the stage details it says the stage did not start. What does this mean?

Looking forward to some interesting responses :) Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
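For what it's worth, the third approach above can be written compactly. A sketch, where dashboardActor is assumed to exist in the driver program:

  import akka.actor.ActorRef
  import org.apache.spark.streaming.dstream.DStream

  def pushToDashboard(events: DStream[String], dashboardActor: ActorRef): Unit = {
    events.foreachRDD { rdd =>
      // collect() brings each batch to the driver, where the ActorRef is valid;
      // sending from inside a task would require shipping the actor reference
      // to the workers, which is likely what made the first two attempts fail.
      rdd.collect().foreach(dashboardActor ! _)
    }
  }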
Re: NO SUCH METHOD EXCEPTION
Since it’s from Scala, it might mean you’re running with a different version of Scala than you compiled Spark with. Spark 0.8 and earlier use Scala 2.9, while Spark 0.9 uses Scala 2.10. Matei

On Mar 11, 2014, at 8:19 AM, Jeyaraj, Arockia R (Arockia) arockia.r.jeya...@verizon.com wrote:

Hi, Can anyone help me to resolve this issue? Why am I getting a NoSuchMethod exception?

  14/03/11 09:56:11 ERROR executor.Executor: Exception in task ID 0
  java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
      at kafka.utils.VerifiableProperties.getIntInRange(VerifiableProperties.scala:75)
      at kafka.utils.VerifiableProperties.getInt(VerifiableProperties.scala:58)
      at kafka.utils.ZKConfig.<init>(ZkUtils.scala:837)
      at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:73)
      at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:77)
      at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:98)
      at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
      at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
      at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
      at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
      at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
      at org.apache.spark.scheduler.Task.run(Task.scala:53)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
      at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:619)
  14/03/11 09:56:11 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
  14/03/11 09:56:11 WARN scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError
  java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
      [same stack trace as above]
  14/03/11 09:56:11 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
  14/03/11 09:56:11 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
  14/03/11 09:56:11 INFO scheduler.DAGScheduler: Failed to run runJob at NetworkInputTracker.scala:182
  [error] (Thread-34) org.apache.spark.SparkException: Job aborted: Task 0.0:0 fai

Thanks Arockia Raja
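A plausible way to keep those versions aligned in sbt, assuming Spark 0.9 (artifact names below are the 0.9.0-incubating ones):

  scalaVersion := "2.10.3"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"            % "0.9.0-incubating",
    "org.apache.spark" %% "spark-streaming-kafka" % "0.9.0-incubating"
  )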
Re: Powered By Spark Page -- Companies & Organizations
Thanks, added you. On Mar 11, 2014, at 2:47 AM, Christoph Böhm listenbru...@gmx.net wrote: Dear Spark team, thanks for the great work and congrats on becoming an Apache top-level project! You could add us to your Powered By page, because we are using Spark (and Shark) to perform interactive exploration of large datasets. Find us on: www.bakdata.com Best, Christoph - Christoph Böhm bakdata | bespoke data engineering www.bakdata.com
Re: Pyspark Memory Woes
Hi Aaron, When you say "Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes. It seems like we should have more than enough to do this comfortably.", how are you configuring this? -Sandy

On Tue, Mar 11, 2014 at 10:11 AM, Aaron Olson aaron.ol...@shopify.com wrote: [...]
is spark.cleaner.ttl safe?
Hello, I've been trying to run an iterative Spark job that spills 1+ GB to disk per iteration, on a system with limited disk space. I believe there would be enough space if Spark cleaned up unused data from previous iterations, but as it stands the number of iterations I can run is limited by available disk space. I found a thread on the usage of spark.cleaner.ttl on the old Spark Users Google group here: https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4 I think this setting may be what I'm looking for; however, the cleaner seems to delete data that's still in use. The effect is that I get bizarre exceptions from Spark complaining about missing broadcast data or ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it supposed to delete in-use data, or is this a bug/shortcoming? Cheers, Michael
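For context, the setting in question is a single system property in 0.9-era configuration. A sketch with an illustrative value:

  // TTL in seconds. Anything older than this (shuffle data, broadcast
  // variables, RDD metadata) becomes eligible for deletion, apparently
  // whether or not later stages still reference it through lineage,
  // which would explain the missing-broadcast exceptions above.
  System.setProperty("spark.cleaner.ttl", "3600")
  val sc = new org.apache.spark.SparkContext("local[2]", "IterativeJob")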
RE: unsubscribe
Ohh! I thought you were unsubscribing :) Kapil Malik | kma...@adobe.com | 33430 / 8800836581

-----Original Message-----
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: 12 March 2014 00:51
To: user@spark.apache.org
Subject: Re: unsubscribe

To unsubscribe from this list, please send a message to user-unsubscr...@spark.apache.org and it will automatically unsubscribe you. Matei

On Mar 11, 2014, at 12:15 PM, Abhishek Pratap apra...@sagebase.org wrote:
Re: Pyspark Memory Woes
Are you aware that you get an executor (and the 1.5 GB) per machine, not per core?

On Tue, Mar 11, 2014 at 12:52 PM, Aaron Olson aaron.ol...@shopify.com wrote:

Hi Sandy, We're configuring that with the JAVA_OPTS environment variable in $SPARK_HOME/spark-worker-env.sh, like this:

  # JAVA OPTS
  export SPARK_JAVA_OPTS="-Dspark.ui.port=0 -Dspark.default.parallelism=1024 -Dspark.cores.max=256 -Dspark.executor.memory=1500m -Dspark.worker.timeout=500 -Dspark.akka.timeout=500"

Does that value seem low to you? -Aaron

On Tue, Mar 11, 2014 at 3:08 PM, Sandy Ryza sandy.r...@cloudera.com wrote: [...]
Re: Out of memory on large RDDs
> Your input data read as RDD may be causing OOM, so that's where you can use different memory configuration.

We are not getting any OOM exceptions, just Akka future timeouts in the MapOutputTracker and unsuccessful fetches of shuffle outputs, which are therefore refetched. What is the industry practice when debugging such errors? Questions:
- Why are MapOutputTracker requests timing out? (And how do we debug this properly?)
- What is the task/purpose of the MapOutputTracker?
- How can we check per-task object sizes?

Thanks, Grega

On 11 Mar 2014, at 18:43, Mayur Rustagi mayur.rust...@gmail.com wrote:

Shuffle data is always stored on disk; it's unlikely to cause OOM. Your input data read as RDD may be causing OOM, so that's where you can use different memory configuration. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Tue, Mar 11, 2014 at 9:20 AM, sparrow do...@celtra.com wrote:

I don't understand how exactly that will help. There are no persisted RDDs in storage. Our input data is ~100 GB, but the output of the flatMap is ~40 MB. The small RDD is then persisted. Memory configuration should not affect shuffle data, if I understand you correctly?

On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] [hidden email] wrote:

Shuffle data is not kept in memory. Did you try additional memory configurations (https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec [hidden email] wrote:

Hi, I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like to process a large data set (which does not fit in memory) that consists of JSON entries. These are the transformations applied:

  SparkContext.textFile(s3url)           // read files from S3
    .keyBy(_.parseJson.id)               // key by id located in the JSON string
    .groupByKey(number_of_group_tasks)   // group by id
    .flatMap { case (key, lines) =>      // do some stuff
      ...
    }

In the web view I can see a keyBy operation doing a shuffle write. If I understand correctly, the groupByKey transformation creates a wide RDD dependency, thus requiring a shuffle write. I have already increased spark.akka.askTimeout to 30 seconds, and the job still fails with errors on the workers:

  Error communicating with MapOutputTracker
      at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
      at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
      at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
      at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
      at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
      at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
      at org.apache.spark.scheduler.Task.run(Task.scala:53)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:415)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
      at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:724)
  Caused by: java.util.concurrent.TimeoutException: Futures timed out after [3] milliseconds
      at akka.dispatch.DefaultPromise.ready(Future.scala:870)
      at akka.dispatch.DefaultPromise.result(Future.scala:874)
      at akka.dispatch.Await$.result(Future.scala:74)
      at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
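On the purpose question: the MapOutputTracker is the driver-side registry of where each map task wrote its shuffle output; reducers ask it for those locations, which is why timeouts there surface as failed shuffle fetches. The knobs usually raised for this in 0.9 (values below are illustrative only):

  System.setProperty("spark.akka.askTimeout", "60")  // seconds a tracker ask will wait
  System.setProperty("spark.akka.frameSize", "100")  // MB; the default of 10 can be too
                                                     // small for many map output statuses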
possible bug in Spark's ALS implementation...
Hi, I'm implementing a recommender based on the algorithm described in http://www2.research.att.com/~yifanhu/PUB/cf.pdf. This algorithm forms the basis for Spark's ALS implementation for data sets with implicit features. The data set I'm working with is proprietary and I cannot share it; however, I can say that it's based on the same kind of data as in the paper---relative viewing time of videos. (Specifically, the rating for each video is defined as total viewing time across all visitors divided by video duration.)

I'm seeing counterintuitive, sometimes nonsensical recommendations. For comparison, I've run the training data through Oryx's in-VM implementation of implicit ALS with the same parameters. Oryx uses the same algorithm. (Source in this file: https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java) The recommendations made by the two systems are very different from each other---more so than I think could be explained by differences in initial state. The recommendations made by the Oryx models look much better, especially as I increase the number of latent factors and the iterations. The Spark models' recommendations don't improve with increases in either latent factors or iterations. Sometimes they get worse.

Because of the (understandably) highly-optimized and terse style of Spark's ALS implementation, I've had a very hard time following it well enough to debug the issue definitively. However, I have found a section of code that looks incorrect. As described in the paper, part of the implicit ALS algorithm involves computing a matrix product YtCuY (equation 4 in the paper). To optimize this computation, this expression is rewritten as YtY + Yt(Cu - I)Y. I believe that's what should be happening here: https://github.com/apache/incubator-spark/blob/v0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L376 However, it looks like this code is in fact computing YtY + YtY(Cu - I), which is the same as YtYCu. If so, that's a bug. Can someone familiar with this code evaluate my claim? Cheers, Michael
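Spelling the claim out in the paper's notation: for user $u$ with confidence matrix $C^u$, the normal-equation term should be computed as

$$Y^\top C^u Y = Y^\top Y + Y^\top (C^u - I)\, Y,$$

whereas the code as read here computes

$$Y^\top Y + Y^\top Y\, (C^u - I) = Y^\top Y\, C^u,$$

which is a different matrix in general, since matrix products do not commute.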
Re: possible bug in Spark's ALS implementation...
Hi Michael, I can help check the current implementation. Would you please go to https://spark-project.atlassian.net/browse/SPARK and create a ticket about this issue with component MLlib? Thanks! Best, Xiangrui

On Tue, Mar 11, 2014 at 3:18 PM, Michael Allman m...@allman.ms wrote: [...]
Re: How to create RDD from Java in-memory data?
Ah! Thank you. That'll work for now.
Re: Applications for Spark on HDFS
Hi Paul, What do you mean by distributing the jars manually? If you register jars that are local to the client with SparkContext.addJar, Spark should handle distributing them to the workers. Are you taking advantage of this? -Sandy

On Tue, Mar 11, 2014 at 3:09 PM, Paul Schooss paulmscho...@gmail.com wrote:

Hello Folks, I was wondering if anyone had experience placing application jars for Spark onto HDFS. Currently I am distributing the jars manually and would love to source the jars via HDFS, a la distributed caching with MR. Any ideas? Regards, Paul
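A sketch of the mechanism Sandy describes (paths are hypothetical; in this era addJar also accepted HDFS paths, which is one way to get the from-HDFS distribution Paul asks about):

  import org.apache.spark.SparkContext

  val sc = new SparkContext("spark://master:7077", "MyApp")
  sc.addJar("/local/path/to/myapp-deps.jar")      // shipped to executors by Spark
  sc.addJar("hdfs:///shared/jars/other-deps.jar") // fetched by each executor from HDFS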
Re: How to create RDD from Java in-memory data?
In a similar vein, it would be helpful to have an Iterable way to access the data inside an RDD. The collect method takes everything in the RDD and puts it in a list, but this blows up memory. Since everything I want is already inside the RDD, it ought to be possible to iterate over the contents without replicating the array.
Re: How to create RDD from Java in-memory data?
https://github.com/apache/incubator-spark/pull/421 Works pretty well, but really needs to be enhanced to work with AsyncRDDActions.

On Tue, Mar 11, 2014 at 4:50 PM, wallacemann wall...@bandpage.com wrote: [...]
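The iterator-style access being discussed eventually landed in Spark as RDD.toLocalIterator. A usage sketch assuming that API, which pulls one partition's worth of data at a time instead of materializing everything the way collect does:

  val rdd = sc.parallelize(1 to 1000000, 100)
  val it: Iterator[Int] = rdd.toLocalIterator
  it.take(5).foreach(println)   // only the partitions actually consumed are fetched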
Re: RDD.saveAs...
I agree that we can’t keep adding these to the core API, partly because it will get unwieldy to maintain and partly because each storage system will bring in lots of dependencies. We can simply have helper classes in different modules for each storage system. There’s some discussion on this at https://spark-project.atlassian.net/browse/SPARK-1127. Matei

On Mar 11, 2014, at 9:06 AM, Koert Kuipers ko...@tresata.com wrote:

I find the current design for writing RDDs to disk (or a database, etc.) kind of ugly. It will lead to a proliferation of saveAs methods. A better abstraction would be nice (perhaps a Sink trait to write to).
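To make the suggestion concrete, the abstraction might look something like this (a sketch, not an existing Spark API):

  import org.apache.spark.rdd.RDD

  trait Sink[T] {
    def write(rdd: RDD[T]): Unit
  }

  // Each storage system ships its own Sink in its own module, keeping both
  // the dependencies and the saveAsXyz proliferation out of core:
  class TextFileSink[T](path: String) extends Sink[T] {
    def write(rdd: RDD[T]): Unit = rdd.saveAsTextFile(path)
  }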
Re: Block
In my opinion, the BlockManager manages many types of Block; an RDD's partition, stored as an RDDBlock, is one of them. Other types of Block are ShuffleBlock, IndirectBlock (used if a task's return value is too large to send directly), etc. So the BlockManager is a layer that is independent of the RDD concept.

On Mar 11, 2014 2:06 AM, David Thomas dt5434...@gmail.com wrote:

What is the concept of Block and BlockManager in Spark? How is a Block related to a Partition of an RDD?
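Concretely, a cached partition is registered under a name derived from the RDD id and the partition index (scheme as in the BlockId classes of Spark 0.9; treat the exact strings as illustrative):

  val rddId = 2
  val partitionIndex = 3
  // The block backing partition 3 of RDD 2, as it appears in the storage UI:
  val rddBlockName = s"rdd_${rddId}_${partitionIndex}"   // "rdd_2_3"
  // Shuffle output follows a similar pattern: shuffle_<shuffleId>_<mapId>_<reduceId>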
Re: Are all transformations lazy?
You should probably be asking the opposite question: why do you think it *should* be applied immediately? Since the driver program hasn't requested any data back (distinct generates a new RDD, it doesn't return any data), there's no need to actually compute anything yet. As the documentation describes, if the call returns an RDD, it's transforming the data and will just keep track of the operation it eventually needs to perform. Only methods that return data back to the driver should trigger any computation. (The one known exception is sortByKey, which really should be lazy, but apparently uses an RDD.count call in its implementation: https://spark-project.atlassian.net/browse/SPARK-1021)

David Thomas wrote on March 11, 2014 at 9:49 PM:

For example, is the distinct() transformation lazy? When I look at the Spark source code, distinct applies a map -> reduceByKey -> map sequence to the RDD elements. Why is this lazy? Won't the function be applied immediately to the elements of the RDD when I call someRDD.distinct?

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)
Re: Are all transformations lazy?
I think you misunderstood my question - I should have stated it better. I'm not saying it should be applied immediately; I'm trying to understand how Spark achieves this lazy evaluation of transformations. Maybe this is due to my ignorance of how Scala works, but when I look at the code, it seems the function is applied to the elements of the RDD when I call distinct - or is it not applied immediately? How does the returned RDD 'keep track of the operation'?

On Tue, Mar 11, 2014 at 10:06 PM, Ewen Cheslack-Postava m...@ewencp.org wrote: [...]
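A stripped-down sketch of the mechanism (toy classes, not Spark's actual ones): a transformation only wraps the parent in a new object that remembers the function, and nothing runs until something finally asks for an iterator:

  abstract class MiniRDD[T] {
    def compute(): Iterator[T]            // called only when results are demanded
    def map[U](f: T => U): MiniRDD[U] = new MappedMiniRDD(this, f)  // returns instantly
  }

  class SeqMiniRDD[T](data: Seq[T]) extends MiniRDD[T] {
    def compute(): Iterator[T] = data.iterator
  }

  class MappedMiniRDD[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
    def compute(): Iterator[U] = parent.compute().map(f)   // f is applied here, lazily
  }

  val doubled = new SeqMiniRDD(Seq(1, 2, 3)).map(_ * 2)  // nothing computed yet
  println(doubled.compute().toList)                      // List(2, 4, 6): computed now

In real Spark, distinct() likewise just builds a chain of RDD objects (MappedRDD, ShuffledRDD); an action such as collect() walks the chain and calls compute.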