Re: Broadcast failure with variable size of ~ 500mb with key already cancelled ?
There is an open PR [1] to support broadcasts larger than 2G, could you try it?

[1] https://github.com/apache/spark/pull/2659

On Tue, Nov 11, 2014 at 6:39 AM, Tom Seddon mr.tom.sed...@gmail.com wrote:

Hi,

Just wondering if anyone has any advice about this issue, as I am experiencing the same thing. I'm working with multiple broadcast variables in PySpark, most of which are small, but one of around 4.5GB, using 10 workers at 31GB memory each and a driver with the same spec. It's not running out of memory as far as I can see, but it definitely only happens when I add the large broadcast. Would be most grateful for advice. I tried playing around with the last 3 conf settings below, but no luck:

    SparkConf().set("spark.master.memory", "26g") \
        .set("spark.executor.memory", "26g") \
        .set("spark.worker.memory", "26g") \
        .set("spark.driver.memory", "26g") \
        .set("spark.storage.memoryFraction", "1") \
        .set("spark.core.connection.ack.wait.timeout", "6000") \
        .set("spark.akka.frameSize", "50")

There are some invalid configs here: spark.master.memory and spark.worker.memory are not valid. spark.storage.memoryFraction is too large; at 1 you will have no memory left for general use (such as shuffle).

The Python jobs run in separate processes, so you should leave some memory for them on the slaves. For example, if a machine has 8 CPUs, each Python process will need at least 8G (for the 4G broadcast plus object overhead in Python), so you can run only 3 processes at the same time; use spark.cores.max=2 OR spark.task.cpus=4. Also, you could set spark.executor.memory to only 10G, leaving 16G for Python. You could also set spark.python.worker.memory=8g for better shuffle performance in Python (it's not necessary, though).

So for a large broadcast, maybe you should use Scala, which uses multiple threads; the broadcast will be shared by multiple tasks in the same executor. (A config sketch following this advice appears after the quoted message below.)

Thanks,
Tom

On 24 October 2014 12:31, htailor hemant.tai...@live.co.uk wrote:

Hi All,

I am relatively new to Spark and am currently having trouble broadcasting large variables ~500MB in size. The broadcast fails with the error shown below, and the memory usage on the hosts also blows up. Our hardware consists of 8 hosts (1 x 64GB (driver) and 7 x 32GB (workers)) and we are using Spark 1.1 (Python) via Cloudera CDH 5.2. We have managed to replicate the error using the test script shown below. I would be interested to know if anyone has seen this before with broadcasting or knows of a fix.

=== ERROR ===

    14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
    14/10/24 08:20:08 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@fbc6caf
    14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
    14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
    14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
    14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@fbc6caf
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
    14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
    14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
    14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
    14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
    14/10/24 08:20:15 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
    14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
    14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
    14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
    14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
    14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
    14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
    14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
    14/10/24
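For reference, here is a minimal, hedged sketch of a SparkConf following the advice above. The memory and CPU numbers are the ones suggested in the reply; the app name and overall shape are illustrative placeholders, not details from the thread:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("large-broadcast")               # placeholder name
            .set("spark.executor.memory", "10g")         # leave ~16G per node for Python workers
            .set("spark.task.cpus", "4")                 # fewer concurrent Python processes per node
            .set("spark.python.worker.memory", "8g"))    # optional: better shuffle in Python
    sc = SparkContext(conf=conf)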
Re: Pyspark Error when broadcast numpy array
This PR fixes the problem: https://github.com/apache/spark/pull/2659

cc @josh

Davies

On Tue, Nov 11, 2014 at 7:47 PM, bliuab bli...@cse.ust.hk wrote:

In spark-1.0.2, I have come across an error when I try to broadcast a fairly large numpy array (35M elements). The error (a java.lang.NegativeArraySizeException) and its details are listed below. Moreover, when broadcasting a relatively smaller numpy array (30M elements), everything works fine. A 30M-element numpy array takes about 230M of memory which, in my opinion, is not very large. As far as I have surveyed, it seems related to py4j. However, I have no idea how to fix this. I would appreciate any hints.

    py4j.protocol.Py4JError: An error occurred while calling o23.broadcast. Trace:
    java.lang.NegativeArraySizeException
        at py4j.Base64.decode(Base64.java:292)
        at py4j.Protocol.getBytes(Protocol.java:167)
        at py4j.Protocol.getObject(Protocol.java:276)
        at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
        at py4j.commands.CallCommand.execute(CallCommand.java:77)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)

And the test code is as follows:

    import numpy as np
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName('brodyliu_LR').setMaster('spark://10.231.131.87:5051')
    conf.set('spark.executor.memory', '4000m')
    conf.set('spark.akka.timeout', '10')
    conf.set('spark.ui.port', '8081')
    conf.set('spark.cores.max', '150')
    #conf.set('spark.rdd.compress', 'True')
    conf.set('spark.default.parallelism', '300')

    # configure the spark environment
    sc = SparkContext(conf=conf, batchSize=1)

    vec = np.random.rand(3500)
    a = sc.broadcast(vec)
Re: Pyspark Error when broadcast numpy array
Yes, your broadcast should be about 300M, much smaller than 2G; I didn't read your post carefully. The broadcast in Python has been improved a lot since 1.1. I think it will work in 1.1 or the upcoming 1.2 release; could you upgrade to 1.1?

Davies

On Tue, Nov 11, 2014 at 8:37 PM, bliuab bli...@cse.ust.hk wrote:

Dear Liu:

Thank you very much for your help. I will apply that patch. By the way, when I succeeded in broadcasting an array of size 30M, the log said that such an array takes around 230MB of memory. As a result, I think the numpy array that leads to the error is much smaller than 2G.

On Wed, Nov 12, 2014 at 12:29 PM, Davies Liu-2 [via Apache Spark User List] wrote:

This PR fixes the problem: https://github.com/apache/spark/pull/2659

--
My Homepage: www.cse.ust.hk/~bliuab
MPhil student, Hong Kong University of Science and Technology. Clear Water Bay, Kowloon, Hong Kong.
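To check whether a broadcast is anywhere near the 2G limit, you can look at the array's byte size directly. A minimal sketch; the element counts are the 30M/35M figures from the thread, and the sizes are plain float64 arithmetic rather than measurements:

    import numpy as np

    # Each float64 element takes 8 bytes, so:
    #   30M elements -> ~229MB, matching the ~230M reported in the thread
    #   35M elements -> ~267MB, still far below the 2G serialization limit
    vec = np.random.rand(30 * 1000 * 1000)
    print(vec.nbytes / (1024 ** 2))  # size in MB, before pickling overhead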
Re: error when importing HiveContext
bin/pyspark will set up the PYTHONPATH for py4j for you; otherwise you need to set it up yourself:

    export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip

On Fri, Nov 7, 2014 at 8:15 AM, Pagliari, Roberto rpagli...@appcomsci.com wrote:

I'm getting this error when importing the hive context:

    >>> from pyspark.sql import HiveContext
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/path/spark-1.1.0/python/pyspark/__init__.py", line 63, in <module>
        from pyspark.context import SparkContext
      File "/path/spark-1.1.0/python/pyspark/context.py", line 30, in <module>
        from pyspark.java_gateway import launch_gateway
      File "/path/spark-1.1.0/python/pyspark/java_gateway.py", line 26, in <module>
        from py4j.java_gateway import java_import, JavaGateway, GatewayClient
    ImportError: No module named py4j.java_gateway

I cannot find py4j on my system. Where is it?
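If you would rather do this from inside Python than via the shell environment, a minimal sketch is below. It assumes SPARK_HOME is set and globs for the bundled py4j zip (the exact file name matches the export above; adjust for your Spark version):

    import glob
    import os
    import sys

    # Point Python at Spark's bundled pyspark and py4j before importing them.
    spark_home = os.environ["SPARK_HOME"]
    sys.path.insert(0, os.path.join(spark_home, "python"))
    sys.path.insert(0, glob.glob(os.path.join(spark_home, "python/lib/py4j-*-src.zip"))[0])

    from pyspark.sql import HiveContext  # should now import cleanly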
Re: PySpark issue with sortByKey: IndexError: list index out of range
Could you tell us how large the data set is? That will help us debug this issue.

On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote:

I don't have any insight into this bug, but on Spark version 1.0.0 I ran into the same bug running the 'sort.py' example. On a smaller data set it worked fine. On a larger data set I got this error:

    Traceback (most recent call last):
      File "/home/skane/spark/examples/src/main/python/sort.py", line 30, in <module>
        .sortByKey(lambda x: x)
      File "/usr/lib/spark/python/pyspark/rdd.py", line 480, in sortByKey
        bounds.append(samples[index])
    IndexError: list index out of range
Re: PySpark issue with sortByKey: IndexError: list index out of range
It should be fixed in 1.1+. Could you provide a script to reproduce it?

On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote:

I don't have any insight into this bug, but on Spark version 1.0.0 I ran into the same bug running the 'sort.py' example. On a smaller data set it worked fine. On a larger data set I got an IndexError ("list index out of range") in sortByKey, as in the traceback above.
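For anyone wanting to follow up, here is a minimal repro skeleton along the lines of the failing example. The data is made up, since the original data set was not shared, and the bug was data-dependent, so this may or may not trigger it:

    from pyspark import SparkContext

    sc = SparkContext("local", "sortByKey-repro")

    # Hypothetical stand-in for the data set that triggered the IndexError.
    lines = sc.parallelize(["%d\tx" % (i % 100) for i in range(100000)], 20)
    pairs = lines.map(lambda line: (line.split("\t")[0], line))
    print(pairs.sortByKey().take(5))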
Re: SparkContext._lock Error
What's the version of Python? 2.4?

Davies

On Wed, Nov 5, 2014 at 4:21 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote:

I'm using this system: Hadoop 1.0.4, Scala 2.9.3, Hive 0.9.0, with Spark 1.1.0.

When importing pyspark, I'm getting this error:

    >>> from pyspark.sql import *
    Traceback (most recent call last):
      File "<stdin>", line 1, in ?
      File "/path/spark-1.1.0/python/pyspark/__init__.py", line 63, in ?
        from pyspark.context import SparkContext
      File "/path/spark-1.1.0/python/pyspark/context.py", line 209
        with SparkContext._lock:
                        ^
    SyntaxError: invalid syntax

How do I fix it?

Thank you,
Re: Parquet files are only 6-20MB in size?
Before saveAsParquetFile(), you can call coalesce(N); then you will have N files, and it will keep the order as before (repartition() will not).

On Mon, Nov 3, 2014 at 1:16 AM, ag007 agre...@mac.com wrote:

Thanks Akhil,

Am I right in saying that repartition will spread the data randomly, so I lose chronological order? I really just want the csv -> parquet conversion in the same order it came in. If I set repartition to 1, will this not be random?

cheers,
Ag
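A minimal sketch of the suggested approach on the Spark 1.1 Python API. The input path, column layout, and SQLContext setup are illustrative assumptions, not details from the thread:

    from pyspark.sql import Row, SQLContext

    sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`

    # Hypothetical two-column CSV; adapt the parsing to the real data.
    rows = (sc.textFile("data.csv")
              .map(lambda line: line.split(","))
              .map(lambda r: Row(ts=r[0], value=r[1])))

    # coalesce(1) merges partitions without a shuffle, preserving row order,
    # so the output is one larger parquet file instead of many small ones.
    sqlContext.inferSchema(rows.coalesce(1)).saveAsParquetFile("data.parquet")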
Re: Spark on Yarn probably trying to load all the data to RAM
On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote:

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to run:

    myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")

How many files do you have? And what is the average size of each file?

    myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get the errors in the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on Yarn?

Thank you in advance for any help.

    14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    11744,575: [Full GC 1194515K->1192839K(1365504K), 2,2367150 secs]
    11746,814: [Full GC 1194507K->1193186K(1365504K), 2,1788150 secs]
    11748,995: [Full GC 1194507K->1193278K(1365504K), 1,3511480 secs]
    11750,347: [Full GC 1194507K->1193263K(1365504K), 2,2735350 secs]
    11752,622: [Full GC 1194506K->1193192K(1365504K), 1,2700110 secs]
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/hadoop/spark/python/pyspark/rdd.py", line 391, in getNumPartitions
        return self._jrdd.partitions().size()
      File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError
    14/11/01 22:07:07 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2
    : An error occurred while calling o112.partitions.
    : java.lang.OutOfMemoryError: GC overhead limit exceeded
    11753,896: [Full GC 1194506K->947839K(1365504K), 2,1483780 secs]
    14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    14/11/01 22:07:09 INFO Remoting: Remoting shut down
    14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
    14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
    14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
    14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@5ca1c790
    14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@5ca1c790
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
    14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
    14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
    14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not found
    14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application already ended: FINISHED
    14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
    14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
    14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
    14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
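Following up on the questions in the reply, a small sketch for checking how many partitions the glob produces and merging small ones. The target of 1000 partitions is an arbitrary illustration, not a recommendation from the thread:

    # Check how many tasks the input will generate; with many small JSON
    # files this is roughly one partition per file.
    myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")
    print(myrdd.getNumPartitions())

    # If the count is huge, merge partitions (no shuffle) before processing.
    smaller = myrdd.coalesce(1000)  # illustrative target, tune for your data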
Re: pySpark - convert log/txt files into sequenceFile
Without the second line (which collects everything to the driver and re-distributes it), it will be much faster:

    infile = sc.wholeTextFiles(sys.argv[1])
    infile.saveAsSequenceFile(sys.argv[2])

On Wed, Oct 29, 2014 at 3:31 AM, Csaba Ragany rag...@gmail.com wrote:

Thank you Holden, it works!

    infile = sc.wholeTextFiles(sys.argv[1])
    rdd = sc.parallelize(infile.collect())
    rdd.saveAsSequenceFile(sys.argv[2])

Csaba

2014-10-28 17:56 GMT+01:00 Holden Karau hol...@pigscanfly.ca:

Hi Csaba,

It sounds like the API you are looking for is sc.wholeTextFiles :)

Cheers,
Holden :)

On Tuesday, October 28, 2014, Csaba Ragany rag...@gmail.com wrote:

Dear Spark Community,

Is it possible to convert text files (.log or .txt files) into sequence files in Python? Using PySpark I can create a parallelized rdd with

    rdd = sc.parallelize([('key1', 1.0)])

and I can save it as a sequence file with rdd.saveAsSequenceFile(). But how can I put the whole content of my text files into the 'value' of 'key1'? I want a sequence file where the keys are the filenames of the text files and the values are their content.

Thank you for any help!
Csaba
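If you want to sanity-check the result, a small sketch for reading the sequence file back; this is hedged on the default Writable conversions in the Spark 1.1 Python API, and the path is a hypothetical placeholder:

    # Read the sequence file back; keys are the original file paths,
    # values are the full file contents.
    pairs = sc.sequenceFile("/path/to/output")
    print(pairs.keys().first())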
Re: sampling in spark
    import random
    from bisect import bisect

    # build the cumulative distribution from the probability vector p
    _cumm = [p[0]]
    for i in range(1, len(p)):
        _cumm.append(_cumm[-1] + p[i])

    # draw (up to) k distinct row indices according to p
    index = set([bisect(_cumm, random.random()) for i in range(k)])

    chosed_x = X.zipWithIndex().filter(lambda (v, i): i in index).map(lambda (v, i): v)
    chosed_y = [v for i, v in enumerate(y) if i in index]

On Tue, Oct 28, 2014 at 12:26 AM, Chengi Liu chengi.liu...@gmail.com wrote:

Oops, the reference for the above code: http://stackoverflow.com/questions/26583462/selecting-corresponding-k-rows-from-matrix-and-vector/26583945#26583945

On Tue, Oct 28, 2014 at 12:26 AM, Chengi Liu chengi.liu...@gmail.com wrote:

Hi,

I have three rdds: X, y and p. X is a matrix rdd (mXn), y is an (mX1) dimension vector, and p is an (mX1) dimension probability vector. Now, I am trying to sample k rows from X, and the corresponding entries in y, based on the probability vector p. Here is the Python implementation:

    import random
    from bisect import bisect
    from operator import itemgetter

    def sample(population, k, prob):
        def cdf(population, k, prob):
            population = map(itemgetter(1), sorted(zip(prob, population)))
            cumm = [prob[0]]
            for i in range(1, len(prob)):
                cumm.append(cumm[-1] + prob[i])
            return [population[bisect(cumm, random.random())] for i in range(k)]
        return cdf(population, k, prob)
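A hedged usage example for the snippet above; the RDD X and the local lists y, p, and k below are tiny made-up stand-ins for the (mXn) matrix, label vector, and probability vector described in the question:

    from pyspark import SparkContext

    sc = SparkContext("local", "sampling")

    X = sc.parallelize([[1, 2], [3, 4], [5, 6], [7, 8]])  # 4 rows
    y = [0, 1, 0, 1]
    p = [0.1, 0.4, 0.4, 0.1]  # must sum to 1
    k = 2

    # Run the snippet above with these bindings: chosed_x is an RDD of the
    # sampled rows and chosed_y the matching labels. Note that duplicates
    # collapse via set(), so fewer than k rows may come back.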
Re: Python code crashing on ReduceByKey if I return custom class object
This is a known issue with PySpark: classes and objects of a custom class defined in the current script cannot be serialized by pickle between the driver and the workers.

You can work around this by putting 'testing' in a module, and sending this module to the cluster with `sc.addPyFile`.

Davies

On Sun, Oct 26, 2014 at 11:57 PM, sid siddardha.i...@gmail.com wrote:

Hi,

I am new to Spark and I am trying to use pyspark. I am trying to find the mean of 128-dimension vectors present in a file. Below is the code:

    from cStringIO import StringIO

    class testing:
        def __str__(self):
            file_str = StringIO()
            file_str.write(str(self.key))
            file_str.write(" ")
            for n in self.vector:
                file_str.write(str(n))
                file_str.write(" ")
            return file_str.getvalue()

        def __init__(self, txt=""):
            self.vector = [0.0] * 128
            if len(txt) == 0:
                return
            i = 0
            for n in txt.split():
                if i == 0:
                    self.key = float(n)
                    i = i + 1
                    continue
                if i < 128:
                    self.vector[i - 1] = float(n)
                    i = i + 1
                    continue
                break

        def addVec(self, r):
            a = testing()
            for n in xrange(0, 128):
                a.vector[n] = self.vector[n] + r.vector[n]
            return a

    def InitializeAndReturnPair(string):
        vec = testing(string)
        return vec.key, vec

    from pyspark import SparkConf, SparkContext
    conf = (SparkConf()
            .setMaster("local")
            .setAppName("My app")
            .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)

    inp = sc.textFile("input.txt")
    output = inp.map(lambda s: InitializeAndReturnPair(s)).cache()
    # Below line is throwing the error
    print output.reduceByKey(lambda a, b: a.addVec(b)).collect()

I am not able to figure out where I am going wrong; I tried changing the serializer but am still getting a similar error.

Error:

    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/10/25 05:02:39 WARN Utils: Your hostname, Sid resolves to a loopback address: 127.0.1.1; using 192.168.0.15 instead (on interface wlan0)
    14/10/25 05:02:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    14/10/25 05:02:39 INFO SecurityManager: Changing view acls to: sid,
    14/10/25 05:02:39 INFO SecurityManager: Changing modify acls to: sid,
    14/10/25 05:02:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sid, ); users with modify permissions: Set(sid, )
    14/10/25 05:02:40 INFO Slf4jLogger: Slf4jLogger started
    14/10/25 05:02:40 INFO Remoting: Starting remoting
    14/10/25 05:02:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.15:52124]
    14/10/25 05:02:40 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.0.15:52124]
    14/10/25 05:02:40 INFO Utils: Successfully started service 'sparkDriver' on port 52124.
    14/10/25 05:02:40 INFO SparkEnv: Registering MapOutputTracker
    14/10/25 05:02:40 INFO SparkEnv: Registering BlockManagerMaster
    14/10/25 05:02:40 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141025050240-56c5
    14/10/25 05:02:40 INFO Utils: Successfully started service 'Connection manager for block manager' on port 51705.
    14/10/25 05:02:40 INFO ConnectionManager: Bound socket to port 51705 with id = ConnectionManagerId(192.168.0.15,51705)
    14/10/25 05:02:40 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
    14/10/25 05:02:40 INFO BlockManagerMaster: Trying to register BlockManager
    14/10/25 05:02:40 INFO BlockManagerMasterActor: Registering block manager 192.168.0.15:51705 with 265.4 MB RAM
    14/10/25 05:02:40 INFO BlockManagerMaster: Registered BlockManager
    14/10/25 05:02:40 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ce172226-4732-4633-aa45-1cbd45b7ec98
    14/10/25 05:02:40 INFO HttpServer: Starting HTTP Server
    14/10/25 05:02:40 INFO Utils: Successfully started service 'HTTP file server' on port 55111.
    14/10/25 05:02:40 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    14/10/25 05:02:40 INFO SparkUI: Started SparkUI at http://192.168.0.15:4040
    14/10/25 05:02:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/10/25 05:02:41 INFO Utils: Copying /home/sid/Downloads/spark/pdsWork/AskHelp.py to /tmp/spark-80005f48-e41d-48ff-b249-bdf87130de5d/AskHelp.py
    14/10/25 05:02:41 INFO SparkContext: Added file file:/home/sid/Downloads/spark/pdsWork/AskHelp.py at http://192.168.0.15:55111/files/AskHelp.py with timestamp 1414227761547
    14/10/25 05:02:41 INFO AkkaUtils: Connecting to
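A minimal sketch of the suggested workaround; the module name `testclass.py` is a hypothetical choice, and the class body is the one from the question:

    # testclass.py -- a separate module holding the class and the helper
    # that builds (key, testing) pairs, with the same bodies as above.

    # main script:
    from pyspark import SparkConf, SparkContext

    sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("My app"))
    sc.addPyFile("testclass.py")  # ship the module to the executors

    from testclass import testing, InitializeAndReturnPair

    inp = sc.textFile("input.txt")
    output = inp.map(InitializeAndReturnPair).cache()
    print output.reduceByKey(lambda a, b: a.addVec(b)).collect()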
Re: spark is running extremely slow with larger data set, like 2G
On Thu, Oct 23, 2014 at 3:14 PM, xuhongnever xuhongne...@gmail.com wrote:

my code is here:

    from pyspark import SparkConf, SparkContext

    def Undirect(edge):
        vector = edge.strip().split('\t')
        if vector[0].isdigit():
            return [(vector[0], vector[1])]
        return []

    conf = SparkConf()
    conf.setMaster("spark://compute-0-14:7077")
    conf.setAppName("adjacencylist")
    conf.set("spark.executor.memory", "1g")

Use more memory to gain better performance, or Spark will keep spilling the data to disk, which is much slower. You could also give more memory to the Python workers by setting spark.python.worker.memory=1g or 2g.

    sc = SparkContext(conf=conf)
    file = sc.textFile("file:///home/xzhang/data/soc-LiveJournal1.txt")
    records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b: a + "\t" + b)

a + "\t" + b will be very slow if the number of values is large; groupByKey() will be better.

    #print(records.count())
    #records = records.sortByKey()
    records = records.map(lambda line: line[0] + "\t" + line[1])
    records.saveAsTextFile("file:///home/xzhang/data/result")
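A hedged sketch of the groupByKey() rewrite suggested above, keeping the same tab-separated output format as the original script:

    # Instead of concatenating strings inside reduceByKey (quadratic in the
    # number of values per key), group first and join once per key.
    records = (file.flatMap(Undirect)
                   .groupByKey()
                   .map(lambda kv: kv[0] + "\t" + "\t".join(kv[1])))
    records.saveAsTextFile("file:///home/xzhang/data/result")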
Re: spark is running extremely slow with larger data set, like 2G
On Fri, Oct 24, 2014 at 1:37 PM, xuhongnever xuhongne...@gmail.com wrote:

Thank you very much. Changing to groupByKey works; it runs much faster.

By the way, could you give me some explanation of the following configurations? After reading the official explanation I'm still confused. What's the relationship between them? Is there any memory overlap between them?

    spark.python.worker.memory
    spark.executor.memory
    spark.driver.memory

spark.driver.memory is used for the JVM together with your local Python script (called the driver); spark.executor.memory is used for the JVMs in the Spark cluster (called slaves or executors). In local mode, the driver and executor share the same JVM, so spark.driver.memory is used.

spark.python.worker.memory is used for the Python workers in the executor. Because of the GIL, pyspark uses multiple Python processes in the executor, one for each task. spark.python.worker.memory tells the Python workers when to spill data to disk. It's not a hard limit, so the memory used by a Python worker may be a little higher. If you have enough memory in the executor, increasing spark.python.worker.memory lets the Python workers use more memory during shuffle (like groupBy()), which will increase performance.
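Putting the three settings together, a hedged example configuration; the values are purely illustrative, not recommendations from the thread:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.driver.memory", "4g")          # driver JVM; usually must be set
                                                       # before the JVM starts (spark-submit)
            .set("spark.executor.memory", "8g")        # JVM heap on each executor
            .set("spark.python.worker.memory", "2g"))  # per-task Python spill threshold
    sc = SparkContext(conf=conf)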
Re: Python vs Scala performance
In the master branch, you can easily profile your job and find the bottlenecks, see https://github.com/apache/spark/pull/2556

Could you try it and show the stats?

Davies

On Wed, Oct 22, 2014 at 7:51 AM, Marius Soutier mps@gmail.com wrote:

It's an AWS cluster that is rather small at the moment, 4 worker nodes @ 28 GB RAM and 4 cores, but fast enough for the currently 40 gigs a day. Data is on HDFS in EBS volumes. Each file is a Gzip-compressed collection of JSON objects, each one between 115-120 MB to be near the HDFS block size. One core per worker is permanently used by a job that allows SQL queries over Parquet files.

On 22.10.2014, at 16:18, Arian Pasquali ar...@arianpasquali.com wrote:

Interesting thread Marius. Btw, I'm curious about your cluster size. How small is it in terms of RAM and cores?

Arian

2014-10-22 13:17 GMT+01:00 Nicholas Chammas nicholas.cham...@gmail.com:

Total guess without knowing anything about your code: do either of these two notes from the 1.1.0 release notes affect things at all?

- PySpark now performs external spilling during aggregations. Old behavior can be restored by setting spark.shuffle.spill to false.
- PySpark uses a new heuristic for determining the parallelism of shuffle operations. Old behavior can be restored by setting spark.default.parallelism to the number of cores in the cluster.

Nick

On Wed, Oct 22, 2014 at 7:29 AM, Marius Soutier mps@gmail.com wrote:

We're using 1.1.0. Yes, I expected Scala to be maybe twice as fast, but not that...

On 22.10.2014, at 13:02, Nicholas Chammas nicholas.cham...@gmail.com wrote:

What version of Spark are you running? Some recent changes to how PySpark works relative to Scala Spark may explain things. PySpark should not be that much slower, not by a stretch.

On Wed, Oct 22, 2014 at 6:11 AM, Ashic Mahtab as...@live.com wrote:

I'm no expert, but I looked into how the Python bits work a while back (was trying to assess what it would take to add F# support). It seems Python hosts a JVM inside of it and talks to "Scala Spark" in that JVM. The Python server bit translates the Python calls to those in the JVM. The Python Spark context is like an adapter to the JVM Spark context. If you're seeing performance discrepancies, this might be the reason why. If the code can be organised to require fewer interactions with the adapter, that may improve things. Take this with a pinch of salt... I might be way off on this :)

Cheers,
Ashic.

From: mps@gmail.com
Subject: Python vs Scala performance
Date: Wed, 22 Oct 2014 12:00:41 +0200
To: user@spark.apache.org

Hi there,

we have a small Spark cluster running and are processing around 40 GB of Gzip-compressed JSON data per day. I have written a couple of word-count-like Scala jobs that essentially pull in all the data, do some joins, group-bys and aggregations. A job takes around 40 minutes to complete.

Now one of the data scientists on the team wants to write some jobs using Python. To learn Spark, he rewrote one of my Scala jobs in Python. From the API side, everything looks more or less identical. However, his jobs take between 5-8 hours to complete! We can also see that the execution plan is quite different; I'm seeing writes to the output much later than in Scala.

Is Python I/O really that slow?

Thanks
- Marius
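For reference, a hedged sketch of using the profiler from that PR (it targets the 1.2/master line; the config name and the show_profiles() call come from the PR, so double-check them against the version you build):

    from pyspark import SparkConf, SparkContext

    # Enable cProfile output for Python workers (master/1.2+ only).
    conf = SparkConf().set("spark.python.profile", "true")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(range(1000000)).map(lambda x: x * 2)
    rdd.count()

    sc.show_profiles()  # print the accumulated profile stats per stage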
Re: Python vs Scala performance
Sorry, there is not. You can try cloning it from GitHub and building it from scratch, see [1].

[1] https://github.com/apache/spark

Davies

On Wed, Oct 22, 2014 at 2:31 PM, Marius Soutier mps@gmail.com wrote:

Can't install that on our cluster, but I can try locally. Is there a pre-built binary available?

On 22.10.2014, at 19:01, Davies Liu dav...@databricks.com wrote:

In the master branch, you can easily profile your job and find the bottlenecks, see https://github.com/apache/spark/pull/2556

Could you try it and show the stats?

Davies
Re: How to aggregate data in Apach Spark
You could also use Spark SQL:

    from pyspark.sql import Row, SQLContext

    row = Row('id', 'C1', 'C2', 'C3')
    # convert each line into a Row, casting C3 to a number so sum() works
    data = sc.textFile("test.csv").map(lambda line: line.split(','))
    rows = data.map(lambda r: row(r[0], r[1], r[2], float(r[3])))

    sqlContext = SQLContext(sc)
    sqlContext.inferSchema(rows).registerTempTable("data")
    result = sqlContext.sql("select id, C1, C2, sum(C3) from data group by id, C1, C2")
    # `result` is a SchemaRDD

On Mon, Oct 20, 2014 at 2:52 AM, Gen gen.tan...@gmail.com wrote:

Hi,

I will write the code in Python:

    # test.py
    from operator import add

    # Please make sure that the rdd is like [[id, c1, c2, c3], [id, c1, c2, c3], ...]
    data = sc.textFile(...).map(...)
    keypair = data.map(lambda l: ((l[0], l[1], l[2]), float(l[3])))
    keypair = keypair.reduceByKey(add)
    out = keypair.map(lambda l: list(l[0]) + [l[1]])

Kalyan wrote:

I have a distributed system on 3 nodes and my dataset is distributed among those nodes. For example, I have a test.csv file which exists on all 3 nodes and contains 4 columns:

    row   | id , C1 , C2 , C3
    ----------------------
    row1  | A1 , c1 , c2 , 2
    row2  | A1 , c1 , c2 , 1
    row3  | A1 , c11, c2 , 1
    row4  | A2 , c1 , c2 , 1
    row5  | A2 , c1 , c2 , 1
    row6  | A2 , c11, c2 , 1
    row7  | A2 , c11, c21, 1
    row8  | A3 , c1 , c2 , 1
    row9  | A3 , c1 , c2 , 2
    row10 | A4 , c1 , c2 , 1

I need help to aggregate the data set by the id, C1, C2 columns, summing C3, with output like this:

    row   | id , C1 , C2 , C3
    ----------------------
    row1  | A1 , c1 , c2 , 3
    row2  | A1 , c11, c2 , 1
    row3  | A2 , c1 , c2 , 2
    row4  | A2 , c11, c2 , 1
    row5  | A2 , c11, c21, 1
    row6  | A3 , c1 , c2 , 3
    row7  | A4 , c1 , c2 , 1

Thanks
Kalyan
Re: How to disable input split
You can call coalesce() to merge the small splits into bigger ones.

Davies

On Fri, Oct 17, 2014 at 5:35 PM, Larry Liu larryli...@gmail.com wrote:

Is it possible to disable input splitting if the input is already small?
Re: Spark speed performance
How many CPUs are on the slave? Because of the overhead between the JVM and Python, a single task will be slower than your local Python script, but it's very easy to scale to many CPUs. Even with one CPU, it's not common for PySpark to be 100 times slower.

You have many small files; each file will be processed by a task, which has about 100ms of overhead (being scheduled and executed), while such a small file can be processed by your single-threaded Python script in less than 1ms. You could pack your json files into larger ones, or you could try to merge the small tasks into larger ones with coalesce(N), such as:

    distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10 partitions (tasks)

Davies

On Sat, Oct 18, 2014 at 12:07 PM, jan.zi...@centrum.cz wrote:

Hi,

I have a program that I wrote for single-computer execution (in Python) and have also implemented the same for Spark. This program basically only reads .json files, takes one field from each, and saves it back. Using Spark, my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem might be.

My Spark program looks like:

    sc = SparkContext(appName="Json data preprocessor")
    distData = sc.textFile(sys.argv[2])
    json_extractor = JsonExtractor(sys.argv[1])
    cleanedData = distData.flatMap(json_extractor.extract_json)
    cleanedData.saveAsTextFile(sys.argv[3])

JsonExtractor only selects the data from the field given by sys.argv[1].

My data is basically many small json files, one JSON object per line.

I have tried both reading and writing the data from/to Amazon S3 and from/to local disk on all the machines.

I would like to ask if there is something that I am missing, or if Spark is supposed to be this slow in comparison with a local, non-parallelized, single-node program.

Thank you in advance for any suggestions or hints.
Re: PySpark joins fail - please help
Hey Russell,

join() can only work with RDDs of pairs (key, value), such as:

    rdd1: (k, v1)
    rdd2: (k, v2)

rdd1.join(rdd2) will be (k, (v1, v2)).

Spark SQL will be more useful for you, see http://spark.apache.org/docs/1.1.0/sql-programming-guide.html

Davies

On Fri, Oct 17, 2014 at 5:01 PM, Russell Jurney russell.jur...@gmail.com wrote:

https://gist.github.com/rjurney/fd5c0110fe7eb686afc9

Any way I try to join my data fails. I can't figure out what I'm doing wrong.

--
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com
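A tiny illustrative example of the pair-RDD requirement; the data is made up, not taken from the linked gist:

    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("a", "x"), ("c", "y")])

    # Only keys present in both RDDs survive; the values are paired in a tuple.
    print(rdd1.join(rdd2).collect())  # [('a', (1, 'x'))]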
Re: PySpark Error on Windows with sc.wholeTextFiles
It's a bug, could you file a JIRA for this? Thanks!

Davies

On Thu, Oct 16, 2014 at 8:28 AM, Griffiths, Michael (NYC-RPM) michael.griffi...@reprisemedia.com wrote:

Hi,

I'm running into an error on Windows (x64, 8.1) running Spark 1.1.0 (pre-built for Hadoop 2.4: spark-1.1.0-bin-hadoop2.4.tgz) with Java SE Version 8 Update 20 (build 1.8.0_20-b26); just getting started with Spark.

When running sc.wholeTextFiles() on a directory, I can run the command but not do anything with the resulting RDD; specifically, I get an error in py4j.protocol.Py4JJavaError. The error is unspecified, though the location is included. I've attached the traceback below. In this situation, I'm trying to load all files from a folder on the local filesystem, located at D:\testdata. The folder contains one file, which can be loaded successfully with sc.textFile("d:/testdata/filename"), no problems at all, so I do not believe the file is throwing the error.

Is there any advice on what I should look at further to isolate or fix the error? Am I doing something obviously wrong?

Thanks,
Michael

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
          /_/

    Using Python version 2.7.7 (default, Jun 11 2014 10:40:02)
    SparkContext available as sc.
    >>> file = sc.textFile("d:/testdata/cbcc5b470ec06f212990c68c8f76e887b884")
    >>> file.count()
    732
    >>> file.first()
    u'<!DOCTYPE html>'
    >>> data = sc.wholeTextFiles('d:/testdata')
    >>> data.first()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "D:\spark\python\pyspark\rdd.py", line 1167, in first
        return self.take(1)[0]
      File "D:\spark\python\pyspark\rdd.py", line 1126, in take
        totalParts = self._jrdd.partitions().size()
      File "D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
      File "D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o21.partitions.
    : java.lang.NullPointerException
        at java.lang.ProcessBuilder.start(Unknown Source)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
        at org.apache.hadoop.util.Shell.run(Shell.java:418)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)
        at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
        at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:559)
        at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:534)
        at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)
        at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1697)
        at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1679)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:302)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:263)
        at org.apache.spark.input.WholeTextFileInputFormat.setMaxSplitSize(WholeTextFileInputFormat.scala:54)
        at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
        at org.apache.spark.api.java.JavaPairRDD.partitions(JavaPairRDD.scala:44)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)

    >>> data.count()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "D:\spark\python\pyspark\rdd.py", line 847, in count
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
      File
Re: ALS implicit error pyspark
It seems to be a bug; could you create a JIRA for it? Thanks!

Davies

On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote:

I tried the same data with Scala. It works pretty well. It seems that it is a problem with pyspark. In the console, it shows the following logs:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
        ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)
    14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
    py4j.protocol.Py4JJavaError: An error occurred while calling o32.trainImplicitALSModel.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
    Serialization trace:
    shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
        com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
        org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
        org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
        scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
Re: ALS implicit error pyspark
On Thu, Oct 16, 2014 at 9:53 AM, Gen gen.tan...@gmail.com wrote:

Hi,

I am trying to use the ALS.trainImplicit method in pyspark.mllib.recommendation. However, it didn't work. So I tried the example in the Python API documentation, such as:

    r1 = (1, 1, 1.0)
    r2 = (1, 2, 2.0)
    r3 = (2, 1, 2.0)
    ratings = sc.parallelize([r1, r2, r3])
    model = ALS.trainImplicit(ratings, 1)

It didn't work either. After searching on Google, I found that there are only two overloads for ALS.trainImplicit in the Scala source. So I tried model = ALS.trainImplicit(ratings, 1, 1), and it worked. But if I set the iterations to anything other than 1, for example model = ALS.trainImplicit(ratings, 1, 2) or model = ALS.trainImplicit(ratings, 4, 2), it generated an error. The information is as follows:

The Python API has default values for all the other arguments, so you should be able to call it with only rank=1 (there are no default iterations in Scala). I'm curious how you ran into this problem.

    count at ALS.scala:314

    Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times, most recent failure: Lost task 6.3 in stage 189.0 (TID 626, ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
    Serialization trace:
    shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
        com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
        org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
        org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
        scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:

It is really strange, because count at ALS.scala:314 is already outside the loop of iterations. Any idea?

Thanks a lot in advance.

FYI: I used spark 1.1.0, and ALS.train() works pretty well in all the cases.
Re: ALS implicit error pyspark
Could you post the code that has the problem with pyspark? Thanks!

Davies

On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote:

I tried the same data with Scala. It works pretty well. It seems that it is a problem with pyspark.
Re: ALS implicit error pyspark
I can run the following code against Spark 1.1 sc = SparkContext() r1 = (1, 1, 1.0) r2 = (1, 2, 2.0) r3 = (2, 1, 2.0) ratings = sc.parallelize([r1, r2, r3]) model = ALS.trainImplicit(ratings, 1) Davies On Thu, Oct 16, 2014 at 2:45 PM, Davies Liu dav...@databricks.com wrote: Could you post the code that have problem with pyspark? thanks! Davies On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote: I tried the same data with scala. It works pretty well. It seems that it is the problem of pyspark. In the console, it shows the following logs: Traceback (most recent call last): File stdin, line 1, in module * File /root/spark/python/pyspark/mllib/recommendation.py, line 76, in trainImplicit 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally) ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)* File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally) : An error occurred while calling o32.trainImplicitALSModel. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet Serialization trace: shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173
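For reference, a self-contained version of that minimal repro, with the imports it needs (a sketch, assuming Spark 1.1 with default settings):

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

sc = SparkContext()
# three (user, product, rating) tuples, as in the snippet above
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1)  # rank = 1

If this tiny case passes but the real data fails, that at least narrows the Kryo ArrayStoreException above down to the data or serializer setup rather than the API call itself.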
Re: pyspark - extract 1 field from string
rdd.map(lambda line: int(line.split(',')[3])) On Tue, Oct 14, 2014 at 6:58 PM, Chop thomrog...@att.net wrote: I'm stumped with how to take 1 RDD that has lines like: 4,01012009,00:00,1289,4 5,01012009,00:00,1326,4 6,01012009,00:00,1497,7 and produce a new RDD with just the 4th field from each line (1289, 1326, 1497) I don't want to apply a conditional, I just want to grab that one field from each line in the existing RDD TIA -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-extract-1-field-from-string-tp16456.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
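A minimal sketch of that one-liner against the sample lines from the question:

lines = sc.parallelize([
    "4,01012009,00:00,1289,4",
    "5,01012009,00:00,1326,4",
    "6,01012009,00:00,1497,7",
])
fourth = lines.map(lambda line: int(line.split(',')[3]))
print fourth.collect()  # [1289, 1326, 1497]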
Re: where are my python lambda functions run in yarn-client mode?
Created JIRA for this: https://issues.apache.org/jira/browse/SPARK-3915 On Sat, Oct 11, 2014 at 12:40 PM, Evan Samanas evan.sama...@gmail.com wrote: It's true that it is an implementation detail, but it's a very important one to document because it has the possibility of changing results depending on when I use take or collect. The issue I was running in to was when the executor had a different operating system than the driver, and I was using 'pipe' with a binary I compiled myself. I needed to make sure I used the binary compiled for the operating system I expect it to run on. So in cases where I was only interested in the first value, my code was breaking horribly on 1.0.2, but working fine on 1.1. My only suggestion would be to backport 'spark.localExecution.enabled' to the 1.0 line. Thanks for all your help! Evan On Fri, Oct 10, 2014 at 10:40 PM, Davies Liu dav...@databricks.com wrote: This is some kind of implementation details, so not documented :-( If you think this is a blocker for you, you could create a JIRA, maybe it's could be fixed in 1.0.3+. Davies On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote: Thank you! I was looking for a config variable to that end, but I was looking in Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in 1.0.2's documentation? Evan On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may[1] executor the job locally (in driver), otherwise, all the jobs are executed in cluster. There is config called `spark.localExecution.enabled` (since 1.1+) to change this, it's not enabled by default, so all the functions will be executed in cluster. If you change set this to `true`, then you get the same behavior as 1.0. [1] If it did not get enough items from the first partitions, it will try multiple partitions in a time, so they will be executed in cluster. On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, it appears when I use 'map' to run a python lambda function over an RDD, they appear to be run on different machines, and this is causing problems. In both cases, I am using a Hadoop cluster that runs linux on all of its nodes. I am submitting my jobs with a machine running Mac OS X 10.9. As a reproducer, here is my script: import platform print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0] The answer in Spark 1.1.0: 'Linux' The answer in Spark 1.0.2: 'Darwin' In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference. I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where? Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
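For anyone who wants the old 1.0 behavior back on 1.1+, a sketch of flipping the flag discussed above (spark.localExecution.enabled is undocumented, so treat it as an implementation detail that may change):

import platform
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.localExecution.enabled", "true")
sc = SparkContext(conf=conf)
# with local execution enabled, a small take() may run in the driver,
# so this can print the driver's OS rather than a worker's
print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]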
Re: Help with using combineByKey
Maybe this version is easier to use: plist.mapValues(v => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) It has similar behavior to combineByKey(), and will be faster than the groupByKey() version. On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Sean, Thank you. It works. But I am still confused about the function. Can you kindly throw some light on it? I was going through the example mentioned in https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html Is there any better source through which I can learn more about these functions? It would be helpful if I can get a chance to look at more examples. Also, I assume using combineByKey helps us solve it in parallel, rather than using the simple functions provided by Scala as mentioned by Yana. Am I correct? On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com wrote: Oh duh, sorry. The initialization should of course be (v) => (if (v > 0) 1 else 0, 1) This gives the answer you are looking for. I don't see what Part2 is supposed to do differently. On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Hello Sean, Thank you, but changing from v to 1 doesn't help me either. I am trying to count the number of non-zero values using the first accumulator.

val newlist = List(("LAX", 6), ("LAX", 0), ("LAX", 7), ("SFO", 0), ("SFO", 0), ("SFO", 9))
val plist = sc.parallelize(newlist)
val part1 = plist.combineByKey(
  (v) => (1, 1),
  (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val Part2 = part1.map{ case (key, value) => (key, (value._1, value._2)) }

This should give me the result (LAX,(2,3)) (SFO,(1,3)) On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen so...@cloudera.com wrote: You have a typo in your code at var acc:, and the map from opPart1 to opPart2 looks like a no-op, but those aren't the problem I think. It sounds like you intend the first element of each pair to be a count of nonzero values, but you initialize the first element of the pair to v, not 1, in v => (v, 1). Try v => (1, 1) On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: I am a beginner to Spark and I am finding it difficult to implement a very simple reduce operation. I read that it is ideal to use combineByKey for complex reduce operations. My input:

val input = sc.parallelize(List(("LAX", 6), ("LAX", 8), ("LAX", 7), ("SFO", 0), ("SFO", 1), ("SFO", 9), ("PHX", 65), ("PHX", 88), ("KX", 7), ("KX", 6), ("KX", 1), ("KX", 9), ("HOU", 56), ("HOU", 5), ("HOU", 59), ("HOU", 0), ("MA", 563), ("MA", 545), ("MA", 5), ("MA", 0), ("MA", 0)))
val opPart1 = input.combineByKey(
  (v) => (v, 1),
  (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val opPart2 = opPart1.map{ case (key, value) => (key, (value._1, value._2)) }
opPart2.collectAsMap().map(println(_))

If the value is greater than 0, the first accumulator should be incremented by 1, else it remains the same. The second accumulator is a simple counter for each value. I am getting an incorrect output (garbage values) for the first accumulator. Please help.
The equivalent reduce operation in Hadoop MapReduce is:

public static class PercentageCalcReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
  private FloatWritable pdelay = new FloatWritable();
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int acc2 = 0;
    float frac_delay, percentage_delay;
    int acc1 = 0;
    for (IntWritable val : values) {
      if (val.get() > 0) {
        acc1++;
      }
      acc2++;
    }
    frac_delay = (float) acc1 / acc2;
    percentage_delay = frac_delay * 100;
    pdelay.set(percentage_delay);
    context.write(key, pdelay);
  }
}

Please help. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
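For comparison, the same aggregation as a hedged PySpark sketch (per key: count of positive values, and count of all values):

data = sc.parallelize([("LAX", 6), ("LAX", 0), ("LAX", 7),
                       ("SFO", 0), ("SFO", 0), ("SFO", 9)])
counts = data.combineByKey(
    lambda v: (1 if v > 0 else 0, 1),                           # createCombiner
    lambda acc, v: (acc[0] + (1 if v > 0 else 0), acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))                    # mergeCombiners
print counts.collect()  # e.g. [('LAX', (2, 3)), ('SFO', (1, 3))]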
Re: java.io.IOException Error in task deserialization
Maybe, TorrentBroadcast is more complicated than HttpBroadcast, could you tell us how to reproduce this issue? That will help us a lot to improve TorrentBroadcast. Thanks! On Fri, Oct 10, 2014 at 8:46 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I haven't seen this at all since switching to HttpBroadcast. It seems TorrentBroadcast might have some issues? On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I don't think that I saw any other error message. This is all I saw. I'm currently experimenting to see if this can be alleviated by using HttpBroadcastFactory instead of TorrentBroadcast. So far, with HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you posted. On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote: This exception should be caused by another one, could you paste all of them here? Also, that will be great if you can provide a script to reproduce this problem. Thanks! On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this erorr in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging to the closure? I've only seen this with Spark 1.1 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
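For completeness, switching broadcast implementations in Spark 1.1 is a configuration change; a sketch in PySpark (spark.broadcast.factory is the relevant setting, and HttpBroadcastFactory is the non-default 1.x factory the thread falls back to):

from pyspark import SparkConf, SparkContext

conf = SparkConf().set(
    "spark.broadcast.factory",
    "org.apache.spark.broadcast.HttpBroadcastFactory")
sc = SparkContext(conf=conf)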
Re: where are my python lambda functions run in yarn-client mode?
This is some kind of implementation details, so not documented :-( If you think this is a blocker for you, you could create a JIRA, maybe it's could be fixed in 1.0.3+. Davies On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote: Thank you! I was looking for a config variable to that end, but I was looking in Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in 1.0.2's documentation? Evan On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may[1] executor the job locally (in driver), otherwise, all the jobs are executed in cluster. There is config called `spark.localExecution.enabled` (since 1.1+) to change this, it's not enabled by default, so all the functions will be executed in cluster. If you change set this to `true`, then you get the same behavior as 1.0. [1] If it did not get enough items from the first partitions, it will try multiple partitions in a time, so they will be executed in cluster. On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, it appears when I use 'map' to run a python lambda function over an RDD, they appear to be run on different machines, and this is causing problems. In both cases, I am using a Hadoop cluster that runs linux on all of its nodes. I am submitting my jobs with a machine running Mac OS X 10.9. As a reproducer, here is my script: import platform print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0] The answer in Spark 1.1.0: 'Linux' The answer in Spark 1.0.2: 'Darwin' In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference. I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where? Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GroupBy Key and then sort values with the group
There is a new API called repartitionAndSortWithinPartitions() in master, it may help in this case, then you should do the `groupBy()` by yourself. On Wed, Oct 8, 2014 at 4:03 PM, chinchu chinchu@gmail.com wrote: Sean, I am having a similar issue, but I have a lot of data for a group I cannot materialize the iterable into a List or Seq in memory. [I tried it runs into OOM]. is there any other way to do this ? I also tried a secondary-sort, with the key having the group::time, but the problem with that is the same group-name ends up in multiple partitions I am having to run sortByKey with one partition - sortByKey(true, 1) which shuffles a lot of data.. Thanks, -C -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GroupBy-Key-and-then-sort-values-with-the-group-tp14455p15990.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
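A sketch of the secondary-sort pattern with that API, as it later landed in PySpark (Spark 1.2+). Keys are (group, time) pairs; partitioning uses only the group, so one group never spans partitions (`records` and its fields are placeholders here):

pairs = records.map(lambda r: ((r.group, r.time), r))
sorted_pairs = pairs.repartitionAndSortWithinPartitions(
    numPartitions=100,
    partitionFunc=lambda key: hash(key[0]))  # partition on group only
# each partition now holds whole groups, sorted by (group, time);
# iterate with mapPartitions() to walk each group in time order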
Re: where are my python lambda functions run in yarn-client mode?
When you call rdd.take() or rdd.first(), it may[1] execute the job locally (in the driver); otherwise, all the jobs are executed in the cluster. There is a config called `spark.localExecution.enabled` (since 1.1+) to change this; it's not enabled by default, so all the functions will be executed in the cluster. If you set this to `true`, then you get the same behavior as 1.0. [1] If it did not get enough items from the first partitions, it will try multiple partitions at a time, so they will be executed in the cluster. On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, it appears that when I use 'map' to run a python lambda function over an RDD, the functions are run on different machines, and this is causing problems. In both cases, I am using a Hadoop cluster that runs Linux on all of its nodes. I am submitting my jobs from a machine running Mac OS X 10.9. As a reproducer, here is my script: import platform print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0] The answer in Spark 1.1.0: 'Linux' The answer in Spark 1.0.2: 'Darwin' In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference. I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where? Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html
Re: java.io.IOException Error in task deserialization
This exception should be caused by another one, could you paste all of them here? Also, that will be great if you can provide a script to reproduce this problem. Thanks! On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this erorr in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging to the closure? I've only seen this with Spark 1.1 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.io.IOException Error in task deserialization
Could you provide a script to reproduce this problem? Thanks! On Wed, Oct 8, 2014 at 9:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: This is also happening to me on a regular basis, when the job is large with relatively large serialized objects used in each RDD lineage. A bad thing about this is that this exception always stops the whole job. On Fri, Sep 26, 2014 at 11:17 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: FWIW I suspect that each count operation is an opportunity for you to trigger the bug, and each filter operation increases the likelihood of setting up the bug. I normally don't come across this error until my job has been running for an hour or two and had a chance to build up longer lineages for some RDDs. It sounds like your data is a bit smaller and it's more feasible for you to build up longer lineages more quickly. If you can reduce your number of filter operations (for example by combining some into a single function) that may help. It may also help to introduce persistence or checkpointing at intermediate stages so that the length of the lineages that have to get replayed isn't as long. On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote: No for me as well it is non-deterministic. It happens in a piece of code that does many filter and counts on a small set of records (~1k-10k). The originally set is persisted in memory and we have a Kryo serializer set for it. The task itself takes in just a few filtering parameters. This with the same setting has sometimes completed to sucess and sometimes failed during this step. Arun On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused my greater grief (namely key not found errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently re-produce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this erorr in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging to the closure? 
I've only seen this with Spark 1.1 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
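A sketch of the persistence/checkpointing idea Brad mentions, for cutting long lineages (the path and `some_filter` are placeholders):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")   # placeholder path
mid = rdd.filter(some_filter).persist()  # persist first, so checkpointing
mid.checkpoint()                         # does not recompute the lineage
mid.count()  # force materialization; the checkpoint is written here

After this, a failure in later stages replays from the checkpointed data instead of from the start of the lineage.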
Re: Parsing one big multiple line .xml loaded in RDD using Python
Maybe sc.wholeTextFiles() is what you want: you get the whole text of each file and can parse it yourself. On Tue, Oct 7, 2014 at 1:06 AM, jan.zi...@centrum.cz wrote: Hi, I have already asked a quite similar question at stackoverflow, without success, particularly here: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim. I've also tried a workaround, unsuccessfully; the workaround problem can be found at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html. Particularly what I'm trying to do: I have an .xml dump of wikipedia as the input. The .xml is quite big and it spreads across multiple lines. You can check it out at http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. My goal is to parse this .xml in the same way as gensim.corpora.wikicorpus.extract_pages does; the implementation is at https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py. Unfortunately this method does not work, because RDD.flatMap() processes the RDD line by line as strings. Does anyone have a suggestion for how to parse a wikipedia-like .xml loaded in an RDD using Python? Thank you in advance for any suggestions, advices or hints.
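A sketch of that approach with the gensim parser mentioned above, assuming the dump has been split into files small enough to hold in memory per task (extract_pages expects a file-like object, hence the StringIO wrapper; the path is a placeholder):

from StringIO import StringIO
from gensim.corpora.wikicorpus import extract_pages

# RDD of (filename, whole_file_content) pairs
pages = sc.wholeTextFiles("/path/to/wiki-xml-chunks")
# extract_pages yields per-page tuples such as (title, text)
parsed = pages.flatMap(lambda kv: extract_pages(StringIO(kv[1])))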
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
sc.parallelize() to distribute a list of data into numbers of partitions, but generator can not be cut and serialized automatically. If you can partition your generator, then you can try this: sc.parallelize(range(N), N).flatMap(lambda x: generate_partiton(x)) such as you want to generate xrange(M), M is huge, so sc.parallelize(range(N), N).flatMap(lambda x: xrange(M/N*x, M / N * (x+1)) On Mon, Oct 6, 2014 at 7:16 AM, jan.zi...@centrum.cz wrote: Hi, I would like to ask if it is possible to use generator, that generates data bigger than size of RAM across all the machines as the input for sc = SparkContext(), sc.paralelize(generator). I would like to create RDD this way. When I am trying to create RDD by sc.TextFile(file) where file has even bigger size than data generated by the generator everything works fine, but unfortunately I need to use sc.parallelize(generator) and it makes my OS to kill the spark job. I'm getting only this log and then the job is killed: 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/10/06 13:34:16 INFO Remoting: Starting remoting 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' on port 41016. 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20141006133417-821e 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 42438. 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438) 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44768 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file server' on port 44768. 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at http://ec2-54-164-72-236.compute-1.amazonaws.com:4040 14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py 14/10/06 13:34:18 INFO spark.SparkContext: Added file file:/root/generator_test.py at http://172.31.25.197:44768/files/generator_test.py with timestamp 1412602458065 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077... 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141006133418-0046 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added: app-20141006133418-0046/0 on worker-20141005074620-ip-172-31-30-40.ec2.internal-49979 (ip-172-31-30-40.ec2.internal:49979) with 1 cores 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979 with 1 cores, 512.0 MB RAM 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated: app-20141006133418-0046/0 is now RUNNING 14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852] with ID 0
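Putting that suggestion together as a runnable sketch (N partitions, M total items; the integer division assumes M is a multiple of N):

N = 10          # number of partitions
M = 10 ** 7     # total number of items to generate
rdd = sc.parallelize(range(N), N).flatMap(
    lambda i: xrange(M / N * i, M / N * (i + 1)))
print rdd.count()  # M

Each task generates its own slice of the range, so nothing larger than one partition's worth of data ever has to exist on a single machine.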
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
On Mon, Oct 6, 2014 at 1:08 PM, jan.zi...@centrum.cz wrote: Hi, Thank you for your advice. It really might work, but to specify my problem a bit more, think of my data more like one generated item is one parsed wikipedia page. I am getting this generator from the parser and I don't want to save it to the storage, but directly apply parallelize and create RDD, based on your advice I'm now thinking that something like batching and creating several RDDs and then applying union on them might possibly be the way to go. Originaly I was thinking of calling the parsing function in flatMap on the RDD loaded from the xml file, but then I unfortunately had this problem http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim so now I am trying to parse the xml on the master node an directly put it to the RDD. gensim.corpora.wikicorpus.extract_pages should be call in flatMap() for better performance (or it will be the bottleneck in master). Because the function used in flatMap() is executed in worker, so you should make sure that the files (accessed by extract_pages) should be accessable by workers, putting them in a DFS or NFS in cluster mode. In local mode, may be you should use absolute path for the files. Davies __ Od: Davies Liu dav...@databricks.com Komu: jan.zi...@centrum.cz Datum: 06.10.2014 18:09 Předmět: Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize() sc.parallelize() to distribute a list of data into numbers of partitions, but generator can not be cut and serialized automatically. If you can partition your generator, then you can try this: sc.parallelize(range(N), N).flatMap(lambda x: generate_partiton(x)) such as you want to generate xrange(M), M is huge, so sc.parallelize(range(N), N).flatMap(lambda x: xrange(M/N*x, M / N * (x+1)) On Mon, Oct 6, 2014 at 7:16 AM, jan.zi...@centrum.cz wrote: Hi, I would like to ask if it is possible to use generator, that generates data bigger than size of RAM across all the machines as the input for sc = SparkContext(), sc.paralelize(generator). I would like to create RDD this way. When I am trying to create RDD by sc.TextFile(file) where file has even bigger size than data generated by the generator everything works fine, but unfortunately I need to use sc.parallelize(generator) and it makes my OS to kill the spark job. I'm getting only this log and then the job is killed: 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/10/06 13:34:16 INFO Remoting: Starting remoting 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' on port 41016. 
14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20141006133417-821e 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 42438. 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438) 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44768 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file server' on port 44768. 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI
Re: Trouble getting filtering on field correct
rdd.filter(lambda line: int(line.split(' ')[8]) >= 125) On Fri, Oct 3, 2014 at 8:16 PM, Chop thomrog...@att.net wrote: Given an RDD with multiple lines of the form: u'207.86.121.131 207.86.121.131 2012-11-27 13:02:17 titlestring 622592 27 184464' (fields are separated by a space) What pyspark function/commands do I use to filter out those lines where line[8] >= x? (i.e. line[8] >= 125) When I use line.split(' ') I get an RDD of each field in each line. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-getting-filtering-on-field-correct-tp15728.html
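A slightly more defensive sketch of the same filter, guarding against short or malformed lines before the int() conversion (the field index and threshold are taken from the question):

def keep(line, index=8, threshold=125):
    fields = line.split(' ')
    # skip lines that are too short instead of raising IndexError
    return len(fields) > index and int(fields[index]) >= threshold

kept = rdd.filter(keep)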
Re: IPython Notebook Debug Spam
On Tue, Sep 30, 2014 at 10:14 PM, Rick Richardson rick.richard...@gmail.com wrote: I am experiencing significant logging spam when running PySpark in IPython Notebok Exhibit A: http://i.imgur.com/BDP0R2U.png I have taken into consideration advice from: http://apache-spark-user-list.1001560.n3.nabble.com/Disable-all-spark-logging-td1960.html also http://stackoverflow.com/questions/25193488/how-to-turn-off-info-logging-in-pyspark I have only one log4j.properties it is in /opt/spark-1.1.0/conf Just before I launch IPython Notebook with a pyspark profile, I add the dir and the properties file directly to CLASSPATH and SPARK_CLASSPATH env vars (as you can also see from the png) I still haven't been able to make any change which disables this infernal debug output. Any ideas (WAGs, Solutions, commiserating) would be greatly appreciated. --- My log4j.properties: log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n You should change log4j.rootCategory to WARN, console # Change this to set Spark log level log4j.logger.org.apache.spark=INFO # Silence akka remoting log4j.logger.Remoting=WARN # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: IPython Notebook Debug Spam
How do you setup IPython to access pyspark in notebook? I did as following, it worked for me: $ export SPARK_HOME=/opt/spark-1.1.0/ $ export PYTHONPATH=/opt/spark-1.1.0/python:/opt/spark-1.1.0/python/lib/py4j-0.8.2.1-src.zip $ ipython notebook All the logging will go into console (not in notebook), If you want to reduce the logging in console, you should change /opt/spark-1.1.0/conf/log4j.properties log4j.rootCategory=WARN, console og4j.logger.org.apache.spark=WARN On Wed, Oct 1, 2014 at 11:49 AM, Rick Richardson rick.richard...@gmail.com wrote: Thanks for your reply. Unfortunately changing the log4j.properties within SPARK_HOME/conf has no effect on pyspark for me. When I change it in the master or workers the log changes have the desired effect, but pyspark seems to ignore them. I have changed the levels to WARN, changed the appender to rolling file, or removed it entirely, all with the same results. On Wed, Oct 1, 2014 at 1:49 PM, Davies Liu dav...@databricks.com wrote: On Tue, Sep 30, 2014 at 10:14 PM, Rick Richardson rick.richard...@gmail.com wrote: I am experiencing significant logging spam when running PySpark in IPython Notebok Exhibit A: http://i.imgur.com/BDP0R2U.png I have taken into consideration advice from: http://apache-spark-user-list.1001560.n3.nabble.com/Disable-all-spark-logging-td1960.html also http://stackoverflow.com/questions/25193488/how-to-turn-off-info-logging-in-pyspark I have only one log4j.properties it is in /opt/spark-1.1.0/conf Just before I launch IPython Notebook with a pyspark profile, I add the dir and the properties file directly to CLASSPATH and SPARK_CLASSPATH env vars (as you can also see from the png) I still haven't been able to make any change which disables this infernal debug output. Any ideas (WAGs, Solutions, commiserating) would be greatly appreciated. --- My log4j.properties: log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n You should change log4j.rootCategory to WARN, console # Change this to set Spark log level log4j.logger.org.apache.spark=INFO # Silence akka remoting log4j.logger.Remoting=WARN # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN -- “Science is the great antidote to the poison of enthusiasm and superstition.” -- Adam Smith - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
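For reference, the pieces above combined into one complete conf/log4j.properties (a sketch; adjust levels to taste):

# send everything to the console at WARN
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# quiet Spark, akka remoting, and Jetty as well
log4j.logger.org.apache.spark=WARN
log4j.logger.Remoting=WARN
log4j.logger.org.eclipse.jetty=WARN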
Re: java.lang.NegativeArraySizeException in pyspark
On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for your help. I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models that the size did not fit in an int (recall that broadcasts use 32 bit ints to store size), What is the error? Could you file a JIRA for it? that it was in fact over 2G. I don't know why the previous tests (described above) where duplicated portions of self.all_models worked (it could have been an error in either my debugging or notes), but splitting the self.all_models into a separate broadcast variable for each element worked. I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is you're completely right that using broadcast is the correct way to code this. In 1.1, you could use broadcast.unpersist() to release it, also the performance of Python Broadcast was much improved in 1.1. best, -Brad On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote: Or maybe there is a bug related to the base64 in py4j, could you dumps the serialized bytes of closure to verify this? You could add a line in spark/python/pyspark/rdd.py: ser = CloudPickleSerializer() pickled_command = ser.dumps(command) + print len(pickled_command), repr(pickled_command) On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here's more details about my code. The object (self) contains pointers to the spark_context (which seems to generate errors during serialization) so I strip off the extra state using the outer lambda function and just pass the value self.all_models into the map. all_models is a list of length 9 where each element contains 3 numbers (ints or floats, can't remember) and then one LinearSVC object. The classifier was trained over ~2.5M features, so the object isn't small, but probably shouldn't be 150M either. Additionally, the call ran OK when I use either 2x the first 5 objects or 2x the last 5 objects (another reason why it seems unlikely the bug was size related). def _predict_all_models(all_models, sample): scores = [] for _, (_, _, classifier) in all_models: score = classifier.decision_function(sample[VALUE][RECORD]) scores.append(float(score)) return (sample[VALUE][LABEL], scores) # fails #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models) # works #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5]) #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:]) I've since written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works. later, -brad On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote: The traceback said that the serialized closure cannot be parsed (base64) correctly by py4j. The string in Java cannot be longer than 2G, so the serialized closure cannot longer than 1.5G (there are overhead in base64), is it possible that your data used in the map function is so big? If it's, you should use broadcast for it. In master of Spark, we will use broadcast automatically if the closure is too big. (but use broadcast explicitly is always better). 
On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I have. I've pasted the full traceback at the end of this email. I have isolated the line of code in my script which causes the exception to occur. Although the exception seems to occur deterministically, it is very unclear why the different variants of the line would cause the exception to occur. Unfortunately, I am only able to reproduce the bug in the context of a large data processing job, and the line of code which must change to reproduce the bug has little meaning out of context. The bug occurs when I call map on an RDD with a function that references some state outside of the RDD (which is presumably bundled up and distributed with the function). The output of the function is a tuple where the first element is an int and the second element is a list of floats (same positive length every time, as verified by an 'assert' statement). Given that: -It's unclear why changes in the line would cause an exception -The exception comes from within pyspark code -The exception has to do with negative array sizes (and I couldn't have created a negative
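The broadcast rewrite Brad describes might look roughly like this (a sketch reusing the names from his code; all_models, testing_feature_rdd, and the VALUE/RECORD/LABEL key names are his, assumed here as string keys):

bcast = sc.broadcast(all_models)

def predict_all_models(sample):
    scores = []
    # read the models from the broadcast on the executor,
    # instead of pickling them into the closure
    for _, (_, _, classifier) in bcast.value:
        scores.append(float(classifier.decision_function(sample["VALUE"]["RECORD"])))
    return (sample["VALUE"]["LABEL"], scores)

predictions = testing_feature_rdd.map(predict_all_models).collect()
bcast.unpersist()  # Spark 1.1+: release only after the job has run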
Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
Maybe you have Python 2.7 on master but Python 2.6 in cluster, you should upgrade python to 2.7 in cluster, or use python 2.6 in master by set PYSPARK_PYTHON=python2.6 On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi I am running into trouble using iPython notebook on my cluster. Use the following command to set the cluster up $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME On master I launch python as follows $ IPYTHON_OPTS=notebook --pylab inline --no-browser --port=7000 $SPARK_HOME/bin/pyspark It looks like the problem is the cluster is using an old version of python and python. Any idea how I can easily upgrade ? The following version works on my mac Thanks Andy {'commit_hash': '681fd77', 'commit_source': 'installation', 'default_encoding': 'UTF-8', 'ipython_path': '/Library/Python/2.7/site-packages/IPython', 'ipython_version': '2.1.0', 'os_name': 'posix', 'platform': 'Darwin-13.3.0-x86_64-i386-64bit', 'sys_executable': '/usr/bin/python', 'sys_platform': 'darwin', 'sys_version': '2.7.5 (default, Mar 9 2014, 22:15:05) \n[GCC 4.2.1 Compatible Apple LLVM 5.0 (clang-500.0.68)]’} - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
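The driver-side workaround, as shell commands (a sketch; note that recent IPython releases require Python 2.7, so matching the cluster on 2.6 may rule out the notebook frontend):

$ export PYSPARK_PYTHON=python2.6
$ $SPARK_HOME/bin/pyspark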
Re: Null values in pyspark Row
Could you create a JIRA and add test cases for it? Thanks! Davies On Wed, Sep 24, 2014 at 11:56 AM, jamborta jambo...@gmail.com wrote: Hi all, I have just updated to Spark 1.1.0. The new row representation of the data in Spark SQL is very handy. I have noticed that it does not set None for NULL values coming from Hive if the column was string type - it seems to work with other types. Is that something that will be implemented? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Null-values-in-pyspark-Row-tp15065.html
Re: access javaobject in rdd map
You should create a pure Python object (copy the attributes from the Java object); then it can be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote: Hi all, I have a java object that contains an ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to send it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html
Re: access javaobject in rdd map
Right now, there is no way to access the JVM from a Python worker. In order to make this happen, we would need to: 1. set up py4j in the Python worker 2. serialize the JVM objects and transfer them to the executors 3. link the JVM objects and py4j together to get an interface Before that happens, maybe you could try to set up a service for the model (such as a RESTful service) and access it in map() via RPC. On Tue, Sep 23, 2014 at 9:48 AM, Tamas Jambor jambo...@gmail.com wrote: Hi Davies, Thanks for the reply. I saw that you guys do it that way in the code. Is there no other way? I have implemented all the predict functions in scala, so I would prefer not to reimplement the whole thing in python. thanks, On Tue, Sep 23, 2014 at 5:40 PM, Davies Liu dav...@databricks.com wrote: You should create a pure Python object (copy the attributes from the Java object); then it can be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote: Hi all, I have a java object that contains an ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to send it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html
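A sketch of the "copy the attributes" approach for a linear model (the java_model accessor names here are hypothetical; use whatever your Scala model actually exposes through py4j):

class PurePythonModel(object):
    """Plain Python copy of the JVM model; pickles cleanly for map()."""
    def __init__(self, weights, intercept):
        self.weights = weights
        self.intercept = intercept

    def predict(self, features):
        s = sum(w * x for w, x in zip(self.weights, features))
        return s + self.intercept

# on the driver: pull the parameters out of the JVM object once
py_model = PurePythonModel(list(java_model.weights()), java_model.intercept())
predictions = data.map(lambda features: py_model.predict(features))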
Re: java.lang.NegativeArraySizeException in pyspark
Or maybe there is a bug related to the base64 in py4j, could you dumps the serialized bytes of closure to verify this? You could add a line in spark/python/pyspark/rdd.py: ser = CloudPickleSerializer() pickled_command = ser.dumps(command) + print len(pickled_command), repr(pickled_command) On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here's more details about my code. The object (self) contains pointers to the spark_context (which seems to generate errors during serialization) so I strip off the extra state using the outer lambda function and just pass the value self.all_models into the map. all_models is a list of length 9 where each element contains 3 numbers (ints or floats, can't remember) and then one LinearSVC object. The classifier was trained over ~2.5M features, so the object isn't small, but probably shouldn't be 150M either. Additionally, the call ran OK when I use either 2x the first 5 objects or 2x the last 5 objects (another reason why it seems unlikely the bug was size related). def _predict_all_models(all_models, sample): scores = [] for _, (_, _, classifier) in all_models: score = classifier.decision_function(sample[VALUE][RECORD]) scores.append(float(score)) return (sample[VALUE][LABEL], scores) # fails #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models) # works #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5]) #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:]) I've since written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works. later, -brad On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote: The traceback said that the serialized closure cannot be parsed (base64) correctly by py4j. The string in Java cannot be longer than 2G, so the serialized closure cannot longer than 1.5G (there are overhead in base64), is it possible that your data used in the map function is so big? If it's, you should use broadcast for it. In master of Spark, we will use broadcast automatically if the closure is too big. (but use broadcast explicitly is always better). On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I have. I've pasted the full traceback at the end of this email. I have isolated the line of code in my script which causes the exception to occur. Although the exception seems to occur deterministically, it is very unclear why the different variants of the line would cause the exception to occur. Unfortunately, I am only able to reproduce the bug in the context of a large data processing job, and the line of code which must change to reproduce the bug has little meaning out of context. The bug occurs when I call map on an RDD with a function that references some state outside of the RDD (which is presumably bundled up and distributed with the function). The output of the function is a tuple where the first element is an int and the second element is a list of floats (same positive length every time, as verified by an 'assert' statement). 
Given that: -It's unclear why changes in the line would cause an exception -The exception comes from within pyspark code -The exception has to do with negative array sizes (and I couldn't have created a negative sized array anywhere in my python code) I suspect this is a bug in pyspark. Has anybody else observed or reported this bug? best, -Brad Traceback (most recent call last): File /home/bmiller1/pipeline/driver.py, line 214, in module main() File /home/bmiller1/pipeline/driver.py, line 203, in main bl.write_results(iteration_out_dir) File /home/bmiller1/pipeline/layer/svm_layer.py, line 137, in write_results fig, accuracy = _get_results(self.prediction_rdd) File /home/bmiller1/pipeline/layer/svm_layer.py, line 56, in _get_results predictions = np.array(prediction_rdd.collect()) File /home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py, line 723, in collect bytesInJava = self._jrdd.collect().iterator() File /home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py, line 2026, in _jrdd broadcast_vars, self.ctx._javaAccumulator) File /home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 701, in __call__ File /home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 304, in get_return_value py4j.protocol.Py4JError: An error occurred
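Assuming the closure capture is indeed what inflates the pickled command, a hedged sketch of the broadcast workaround mentioned above (names follow the snippet in the thread):

    models_bc = sc.broadcast(self.all_models)   # shipped once, not per task

    def predict_with_broadcast(sample):
        # the closure now captures only the small Broadcast handle
        return _predict_all_models(models_bc.value, sample)

    predictions = testing_feature_rdd.map(predict_with_broadcast)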
Re: Time difference between Python and Scala
I think it's normal. On Fri, Sep 19, 2014 at 12:07 AM, Luis Guerra luispelay...@gmail.com wrote: Hello everyone, What should be the normal time difference between Scala and Python using Spark? I mean running the same program in the same cluster environment. In my case I am using numpy array structures for the Python code and vectors for the Scala code, both for handling my data. The time difference I have so far is Scala being around 6x faster than Python... is that normal? Best regards - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: schema for schema
Thanks for reporting this, it will be fixed by https://github.com/apache/spark/pull/2448 On Thu, Sep 18, 2014 at 12:32 PM, Michael Armbrust mich...@databricks.com wrote: This looks like a bug, we are investigating. On Thu, Sep 18, 2014 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I have a SchemaRDD which I've gotten from a parquetFile. Did some transforms on it and now want to save it back out as parquet again. Getting a SchemaRDD proves challenging, because some of my fields can be null/None and SQLContext.inferSchema rejects those. So, I decided to use the schema of the original RDD with SQLContext.applySchema. This works, but only if I add a map function to turn my Row objects into a list: (pyspark) applied = sq.applySchema(transformed_rows.map(lambda r: list(r)), original_parquet_file.schema()) This seems a bit kludgy. Is there a better way? Should there be? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD
A PipelinedRDD is an RDD generated by a Python mapper/reducer; for example, rdd.map(func) returns a PipelinedRDD. PipelinedRDD is a subclass of RDD, so it has all the APIs which RDD has:

    >>> sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count()
    10

I'm wondering how you triggered this error? Davies On Tue, Sep 16, 2014 at 10:03 PM, edmond_huo huoxiang5...@gmail.com wrote: Hi, I am new to Spark. I tried to run a job like the wordcount example in Python. But when I tried to get the top 10 popular words in the file, I got the message: AttributeError: 'PipelinedRDD' object has no attribute 'sortByKey'. So my question is: what is the difference between PipelinedRDD and RDD? And if I want to sort the data in a PipelinedRDD, how can I do it? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-difference-between-pyspark-rdd-PipelinedRDD-and-pyspark-rdd-RDD-tp14421.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
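For completeness, an untested sketch of the top-10-words job the question describes; sortByKey() works on any RDD of (key, value) pairs, including the PipelinedRDDs that map() returns (the HDFS path is illustrative):

    counts = (sc.textFile("hdfs:///input/text")
                .flatMap(lambda line: line.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))

    top10 = (counts.map(lambda (w, c): (c, w))     # flip so the count is the key
                   .sortByKey(ascending=False)
                   .take(10))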
Re: pyspark on yarn - lost executor
Maybe the Python workers use too much memory during groupByKey(); groupByKey() with a larger numPartitions can help. Also, can you upgrade your cluster to 1.1? It can spill the data to disk if the memory cannot hold all the data during groupByKey(). And if there is a hot key with dozens of millions of values, the PR [1] can help; it actually helped someone with a large dataset (3T). Davies [1] https://github.com/apache/spark/pull/1977 On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Sure, I'll post to the mail list. groupByKey(self, numPartitions=None): Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD into numPartitions partitions. So instead of using the default I'll provide numPartitions, but what is the best practice to calculate the number of partitions? And how is the number of partitions related to my original problem? Thanks Oleg. http://spark.apache.org/docs/1.0.2/api/python/frames.html On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Look at the API for textFile and groupByKey. Please don't take threads off list. Other people have the same questions. Eric Friedman On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Can you please explain how to configure partitions? Thanks Oleg On Wednesday, September 17, 2014, Eric Friedman eric.d.fried...@gmail.com wrote: Yeah, you need to increase partitions. You only have one on your text file. On groupByKey you're getting the pyspark default, which is too low. Eric Friedman On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets oruchov...@gmail.com wrote: This is a very good question :-). Here is my code:

    sc = SparkContext(appName="CAD")
    lines = sc.textFile(sys.argv[1], 1)
    result = lines.map(doSplit).groupByKey().mapValues(lambda vc: my_custom_function(vc))
    result.saveAsTextFile(sys.argv[2])

Should I configure partitioning manually? Where should I configure it? Where can I read about partitioning best practices? Thanks Oleg. On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman eric.d.fried...@gmail.com wrote: How many partitions do you have in your input rdd? Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey? On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am executing pyspark on yarn. I have successfully processed the initial dataset, but now I have grown it 10 times larger.
during execution I got all the time this error: 14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated tasks are failed a resubmitted again: 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84 QUESTION: how to debug / tune the problem. What can cause to such behavior? I have 5 machine cluster with 32 GB ram. Dataset - 3G. command for execution: /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/inpuut.csv /output/cad_model_500_2 Where can I find
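A rough sketch of the advice above applied to the script from this thread: give groupByKey() an explicit numPartitions so each Python worker holds a smaller group. The values are illustrative; a common starting point is 2-4 partitions per core in the cluster.

    import sys

    lines = sc.textFile(sys.argv[1], 100)          # more input splits
    result = (lines.map(doSplit)                   # doSplit as defined above
                   .groupByKey(200)                # explicit numPartitions
                   .mapValues(my_custom_function))
    result.saveAsTextFile(sys.argv[2])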
Re: Number of partitions when saving (pyspark)
On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra luispelay...@gmail.com wrote: Hi everyone, Is it possible to fix the number of tasks related to a saveAsTextFile in Pyspark? I am loading several files from HDFS, fixing the number of partitions to X (let's say 40, for instance). Then some transformations, like joins and filters, are carried out. The weird thing here is that the number of tasks involved in these transformations is 80, i.e. double the fixed number of partitions. However, when the saveAsTextFile action is carried out, there are only 4 tasks to do this (and I have not been able to increase that number). My problem here is that those 4 tasks rapidly increase the used memory and take too long to finish. I am launching my process from Windows to a cluster in Ubuntu, with 13 computers (4 cores each) with 32 GB of memory, and using pyspark 1.0.2. saveAsTextFile() is a map-side operation, so its number of tasks is determined by the number of partitions of the previous RDD. In Spark 1.0.2, groupByKey() and reduceByKey() take the number of CPUs on the driver (locally) as the default number of partitions, so it's 4. You need to change it to 40 or 80 in this case. BTW, in Spark 1.1, groupByKey() and reduceByKey() use the number of partitions of the previous RDD as the default value. Davies Any clue with this? Thanks in advance - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
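A minimal sketch of that advice, with illustrative names: set numPartitions on the shuffle that feeds the save, so saveAsTextFile() runs that many tasks.

    # a, b: pair RDDs loaded earlier; 40 matches the partitioning of the inputs
    joined = a.join(b, numPartitions=40)
    joined.saveAsTextFile("hdfs:///output/result")   # writes ~40 part- files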
Re: Unable to ship external Python libraries in PYSPARK
Yes, sc.addFile() is what you want:

    | addFile(self, path)
    |     Add a file to be downloaded with this Spark job on every node.
    |     The C{path} passed can be either a local file, a file in HDFS
    |     (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
    |     FTP URI.
    |
    |     To access the file in Spark jobs, use
    |     L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the
    |     filename to find its download location.
    |
    |     >>> from pyspark import SparkFiles
    |     >>> path = os.path.join(tempdir, "test.txt")
    |     >>> with open(path, "w") as testFile:
    |     ...     testFile.write("100")
    |     >>> sc.addFile(path)
    |     >>> def func(iterator):
    |     ...     with open(SparkFiles.get("test.txt")) as testFile:
    |     ...         fileVal = int(testFile.readline())
    |     ...         return [x * fileVal for x in iterator]
    |     >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
    |     [100, 200, 300, 400]

On Tue, Sep 16, 2014 at 7:02 PM, daijia jia_...@intsig.com wrote: Is there some way to ship a text file just like shipping Python libraries? Thanks in advance Daijia -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-ship-external-Python-libraries-in-PYSPARK-tp14074p14412.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Broadcast error
(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) and 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@ hostname:7077] ?? Any suggestions?? On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu dav...@databricks.com wrote: Hey Chengi, What's the version of Spark you are using? It have big improvements about broadcast in 1.1, could you try it? On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu chengi.liu...@gmail.com wrote: Any suggestions.. I am really blocked on this one On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu chengi.liu...@gmail.com wrote: And when I use sparksubmit script, I get the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel. : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) My spark submit code is conf = SparkConf().set(spark.executor.memory, 32G).set(spark.akka.frameSize, 1000) sc = SparkContext(conf = conf) rdd = sc.parallelize(matrix,5) from pyspark.mllib.clustering import KMeans from math import sqrt clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, initializationMode=random) def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) print Within Set Sum of Squared Error = + str(WSSSE) Which is executed as following: spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu chengi.liu...@gmail.com wrote: How? Example please.. Also, if I am running this in pyspark shell.. how do i configure spark.akka.frameSize ?? 
On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das ak...@sigmoidanalytics.com wrote: When the data size is huge, you are better off using the TorrentBroadcastFactory. Thanks Best Regards On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu chengi.liu...@gmail.com wrote: Specifically, the error I see when I try to operate on the rdd created by the sc.parallelize method: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I am trying to create an rdd out of a large matrix... sc.parallelize suggests using broadcast. But when I do sc.broadcast(data) I get this error:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
        pickled = pickleSer.dumps(value)
      File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
        def dumps(self, obj): return cPickle.dumps(obj, 2)
    SystemError: error return without exception set
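A hedged sketch pulling the thread's advice together: set the config before the SparkContext exists (in the pyspark shell that means the command line or spark-defaults.conf, since the shell creates its context at startup), and broadcast the matrix instead of capturing it in a task.

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.akka.frameSize", "100")    # MB, illustrative
            .set("spark.broadcast.factory",
                 "org.apache.spark.broadcast.TorrentBroadcastFactory"))
    sc = SparkContext(conf=conf)

    matrix_bc = sc.broadcast(matrix)               # `matrix` as in the thread
    rdd = sc.parallelize(range(len(matrix)), 5).map(lambda i: matrix_bc.value[i])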
Re: PathFilter for newAPIHadoopFile?
In PySpark, I think you could enumerate all the valid files, create an RDD from each with newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work:

    val fs = getFileSystem(...) // standard hadoop API call
    val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(",") // pathFilter is an instance of org.apache.hadoop.fs.PathFilter
    val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...))

You have to do some initialization on ParquetInputFormat such as AvroReadSupport/AvroWriteSupport etc, but you should be doing that already, I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data, or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
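An untested PySpark sketch of that enumerate-and-union idea; the filtering happens driver-side in Python, so no Hadoop PathFilter is needed (the input format classes and paths are illustrative, and newAPIHadoopFile() requires Spark 1.1+ in PySpark):

    import os

    paths = [p for p in candidate_paths              # e.g. from an HDFS listing
             if not os.path.basename(p).startswith((".", "_"))]
    rdds = [sc.newAPIHadoopFile(
                p,
                "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                "org.apache.hadoop.io.LongWritable",
                "org.apache.hadoop.io.Text")
            for p in paths]
    combined = sc.union(rdds)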
Re: Write 1 RDD to multiple output paths in one go
Maybe we should provide an API like saveTextFilesByKey(path); could you create a JIRA for it? There is one in DPark [1], actually. [1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309 On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Any tips from anybody on how to do this in PySpark? (Or regular Spark, for that matter.) On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Howdy doody Spark Users, I'd like to somehow write out a single RDD to multiple paths in one go. Here's an example. I have an RDD of (key, value) pairs like this:

    >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
    >>> a.collect()
    [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]

Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple part- files or whatever. So my output would be something like: /path/prefix/n [/part-1, /part-2, etc] /path/prefix/b [/part-1, /part-2, etc] /path/prefix/f [/part-1, /part-2, etc] How would you do that? I suspect I need to use saveAsNewAPIHadoopFile or saveAsHadoopFile along with the MultipleTextOutputFormat output format class, but I'm not sure how. By the way, there is a very similar question to this here on Stack Overflow. Nick View this message in context: Write 1 RDD to multiple output paths in one go Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
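Until such an API exists, a hedged workaround sketch: one filter-and-save pass per distinct key. It is simple and correct, but it rescans the (cached) RDD once per key, so it only suits small key sets.

    a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
    a.cache()
    for key in a.keys().distinct().collect():
        (a.filter(lambda kv, k=key: kv[0] == k)   # bind k now, not at run time
          .values()
          .saveAsTextFile('/path/prefix/%s' % key.lower()))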
Re: PathFilter for newAPIHadoopFile?
One way is to do it in bash with hadoop fs -ls; maybe you will end up with a bash script that does it. On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman eric.d.fried...@gmail.com wrote: That's a good idea and one I had considered too. Unfortunately I'm not aware of an API in PySpark for enumerating paths on HDFS. Have I overlooked one? On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote: In PySpark, I think you could enumerate all the valid files, create an RDD from each with newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work:

    val fs = getFileSystem(...) // standard hadoop API call
    val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(",") // pathFilter is an instance of org.apache.hadoop.fs.PathFilter
    val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...))

You have to do some initialization on ParquetInputFormat such as AvroReadSupport/AvroWriteSupport etc, but you should be doing that already, I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data, or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
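The same idea driven from Python, shelling out to the hadoop CLI (assumes `hadoop` is on the driver's PATH; the parsing is a rough sketch since -ls output columns vary by version):

    import subprocess

    out = subprocess.check_output(["hadoop", "fs", "-ls", "/data/parquet"])
    paths = [line.split()[-1]                   # last column is the path
             for line in out.splitlines()
             if line.startswith(("-", "d"))]    # skip the "Found N items" header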
Re: PathFilter for newAPIHadoopFile?
Or maybe you could give this one a try: https://labs.spotify.com/2013/05/07/snakebite/ On Mon, Sep 15, 2014 at 2:51 PM, Davies Liu dav...@databricks.com wrote: There is one way by do it in bash: hadoop fs -ls , maybe you could end up with a bash scripts to do the things. On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman eric.d.fried...@gmail.com wrote: That's a good idea and one I had considered too. Unfortunately I'm not aware of an API in PySpark for enumerating paths on HDFS. Have I overlooked one? On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote: In PySpark, I think you could enumerate all the valid files, and create RDD by newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work val fs = getFileSystem(...) // standard hadoop API call val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(,) // pathFilter is an instance of org.apache.hadoop.fs.PathFilter val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...)) You have to do some initializations on ParquetInputFormat such as AvroReadSetup/AvroWriteSupport etc but that you should be doing already I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
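A hedged sketch with snakebite, the pure-Python HDFS client linked above (the namenode host/port and paths are illustrative):

    from snakebite.client import Client

    client = Client("namenode-host", 8020)
    paths = [f["path"] for f in client.ls(["/data/parquet"])
             if not f["path"].rsplit("/", 1)[-1].startswith((".", "_"))]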
Re: Broadcast error
Hey Chengi, What's the version of Spark you are using? It have big improvements about broadcast in 1.1, could you try it? On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu chengi.liu...@gmail.com wrote: Any suggestions.. I am really blocked on this one On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu chengi.liu...@gmail.com wrote: And when I use sparksubmit script, I get the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel. : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) My spark submit code is conf = SparkConf().set(spark.executor.memory, 32G).set(spark.akka.frameSize, 1000) sc = SparkContext(conf = conf) rdd = sc.parallelize(matrix,5) from pyspark.mllib.clustering import KMeans from math import sqrt clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, initializationMode=random) def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) print Within Set Sum of Squared Error = + str(WSSSE) Which is executed as following: spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu chengi.liu...@gmail.com wrote: How? Example please.. Also, if I am running this in pyspark shell.. how do i configure spark.akka.frameSize ?? On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das ak...@sigmoidanalytics.com wrote: When the data size is huge, you better of use the torrentBroadcastFactory. 
Thanks Best Regards On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu chengi.liu...@gmail.com wrote: Specifically the error I see when I try to operate on rdd created by sc.parallelize method : org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I am trying to create an rdd out of large matrix sc.parallelize suggest to use broadcast But when I do sc.broadcast(data) I get this error: Traceback (most recent call last): File stdin, line 1, in module File /usr/common/usg/spark/1.0.2/python/pyspark/context.py, line 370, in broadcast pickled = pickleSer.dumps(value) File /usr/common/usg/spark/1.0.2/python/pyspark/serializers.py, line 279, in dumps def dumps(self, obj): return cPickle.dumps(obj, 2) SystemError: error return without exception set Help? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: coalesce on SchemaRDD in pyspark
This is a bug; I have created an issue to track it: https://issues.apache.org/jira/browse/SPARK-3500 Also, there is a PR to fix it: https://github.com/apache/spark/pull/2369 Until the next bugfix release, you can work around it with:

    srdd = sqlCtx.jsonRDD(rdd)
    srdd2 = SchemaRDD(srdd._schema_rdd.coalesce(N, false, None), sqlCtx)

On Thu, Sep 11, 2014 at 6:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm having some trouble with the coalesce and repartition functions for SchemaRDD objects in pyspark. When I run: sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])).coalesce(1) I get this error: Py4JError: An error occurred while calling o94.coalesce. Trace: py4j.Py4JException: Method coalesce([class java.lang.Integer, class java.lang.Boolean]) does not exist For context, I have a dataset stored in a parquet file, and I'm using SQLContext to make several queries against the data. I then register the results of these queries as new tables in the SQLContext. Unfortunately each new table has the same number of partitions as the original (despite being much smaller). Hence my interest in coalesce and repartition. Has anybody else encountered this bug? Is there an alternate workflow I should consider? I am running the 1.1.0 binaries released today. best, -Brad - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: coalesce on SchemaRDD in pyspark
On Fri, Sep 12, 2014 at 8:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the quick fix. I'm sorry to send out a bug report on release day - 1.1.0 really is a great release. I've been running the 1.1 branch for a while and there's definitely lots of good stuff. For the workaround, I think you may have meant: srdd2 = SchemaRDD(srdd._jschema_rdd.coalesce(N, False, None), sqlCtx) Yes, thanks for the correction. Note: _schema_rdd - _jschema_rdd false - False That workaround seems to work fine (in that I've observed the correct number of partitions in the web-ui, although haven't tested it any beyond that). Thanks! -Brad On Thu, Sep 11, 2014 at 11:30 PM, Davies Liu dav...@databricks.com wrote: This is a bug, I had create an issue to track this: https://issues.apache.org/jira/browse/SPARK-3500 Also, there is PR to fix this: https://github.com/apache/spark/pull/2369 Before next bugfix release, you can workaround this by: srdd = sqlCtx.jsonRDD(rdd) srdd2 = SchemaRDD(srdd._schema_rdd.coalesce(N, false, None), sqlCtx) On Thu, Sep 11, 2014 at 6:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm having some trouble with the coalesce and repartition functions for SchemaRDD objects in pyspark. When I run: sqlCtx.jsonRDD(sc.parallelize(['{foo:bar}', '{foo:baz}'])).coalesce(1) I get this error: Py4JError: An error occurred while calling o94.coalesce. Trace: py4j.Py4JException: Method coalesce([class java.lang.Integer, class java.lang.Boolean]) does not exist For context, I have a dataset stored in a parquet file, and I'm using SQLContext to make several queries against the data. I then register the results of these as queries new tables in the SQLContext. Unfortunately each new table has the same number of partitions as the original (despite being much smaller). Hence my interest in coalesce and repartition. Has anybody else encountered this bug? Is there an alternate workflow I should consider? I am running the 1.1.0 binaries released today. best, -Brad - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to ship external Python libraries in PYSPARK
With SparkContext.addPyFile(xx.zip), the xx.zip will be copied to all the workers and stored in a temporary directory; the path to xx.zip will be on sys.path on the worker machines, so you can import xx in your jobs. It does not need to be installed on the worker machines. PS: the package or module should be at the top level of xx.zip, or it cannot be imported. For example:

    daviesliu@dm:~/work/tmp$ zipinfo textblob.zip
    Archive:  textblob.zip   3245946 bytes   517 files
    drwxr-xr-x  3.0 unx        0 bx stor 12-Sep-14 10:10 textblob/
    -rw-r--r--  3.0 unx      203 tx defN 12-Sep-14 10:10 textblob/__init__.py
    -rw-r--r--  3.0 unx      563 bx defN 12-Sep-14 10:10 textblob/__init__.pyc
    -rw-r--r--  3.0 unx    61510 tx defN 12-Sep-14 10:10 textblob/_text.py
    -rw-r--r--  3.0 unx    68316 bx defN 12-Sep-14 10:10 textblob/_text.pyc
    -rw-r--r--  3.0 unx     2962 tx defN 12-Sep-14 10:10 textblob/base.py
    -rw-r--r--  3.0 unx     5501 bx defN 12-Sep-14 10:10 textblob/base.pyc
    -rw-r--r--  3.0 unx    27621 tx defN 12-Sep-14 10:10 textblob/blob.py

You can get this textblob.zip by:

    pip install textblob
    cd /xxx/xx/site-package/
    zip -r path_to_store/textblob.zip textblob

Davies On Fri, Sep 12, 2014 at 1:39 AM, yh18190 yh18...@gmail.com wrote: Hi all, I am currently working on pyspark for NLP processing etc. I am using the TextBlob Python library. Normally in standalone mode it is easy to install external Python libraries. In cluster mode I am facing problems installing these libraries on the worker nodes remotely. I cannot access each and every worker machine to install these libs in the Python path. I tried to use the SparkContext pyfiles option to ship .zip files... But the problem is that these Python packages need to be installed on the worker machines. Could anyone let me know what the different ways of doing this are, so that this lib (TextBlob) can be available in the Python path? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-ship-external-Python-libraries-in-PYSPARK-tp14074.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
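A minimal usage sketch for the textblob.zip built above, assuming its own dependencies are likewise available on the workers (the paths and the sentiment job are illustrative):

    from pyspark import SparkContext

    sc = SparkContext(appName="nlp-job")
    sc.addPyFile("/path/to/textblob.zip")      # shipped to every worker

    def polarity(text):
        from textblob import TextBlob          # imported on the workers
        return TextBlob(text).sentiment.polarity

    scores = sc.textFile("hdfs:///reviews.txt").map(polarity)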
Re: Setting up jvm in pyspark from shell
The JVM heap size cannot be changed dynamically, so you need to configure it before running pyspark. If you run in local mode, you should configure spark.driver.memory (in 1.1 or master). Or, you can use --driver-memory 2G (should work in 1.0+). On Wed, Sep 10, 2014 at 10:43 PM, Mohit Singh mohit1...@gmail.com wrote: Hi, I am using the pyspark shell and am trying to create an rdd from a numpy matrix: rdd = sc.parallelize(matrix) I am getting the following error:

    JVMDUMP039I Processing dump event systhrow, detail java/lang/OutOfMemoryError at 2014/09/10 22:41:44 - please wait.
    JVMDUMP032I JVM requested Heap dump using '/global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd' in response to an event
    JVMDUMP010I Heap dump written to /global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd
    JVMDUMP032I JVM requested Java dump using '/global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt' in response to an event
    JVMDUMP010I Java dump written to /global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt
    JVMDUMP032I JVM requested Snap dump using '/global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc' in response to an event
    JVMDUMP010I Snap dump written to /global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc
    JVMDUMP013I Processed dump event systhrow, detail java/lang/OutOfMemoryError.
    Exception AttributeError: "'SparkContext' object has no attribute '_jsc'" in <bound method SparkContext.__del__ of <pyspark.context.SparkContext object at 0x11f9450>> ignored
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 271, in parallelize
        jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
      File "/usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
      File "/usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
    : java.lang.OutOfMemoryError: Java heap space
      at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
      at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
      at java.lang.reflect.Method.invoke(Method.java:618)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
      at py4j.Gateway.invoke(Gateway.java:259)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:207)
      at java.lang.Thread.run(Thread.java:804)

I did try sc.setSystemProperty("spark.executor.memory", "20g"). How do I increase the JVM heap from the shell? -- Mohit "When you want success as badly as you want the air, then you will get it. There is no other secret of success." - Socrates - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
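Summarizing the above as a sketch (values illustrative): the heap must be set before the JVM launches, so it goes on the command line or into spark-defaults.conf, not into the running shell.

    # Spark 1.0+: pass the heap size when launching the shell or a job:
    #     ./bin/pyspark --driver-memory 2g
    # Spark 1.1+: or set it once in conf/spark-defaults.conf:
    #     spark.driver.memory  2g
    # Once the shell is up, splitting a big matrix into more slices also keeps
    # any single serialized task small:
    rdd = sc.parallelize(matrix.tolist(), numSlices=64)   # matrix: the NumPy array above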
Re: groupBy gives non deterministic results
I think the mails to spark.incubator.apache.org are forwarded to spark.apache.org. Here is the header of the first mail: from: redocpot julien19890...@gmail.com to: u...@spark.incubator.apache.org date: Mon, Sep 8, 2014 at 7:29 AM subject: groupBy gives non deterministic results mailing list: user.spark.apache.org mailed-by: spark.apache.org I only subscribe to spark.apache.org, and I do see all the mails from him. On Wed, Sep 10, 2014 at 6:29 AM, Ye Xianjin advance...@gmail.com wrote: | Do the two mailing lists share messages? I don't think so. I didn't receive this message from the user list. I am not at Databricks, so I can't answer your other questions. Maybe Davies Liu dav...@databricks.com can answer you? -- Ye Xianjin Sent with Sparrow On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote: Hi, Xianjin I checked user@spark.apache.org, and found my post there: http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser I am using nabble to send this mail, which indicates that the mail will be sent from my email address to the u...@spark.incubator.apache.org mailing list. Do the two mailing lists share messages? Do we have a nabble interface for the user@spark.apache.org mailing list? Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD memory questions
On Wed, Sep 10, 2014 at 1:05 AM, Boxian Dong box...@indoo.rs wrote: Thank you very much for your kind help. I'd like to raise some further questions: - If the RDD is stored in serialized format, does that mean that whenever the RDD is processed, it will be unpacked and packed again, from and back to the JVM, even when both are located on the same machine? http://apache-spark-user-list.1001560.n3.nabble.com/file/n13862/rdd_img.png In PySpark, yes. But in Spark generally, no: in Scala you have several choices for caching an RDD, serialized or not. - Can the RDD be partially unpacked from the serialized state? Or whenever an RDD is touched, must it be fully unpacked, and of course packed again afterward? The items in an RDD are deserialized batch by batch, so if you call rdd.take(), only the first small batch of items is deserialized. The cached RDD is kept in the JVM; you do not need to pack it again after visiting it. - When an RDD is cached, is it saved in an unserialized or serialized format? If it is saved in an unserialized format, is partial reading of the RDD from the JVM into the Python runtime possible? For PySpark, caches are always saved in serialized format. During a transformation of an RDD, you can only see the current partition; you cannot access other partitions or other RDDs. RDDs are always read-only, so you cannot modify them at any time (all modifications will be dropped). Thank you very much -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805p13862.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: groupBy gives non deterministic results
What's the type of the key? If the hash of the key is different across slaves, then you can get these confusing results. We have run into similar results in Python, because the hash of None is different across machines. Davies On Mon, Sep 8, 2014 at 8:16 AM, redocpot julien19890...@gmail.com wrote: Update: Just tested with HashPartitioner(8) and counted each partition:

    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657591)*, *(6,658327)*, *(7,658434)*),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657594)*, (6,658326), *(7,658434)*),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), *(7,658435)*),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657591)*, (6,658326), (7,658434)),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (7,658435)),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (7,658435)),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (7,658435)),
    List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657591)*, (6,658326), (7,658435))

The result is not identical for each execution. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
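A hedged illustration of the pitfall described above: in CPython 2, hash(None) derives from the object's memory address, so it can differ between machines and route None keys inconsistently across partitions. Normalizing keys to a type with a stable hash before any *ByKey operation sidesteps it (rdd here is an illustrative pair RDD):

    stable = rdd.map(lambda kv: (str(kv[0]), kv[1]))   # str/int/tuple keys hash stably
    counts = stable.groupByKey(8).mapValues(len)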
Re: groupBy gives non deterministic results
Which version of Spark are you using? This bug has been fixed in 0.9.2, 1.0.2 and 1.1; could you upgrade to one of these versions to verify it? Davies On Tue, Sep 9, 2014 at 7:03 AM, redocpot julien19890...@gmail.com wrote: Thank you for your replies. More details here: The program is executed in local mode (single node). Default env params are used. The test code and the result are in this gist: https://gist.github.com/coderh/0147467f0b185462048c Here are the first 10 lines of the data, 3 fields each row, the delimiter is ;

    3801959;11775022;118
    3801960;14543202;118
    3801984;11781380;20
    3801984;13255417;20
    3802003;11777557;91
    3802055;11781159;26
    3802076;11782793;102
    3802086;17881551;102
    3802087;19064728;99
    3802105;12760994;99
    ...

There are 27 partitions (small files). Total size is about 100 MB. We find that this problem is most probably caused by the bug SPARK-2043: https://issues.apache.org/jira/browse/SPARK-2043 Could someone give more details on this bug? The pull request says: The current implementation reads one key with the next hash code as it finishes reading the keys with the current hash code, which may cause it to miss some matches of the next key. This can cause operations like join to give the wrong result when reduce tasks spill to disk and there are hash collisions, as values won't be matched together. This PR fixes it by not reading in that next key, using a peeking iterator instead. I don't understand why reading a key with the next hash code would cause it to miss some matches of the next key. If someone could show me some code to dig into, it would be highly appreciated. =) Thank you. Hao. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13797.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD memory questions
On Tue, Sep 9, 2014 at 10:07 AM, Boxian Dong box...@indoo.rs wrote: I am currently working on a machine learning project which requires the RDDs' content to be (mostly partially) updated during each iteration. Because the program will be converted directly from traditional Python object-oriented code, the content of the RDD will be modified in the mapping function. To test the functionality and memory, I wrote a testing program:

    class TestClass(object):
        def __init__(self):
            self.data = []

        def setup(self):
            self.data = range(2)
            return self

        def addNumber(self, number):
            length = len(self.data)
            for i in range(length):
                self.data[i] += number
            return self

        def sumUp(self):
            total = 0
            for n in self.data:
                total += n
            return total

and the Spark main:

    origData = []
    for i in range(50):
        origData.append((i, TestClass()))

    # create the RDD and cache it
    rddData = sc.parallelize(origData).mapValues(lambda v: v.setup())
    rddData.cache()

    # modify the content of the RDD in a map function
    scD = rddData
    for i in range(100):
        scD = scD.mapValues(lambda v: v.addNumber(10))

    scDSum = scD.map(lambda (k, v): v.sumUp())
    v = scDSum.reduce(lambda a, b: a + b)
    print "-- after the transformation, the value is", v

    scDSum = rddData.map(lambda (k, v): v.sumUp())
    v = scDSum.reduce(lambda a, b: a + b)
    print "-- after the transformation, the cached value is", v

- Judging by the results, it seems to me that when the RDD is cached, the direct modification doesn't affect the content. - By monitoring the memory usage, it seems to me that the memory is not duplicated during each RDD (narrow dependency) transformation (or I am wrong). Therefore, my questions are: - How does the cache work; does it make a separate copy of the data? - How is the memory managed in the MAP function (in a narrow dependency)? Is the entire RDD first duplicated, modified and then assigned to the new RDD, with the old RDD deleted from memory afterward? Or does the new RDD reuse the same memory as the old RDD, without duplicating/copying the memory? I'm trying to answer some of your questions: The RDD is cached in the JVM (after being serialized by pickle). In Python, a worker reads the serialized data from a socket, deserializes it into Python objects, calls the mapper or reducer on them, and finally sends them back to the JVM via the socket. The Python process only holds one batch of objects at a time, so its memory usage will be smaller than the whole partition. The cache in the JVM is created after the first iteration over the data. So when you process it a second time (or more), it will be read from the cache in the JVM. RDDs are read-only; you cannot modify them, and each transformation creates new RDDs. During a MAP function, the objects in the RDD are thrown away after being accessed; any modification to them will be lost. - If the newly generated RDDs directly use the memory of the old RDDs (in a narrow dependency), why do the cached RDDs still keep the old content? Are cached RDDs treated differently from uncached RDDs in memory management? No two RDDs share any memory; they are totally separate. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
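A tiny demonstration of the read-only semantics described above: each task deserializes its own copy, so in-place mutation inside map() never changes the source RDD.

    data = sc.parallelize([[1, 2], [3, 4]]).cache()

    def mutate(lst):
        lst.append(99)               # touches only this task's private copy
        return lst

    print data.map(mutate).collect()   # [[1, 2, 99], [3, 4, 99]]
    print data.collect()               # still [[1, 2], [3, 4]]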
Re: PySpark on Yarn - how group by data properly
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I come from a map/reduce background and am trying to do a quite trivial thing: I have a lot of files (on hdfs) in this format, 3 fields per row:

    1, 2, 3
    2, 3, 5
    1, 3, 5
    2, 3, 4
    2, 5, 1

I actually need to group by key (first column):

    key  values
    1 -- (2,3), (3,5)
    2 -- (3,5), (3,4), (5,1)

and I need to pass the values to a function f (my custom function). The outcome of function f() should go to hdfs with the corresponding key:

    1 -- f() outcome
    2 -- f() outcome

My code is:

    def doSplit(x):
        y = x.split(',')
        if len(y) == 3:
            return y[0], (y[1], y[2])

    lines = sc.textFile(filename, 1)
    counts = lines.map(doSplit).groupByKey()
    output = counts.collect()
    for (key, value) in output:
        print 'build model for key -', key
        print value
        f(str(key), value)

Questions: 1) lines.map(doSplit).groupByKey() - I didn't find an option like groupByKey(f()) to process the grouped values. How can I process grouped keys with a custom function? Function f has some non-trivial logic. The result of groupByKey() is still an RDD of (key, ResultIterable(values)), so you can continue to call map() or mapValues() on it: lines.map(doSplit).groupByKey().map(f) But your `f` needs two parameters, and map() assumes that `f` takes one parameter, so you need to build a wrapper for `f`: lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs)) If `f` only accepts the values as a list, then you need to convert `vs` into a list: result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs))) Finally, you can save the `result` into HDFS: result.saveAsPickleFile(path, batchSize=1024) 2) Using output (I really don't like this approach) to pass values to the function looks not scalable, and it executes only on one machine. What is the way to process grouped keys with PySpark in a distributed fashion, with multiprocessing and on different machines of the cluster? 3) In the case of processing output, how can the data be stored on hdfs? Currently it's not easy to access files in HDFS directly; you could do it with sc.parallelize(local_data).map(str).saveAsTextFile(path). Thanks Oleg. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
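Assembling the pieces above into one end-to-end sketch (f is the user's custom function; the paths are illustrative; the filter drops rows doSplit could not parse):

    def doSplit(x):
        y = x.split(',')
        if len(y) == 3:
            return (y[0], (y[1], y[2]))

    result = (sc.textFile("hdfs:///input/data")
                .map(doSplit)
                .filter(lambda kv: kv is not None)
                .groupByKey()
                .map(lambda (k, vs): f(k, list(vs))))
    result.saveAsPickleFile("hdfs:///output/models", batchSize=1024)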
Re: Spark SQL check if query is completed (pyspark)
SQLContext.sql() returns a SchemaRDD; you need to call collect() on it to pull the data in and force execution. On Sat, Sep 6, 2014 at 6:02 AM, jamborta jambo...@gmail.com wrote: Hi, I am using Spark SQL to run some administrative queries and joins (e.g. create table, insert overwrite, etc), where the query does not return any data. I noticed that if the query fails it prints some error message on the console, but does not actually throw an exception (this is spark 1.0.2). Is there any way to get these errors from the returned object? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-check-if-query-is-completed-pyspark-tp13630.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
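A hedged sketch of that pattern: forcing execution with collect() surfaces failures as catchable exceptions instead of console noise (the query is illustrative):

    from py4j.protocol import Py4JJavaError

    try:
        sqlCtx.sql("INSERT OVERWRITE TABLE dst SELECT * FROM src").collect()
    except Py4JJavaError as e:
        print "query failed:", e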
Re: Getting the type of an RDD in spark AND pyspark
But you cannot get what you expect in PySpark: the RDD in Scala is serialized, so it will always be RDD[Array[Byte]], whatever the element type of the RDD in Python is. Davies On Sat, Sep 6, 2014 at 4:09 AM, Aaron Davidson ilike...@gmail.com wrote: Pretty easy to do in Scala: rdd.elementClassTag.runtimeClass You can access this method from Python as well by using the internal _jrdd. It would look something like this (warning, I have not tested it): rdd._jrdd.classTag().runtimeClass() (The method name is classTag for JavaRDDLike, and elementClassTag for Scala's RDD.) On Thu, Sep 4, 2014 at 1:32 PM, esamanas evan.sama...@gmail.com wrote: Hi, I'm new to spark and scala, so apologies if this is obvious. Every RDD appears to be typed, which I can see by the output in the spark-shell when I execute 'take':

    scala> val t = sc.parallelize(Array(1,2,3))
    t: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:12
    scala> t.take(3)
    res4: Array[Int] = Array(1, 2, 3)

    scala> val u = sc.parallelize(Array(1,Array(2,2,2,2,2),3))
    u: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[3] at parallelize at <console>:12
    scala> u.take(3)
    res5: Array[Any] = Array(1, Array(2, 2, 2, 2, 2), 3)

The Array type stays the same even if only one type is returned:

    scala> u.take(1)
    res6: Array[Any] = Array(1)

Is there some way to just get the name of the type of the entire RDD from some function call? Also, I would really like this same functionality in pyspark, so I'm wondering if it exists on that side, since clearly the underlying RDD is typed (I'd be fine with either the Scala or Python type name). Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-type-of-an-RDD-in-spark-AND-pyspark-tp13498.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
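A practical PySpark alternative implied above: since the JVM side is opaque, sample the Python-side element types instead (a rough, full-scan sketch):

    elem_types = rdd.map(lambda x: type(x).__name__).distinct().collect()
    # e.g. ['int'] for a homogeneous RDD, ['int', 'list'] for a mixed one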
Re: PySpark on Yarn a lot of python scripts project
Hi Oleg, In order to simplify packaging and distributing your code, you could deploy shared storage (such as NFS), put your project on it, and mount it on all the slaves as /projects. In the Spark job scripts, you can then access your project by putting the path into sys.path, such as:

    import sys
    sys.path.append("/projects")
    import myproject

Davies On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, We are evaluating PySpark and have successfully executed examples of PySpark on Yarn. The next step we want to take: we have a Python project (a bunch of Python scripts using Anaconda packages). Question: What is the way to execute PySpark on Yarn with a lot of Python files (~50)? Should they be packaged in an archive? How would the command to execute PySpark on Yarn with a lot of files look? Currently the command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PySpark on Yarn a lot of python scripts project
Here is a story about how shared storage simplifies all of this: At Douban, we use MooseFS [1] instead of HDFS as the distributed file system; it's POSIX-compatible and can be mounted just like NFS. We put all the data, tools and code in it, so we can access them easily on all the machines, just like local disks. You can modify them anywhere, and get the modified version from anywhere. One example: you want to know the fields in your compressed log files:

    $ bunzip2 -k path | head

Then you need to modify your code to deal with these fields in the logs:

    $ vim myjob.py

You have a bunch of libraries or modules in the project, but you do not need to worry about them when running distributed jobs; you just need to do:

    $ python myjob.py

If something goes wrong, you can modify myjob.py, save some RDDs to disk, then check the results:

    $ head path_to_result_of_rdd

Maybe the problem is in your library; then fix it, and run again:

    $ python myjob.py

Dump the result as a CSV file, then load it into MySQL:

    mysql xxx path_of_the_result

In summary, shared storage can help a lot in a distributed environment; a simple solution (such as NFS) is a natural way to solve these problems. Set up once, benefit forever. PS: I'm also a contributor to MooseFS, and have a fork at github.com/davies/moosefs/ PPS: I'm sorry for my poor English if the above sounds rude to you. Davies [1] http://moosefs.org/ On Fri, Sep 5, 2014 at 11:22 AM, Dimension Data, LLC. subscripti...@didata.us wrote: I'd have to agree with Marcelo and Andrew here... Favoring a simple Build-and-Run/Submit wrapper-script that leverages '--py-files file.zip' over adding another layer of complexity -- even if seemingly 'trivial' like NFS -- is probably a good approach (... b/c more technology is never 'trivial' over time). =:). Less is more. On 09/05/2014 01:58 PM, Marcelo Vanzin wrote: On Fri, Sep 5, 2014 at 10:50 AM, Davies Liu dav...@databricks.com wrote: In daily development, it's common to modify your projects and re-run the jobs. If you use zip or egg to package your code, you need to do this every time after a modification; I think that would be boring. That's why shell scripts were invented. :-) Probably a lot easier than setting up and maintaining shared storage in a large cluster. -- Sincerely yours, Team Dimension Data Dimension Data, LLC. | www.didata.us P: 212.882.1276 | subscripti...@didata.us Data Analytics you can literally count on. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark sql results maintain order (in python)
On Thu, Sep 4, 2014 at 3:42 AM, jamborta jambo...@gmail.com wrote: hi, I ran into a problem with spark sql when running a query like this: select count(*), city, industry from table group by hour. When I take the results from the SchemaRDD: 1, I have to parse each line to get the values out of the dict (e.g. in order to convert it to a csv); 2, the order is not kept in a Python dict - I couldn't find a way to maintain the original order (especially a problem in this case, when the column names are derived).

In master and the upcoming 1.1 release, you will get pyspark.sql.Row objects from a SchemaRDD. They are namedtuples, so they keep the order of the columns as they appear in the SQL, and you can easily convert them into a tuple or list. You can also access the fields just like attributes.

thanks, --
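To make the Row behavior concrete, a small sketch against Spark 1.1; the table name `logs` and its registration on sqlCtx are assumed, and the query is adapted from the post:

rows = sqlCtx.sql(
    "SELECT count(*) AS cnt, city, industry FROM logs GROUP BY city, industry"
).collect()

for row in rows:
    print row.cnt, row.city, row.industry  # fields keep the order of the SELECT list
    print tuple(row)                       # plain tuple, handy for csv.writer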
Re: 2 python installations cause PySpark on Yarn problem
Hey Oleg, In pyspark, you MUST have the same version of Python on all the machines of the cluster, which means that when you run `python` on these machines, all of them should be the same version (2.6 or 2.7). With PYSPARK_PYTHON, you can run pyspark with a specified version of Python; you should also install that version on all the machines, in the same location. Davies

On Thu, Sep 4, 2014 at 9:25 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am evaluating PySpark. I have HDP (Hortonworks) installed with Python 2.6.6 (I can't remove it since it is used by Hortonworks). I can successfully execute PySpark on Yarn. We need to use Anaconda packages, so I installed Anaconda. Anaconda comes with Python 2.7.7 and it is added to the classpath. After installing Anaconda the Pi example stops working - I used it for testing PySpark on Yarn. Question: How can PySpark be used with 2 Python versions on one machine? In the classpath I have 2.7.7 on every machine. How can I check what version is used at runtime when executing PySpark - 2.7.7? The exceptions I get are the same as in previous emails:

[root@HDOP-B spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563]# ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000
/usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Djava.library.path= -Xms4g -Xmx4g
14/09/04 12:53:11 INFO spark.SecurityManager: Changing view acls to: root
14/09/04 12:53:11 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/04 12:53:12 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/04 12:53:12 INFO Remoting: Starting remoting
14/09/04 12:53:12 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:45747]
14/09/04 12:53:12 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:45747]
14/09/04 12:53:12 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/04 12:53:12 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/04 12:53:12 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140904125312-c7ea
14/09/04 12:53:12 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB.
14/09/04 12:53:12 INFO network.ConnectionManager: Bound socket to port 37363 with id = ConnectionManagerId(HDOP-B.AGT,37363) 14/09/04 12:53:12 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/04 12:53:12 INFO storage.BlockManagerInfo: Registering block manager HDOP-B.AGT:37363 with 2.3 GB RAM 14/09/04 12:53:12 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/04 12:53:12 INFO spark.HttpServer: Starting HTTP Server 14/09/04 12:53:12 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/04 12:53:12 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33547 14/09/04 12:53:12 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.193.1.76:33547 14/09/04 12:53:12 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-054f4eda-b93b-47d3-87d5-c40e81fc1fe8 14/09/04 12:53:12 INFO spark.HttpServer: Starting HTTP Server 14/09/04 12:53:12 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/04 12:53:12 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54594 14/09/04 12:53:13 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/04 12:53:13 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/09/04 12:53:13 INFO ui.SparkUI: Started SparkUI at http://HDOP-B.AGT:4040 14/09/04 12:53:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable --args is deprecated. Use --arg instead. 14/09/04 12:53:14 INFO client.RMProxy: Connecting to ResourceManager at HDOP-N1.AGT/10.193.1.72:8050 14/09/04 12:53:14 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 6 14/09/04 12:53:14 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/09/04 12:53:14 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 13824 14/09/04 12:53:14 INFO yarn.Client: Preparing Local resources 14/09/04 12:53:15 INFO yarn.Client: Uploading file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar to hdfs://HDOP-B.AGT:8020/user/root/.sparkStaging/application_1409805761292_0005/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar 14/09/04 12:53:17 INFO yarn.Client: Uploading
Re: Spark on Mesos: Pyspark python libraries
PYSPARK_PYTHON may work for you; it's used to specify which Python interpreter should be used in both the driver and the workers. For example, if Anaconda is installed as /anaconda on all the machines, then you can set PYSPARK_PYTHON=/anaconda/bin/python to use the Anaconda environment in PySpark:

PYSPARK_PYTHON=/anaconda/bin/python spark-submit your_app.py

Or if you want to use it by default, you can export this environment variable somewhere (e.g. in conf/spark-env.sh):

export PYSPARK_PYTHON=/anaconda/bin/python

On Tue, Sep 2, 2014 at 9:31 AM, Daniel Rodriguez df.rodriguez...@gmail.com wrote: Hi all, I am getting started with spark and mesos. I already have spark running on a mesos cluster and I am able to start the scala spark and pyspark shells, yay! I still have questions on how to distribute 3rd party python libraries, since I want to use stuff like nltk and mllib on pyspark that requires numpy. I am using salt for configuration management, so it is really easy for me to create an anaconda virtual environment and install all the libraries there on each mesos slave. My main question is whether that's the recommended way of handling 3rd party libraries. If the answer is yes, how do I tell pyspark to use that virtual environment (and not the default python) on the spark workers? I notice that there are addFile/addPyFile functions on the SparkContext, but I don't want to distribute the libraries every single time if I can just do that once by writing some salt states. I am especially worried about numpy and its requirements. Hopefully this makes some sense. Thanks, Daniel Rodriguez
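A quick way to verify which interpreter the workers actually run (a sketch; it just maps sys.executable over a tiny RDD):

import sys
# every worker reports its interpreter path; expect ['/anaconda/bin/python']
print sc.parallelize(range(4), 4).map(lambda _: sys.executable).distinct().collect()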
Re: u'' notation with pyspark output data
u'14.0' means a unicode string; you can convert it into a str with u'14.0'.encode('utf8'), or convert it into a float with float(u'14.0'). Davies

On Thu, Aug 28, 2014 at 11:22 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am working with pyspark and doing a simple aggregation:

def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        return y[0], (y[1], y[2])

counts = lines.map(doSplit).groupByKey()
output = counts.collect()

Iterating over output I get data in the format (u'1385501280', u'14.0'), but actually I need to work with 14.0 instead of u'14.0' and 1385501280 instead of u'1385501280'. Question: how do I get the actual data without the u'' notation? Thanks Oleg.
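A sketch of doSplit with the conversions folded in, so downstream code sees an int key and float values instead of unicode strings (the filter drops lines without 3 fields, which the original function would return as None):

def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        return int(y[0]), (float(y[1]), float(y[2]))

counts = lines.map(doSplit).filter(lambda r: r is not None).groupByKey()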
Re: repartitioning an RDD yielding imbalance
On Thu, Aug 28, 2014 at 7:00 AM, Rok Roskar rokros...@gmail.com wrote: I've got an RDD where each element is a long string (a whole document). I'm using pyspark so some of the handy partition-handling functions aren't available, and I count the number of elements in each partition with:

def count_partitions(id, iterator):
    c = sum(1 for _ in iterator)
    yield (id, c)

rdd.mapPartitionsWithSplit(count_partitions).collectAsMap()

This returns the following: {0: 866, 1: 1158, 2: 828, 3: 876} But if I do: rdd.repartition(8).mapPartitionsWithSplit(count_partitions).collectAsMap() I get {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 3594, 7: 134} Why this strange redistribution of elements? I'm obviously misunderstanding how spark does the partitioning -- is it a problem with having a list of strings as an RDD?

This imbalance is introduced by batched serialization (BatchedSerializer). By default, Python elements in an RDD are serialized by pickle in batches (1024 elements per batch), so from the Scala side the RDD appears to contain only one or two Array[Byte] elements, and repartitioning those few batches produces the imbalance. To fix this, you can change the default batchSize to 10 (or less), or reserialize your RDD in unbatched mode, for example:

sc = SparkContext(batchSize=10)
rdd = sc.textFile(path).repartition(8)

OR

rdd._reserialize(PickleSerializer()).repartition(8)

PS: _reserialize() is not a public API, so it may change in the future. Davies

Help very much appreciated! Thanks, Rok
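A runnable sketch of the two fixes above; `path` is a placeholder for your input:

from pyspark import SparkContext
from pyspark.serializers import PickleSerializer

def count_partitions(index, iterator):
    yield (index, sum(1 for _ in iterator))

sc = SparkContext('local', 'repartition-test', batchSize=10)
rdd = sc.textFile(path).repartition(8)
# with small batches the per-partition counts should come out roughly even
print rdd.mapPartitionsWithIndex(count_partitions).collectAsMap()

# or, unbatched reserialization of an existing RDD (private API, may change):
balanced = rdd._reserialize(PickleSerializer()).repartition(8)
print balanced.mapPartitionsWithIndex(count_partitions).collectAsMap()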
Re: Python script runs fine in local mode, errors in other modes
Could you post the complete stacktrace?

On Tue, Aug 19, 2014 at 10:47 AM, Aaron aaron.doss...@target.com wrote: Hello, I have a relatively simple python program that works just fine in local mode (--master local) but produces a strange error when I try to run it via Yarn (--deploy-mode client --master yarn) or just execute the code through pyspark. Here's the code:

sc = SparkContext(appName="foo")
input = sc.textFile("hdfs://[valid hdfs path]")
mappedToLines = input.map(lambda myline: myline.split(","))

The third line yields this error: TypeError: 'bool' object is not callable. But myline seems to be a valid string if I look at it this way:

mappedToLines = input.map(lambda myline: len(myline))
mappedToLines.collect()
[84, 104, 109, 89, 108, 92, 89, 90, 93, 102]

I just now have access to a Hadoop cluster with Spark installed, so hopefully I'm running into some simple issues that I never had to deal with when testing in my own sandbox in purely local mode before. Any help would be appreciated, thanks! -Aaron
Re: Python script runs fine in local mode, errors in other modes
This script runs fine without your CSV file. Could you download your CSV file to a local disk, and narrow down to the lines which trigger this issue?

On Tue, Aug 19, 2014 at 12:02 PM, Aaron aaron.doss...@target.com wrote: These three lines of python code cause the error for me:

sc = SparkContext(appName="foo")
input = sc.textFile("hdfs://[valid hdfs path]")
mappedToLines = input.map(lambda myline: myline.split(","))

The file I'm loading is a simple CSV.
Re: Segmented fold count
import itertools

l = [1,1,1,2,2,3,4,4,5,1]
gs = itertools.groupby(l)
map(lambda (n, it): (n, sum(1 for _ in it)), gs)
[(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

def groupCount(l):
    gs = itertools.groupby(l)
    return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote: Can anyone assist with a scan of the following kind (Python preferred, but whatever..)? I'm looking for a kind of segmented fold count. Input: [1,1,1,2,2,3,4,4,5,1] Output: [(1,3), (2,2), (3,1), (4,2), (5,1), (1,1)] or preferably two output columns: id: [1,2,3,4,5,1], count: [3,2,1,2,1,1]. I can use a groupby/count, except for the fact that I just want to scan - not re-sort. Ideally this would be as low-level as possible and perform in a simple single scan. It also needs to retain the original sort order. Thoughts?
Re: Segmented fold count
On Sun, Aug 17, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote: What happens when a run of numbers is spread across a partition boundary? I think you might end up with two adjacent groups of the same value in that situation.

Yes, you need another scan to combine the contiguous groups with the same value.
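A sketch of that second pass: compute groupCount per partition, collect the per-partition results in order, then merge the last group of one partition into the first group of the next when the values match (within a partition, groupby already guarantees that neighboring groups differ):

import itertools

def groupCount(it):
    return [(n, sum(1 for _ in g)) for n, g in itertools.groupby(it)]

def merge_boundaries(per_partition):
    merged = []
    for groups in per_partition:
        for n, c in groups:
            if merged and merged[-1][0] == n:
                merged[-1] = (n, merged[-1][1] + c)  # run continues across a boundary
            else:
                merged.append((n, c))
    return merged

parts = rdd.mapPartitions(lambda it: [groupCount(it)]).collect()
print merge_boundaries(parts)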
Re: application as a service
Another option is using Tachyon to cache the RDD; then the cache can be shared by different applications. See how to use Spark with Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html Davies

On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote: You can also look into using Ooyala's job server at https://github.com/ooyala/spark-jobserver This already has a Spray server built in that allows you to do what has already been explained above. Sounds like it should solve your problem. Enjoy!
Re: Merging complicated small matrices to one big matrix
rdd.flatMap(lambda x: x) may solve your problem; it will convert an RDD of [[[1,2,3],[4,5,6]], [[7,8,9],[10,11,12]]] into: [[1,2,3], [4,5,6], [7,8,9], [10,11,12]]

On Mon, Aug 18, 2014 at 2:42 AM, Chengi Liu chengi.liu...@gmail.com wrote: I have an rdd in pyspark which looks as follows: It has two sub matrices (array, array): [ array([[-13.00771575, 0.2740844 , 0.9752694 , 0.67465999, -1.45741537, 0.546775 , 0.7900841 , -0.59473707, -1.11752044, 0.61564356], [ -0., 12.20115746, -0.49016935, -0.9236129 , -1.1693633 , -0.39135626, 1.10752864, 0.16920118, -1.098806 , 1.10045185], [ 0., 0., -11.26425992, 0.56309152, 0.44872832, 0.69722768, 0.84200281, 0.89537327, 0.10460865, -0.62938474], [ -0., -0., 0., 13.1112119 , 0.39986223, -1.22218936, 0.72315955, 0.12208597, -0.6258082 , -0.91077504], [ 0., -0., 0., 0., -11.04483145, -1.71948244, -0.73239228, -0.19651712, -0.97931725, -0.43263423], [ 0., 0., 0., -0., 0., -12.1996715 , -0.05580816, 0.20517336, 0.53584998, 1.3370874 ], [ 0., -0., -0., 0., 0., 0., 12.32603631, 0.47498103, -0.65901705, -0.85713277], [ 0., 0., 0., -0., 0., -0., 0., 11.90030251, 1.73036795, 0.70588443], [ -0., -0., 0., 0., -0., -0., 0., -0., 13.00493769, 1.37753403], [ 0., -0., 0., 0., 0., -0., 0., 0., -0., -10.89006283]]), array([[-12.43375184, 1.07703619, -0.47818221, 1.65919732, 0.96307502, -1.6322447 , -1.09409297, -0.64849112, -1.09349557, -0.68706834], [ 0., -11.93616969, 0.08784614, 1.76677411, -0.0585134 , -0.70979485, 0.29757848, 1.19562173, -1.54176475, 1.71500862], [ 0., -0., -12.42060272, 2.17060365, -1.3212244 , 0.73742297, 0.50410937, -0.35278129, -0.40513689, -0.81222302], [ -0., 0., 0., -11.93419851, -1.15614929, 1.04085489, 0.69986351, -1.3615322 , 0.43467842, -1.33041858], [ -0., -0., 0., 0., 11.22907137, -0.12925322, 0.46293906, -2.01577912, -2.26566926, -0.17750339], [ 0., 0., 0., 0., -0., -12.0705513 , -0.19432359, 0.41226088, 0.79436699, -0.61288711], [ 0., -0., 0., 0., -0., -0., 11.99770753, -1.24277228, 1.32240282, 1.5140609 ], [ -0., 0., -0., -0., 0., -0., 0., -13.07008472, 0.52031563, -1.56247391], [ 0., -0., 0., 0., -0., -0., -0., -0., 13.16585107, 0.57741265], [ 0., 0., -0., -0., 0., 0., 0., -0., -0., -13.53719704]]) ] So, basically I have sub matrices like [sub_matrix_1, sub_matrix_2] (the above has just two matrices). I want to combine them into one big matrix, column wise: [ sub_matrix_1 sub_matrix_2 ] Any suggestions?
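If collecting to the driver is acceptable, a sketch that stacks the submatrices side by side with numpy (it assumes every RDD element is a 2-D array and the combined matrix fits in driver memory):

import numpy as np

blocks = rdd.collect()      # list of 2-D arrays, in partition order
big = np.hstack(blocks)     # column wise: [sub_matrix_1 sub_matrix_2]
# np.vstack(blocks) would stack them top to bottom instead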
Re: Segmented fold count
On Mon, Aug 18, 2014 at 7:41 PM, fil f...@pobox.com wrote: fil wrote: - Python functions like groupCount; these get reflected from their Python AST and converted into a Spark DAG? Presumably if I try and do something non-convertible this transformation process will throw an error? In other words, this runs in the JVM. Further to this - it seems that Python does run on each node in the cluster, meaning it runs outside the JVM. Presumably this means that writing this in Scala would be far more performant. Could I write groupCount() in Scala, and then use it from Pyspark? Care to supply an example? I'm finding them hard to find :)

It's doable, but not so convenient. If you really care about the performance difference, you should write your program in Scala.

fil wrote: - I had considered that partitions were batches of distributable work, and generally large. Presumably the above is OK with small groups (e.g. average size 10) - this won't kill performance? I'm still a bit confused about the dual meaning of partition: work segmentation, and key groups. Care to clarify, anyone - when are partitions used to describe chunks of data for different nodes in the cluster (i.e. large), and when are they used to describe groups of items in data (i.e. small)?

A partition is a chunk of data in an RDD; the computation on a partition is a task, which is sent to a node in the cluster.
Re: Question on mappartitionwithsplit
On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, Thanks for the response. In the second case, f2, foo will have to be declared globally, right? My function is something like:

def indexing(splitIndex, iterator):
    count = 0
    offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
    indexed = []
    for i, e in enumerate(iterator):
        index = count + offset + i
        for j, ele in enumerate(e):
            indexed.append((index, j, ele))
    yield indexed

In this function `indexing`, `offset_lists` should be global.

def another_funct(offset_lists):
    # get that offset_lists
    rdd.mapPartitionsWithSplit(indexing)

But then, the issue is how to pass in that offset_lists. Any suggestions?

Basically, you can do what you would do in a normal Python program; PySpark will send the global variables and closures to the worker processes automatically. So, you can:

def indexing(splitIndex, iterator, offset_lists):
    pass

def another_func(offset_lists):
    rdd.mapPartitionsWithSplit(lambda index, it: indexing(index, it, offset_lists))

Or:

def indexing(splitIndex, iterator):
    pass  # access offset_lists as a global

def another_func(offset):
    global offset_lists
    offset_lists = offset
    rdd.mapPartitionsWithSplit(indexing)

Or:

def another_func(offset_lists):
    def indexing(index, iterator):
        pass  # access offset_lists from the enclosing scope
    rdd.mapPartitionsWithIndex(indexing)

On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu dav...@databricks.com wrote: The callback function f only accepts 2 arguments; if you want to pass other objects to it, you need a closure, such as:

foo = xxx
def f(index, iterator, foo):
    yield (index, foo)

rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))

You can also make f a closure over foo:

def f2(index, iterator):
    yield (index, foo)

rdd.mapPartitionsWithIndex(f2)

On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, In this example: http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit Let's say f takes three arguments:

def f(splitIndex, iterator, foo):
    yield splitIndex

Now, how do I send this foo parameter to this method? rdd.mapPartitionsWithSplit(f) Thanks
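One more equivalent option (a sketch): functools.partial binds the extra argument without a lambda or a global; the body of indexing is abbreviated here for illustration:

from functools import partial

def indexing(splitIndex, iterator, offset_lists):
    offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
    for i, e in enumerate(iterator):
        yield (offset + i, e)

rdd.mapPartitionsWithIndex(partial(indexing, offset_lists=offset_lists))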
Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset
Arpan, Which version of Spark are you using? Could you try master or the 1.1 branch, which can spill the data to disk during groupByKey()? PS: it's better to use reduceByKey() or combineByKey() to reduce the data size during the shuffle. Maybe there is a huge key in the data set; you can find it this way (countByKey() returns a dict, so sort it locally):

sorted(rdd.countByKey().items(), key=lambda x: x[1], reverse=True)[:10]

Davies

On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh ar...@automatic.com wrote: Hi, Let me begin by describing my Spark setup on EC2 (launched using the provided spark-ec2.py script): 100 c3.2xlarge workers (8 cores, 15GB memory each); 1 c3.2xlarge Master (only running the master daemon); Spark 1.0.2; 8GB mounted at /, 80 GB mounted at /mnt.

spark-defaults.conf (a lot of config options have been added here to try and fix the problem; I also encounter the problem while running with the default options):

spark.executor.memory 12991m
spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
spark.executor.extraClassPath /root/ephemeral-hdfs/conf
spark.shuffle.file.buffer.kb 1024
spark.reducer.maxMbInFlight 96
spark.serializer.objectStreamReset 10
spark.akka.frameSize 100
spark.akka.threads 32
spark.akka.timeout 1000
spark.serializer org.apache.spark.serializer.KryoSerializer

spark-env.sh (likewise, heavily tuned while trying to fix the problem):

export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
export SPARK_MASTER_OPTS=-Dspark.worker.timeout=900
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=8
export HADOOP_HOME=/root/ephemeral-hdfs
export SPARK_MASTER_IP=<Master's Public DNS, as added by spark-ec2.py script>
export MASTER=`cat /root/spark-ec2/cluster-url`
export SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
export SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
export SPARK_PUBLIC_DNS=<wget command to get the public hostname, as added by spark-ec2.py script>
# Set a high ulimit for large shuffles
ulimit -n 1000

I am trying to run a very simple job which reads in CSV data (~124 GB) from an S3 bucket, tries to group it based on a key, and counts the number of groups. The number of partitions for the input textFile() is set to 1600, and the number of partitions for the groupByKey() operation is also 1600:

sconf = SparkConf().setAppName(JOB_NAME).setMaster(master)
sc = SparkContext(conf=sconf)
drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
drive_grouped_by_user_vin_and_week = drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
    .groupByKey(numPartitions=user_vin_week_group_partitions)\
    .count()

Stage 1 (flatMap()) launches 1601 tasks, all of which complete in 159 seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks, out of which 1595 complete in under a minute.
The same 5 TIDs consistently fail with the following errors in the logs of their respective Executors:

14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
... 10 more

14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at
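A sketch of the reduceByKey() route Davies recommends, reusing the names from the job above: counting records per key never materializes a whole group on one worker, and the number of keys matches what groupByKey().count() would return:

from operator import add

pairs = drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)
group_sizes = pairs.mapValues(lambda _: 1).reduceByKey(add)  # (key, record count)
num_groups = group_sizes.count()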
Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset
The 1.1 release will come out this month or next; we would really appreciate it if you could test it with your real use case. Davies

On Wed, Aug 13, 2014 at 1:57 PM, Arpan Ghosh ar...@automatic.com wrote: Thanks Davies. I am running Spark 1.0.2 (which seems to be the latest release). I'll try changing it to a reduceByKey(), check the size of the largest key, and post the results here. UPDATE: If I run this job and DO NOT specify the number of partitions for the input textFile() (124 GB being read in from S3), Spark launches 41 tasks for the flatMap(). However, this time none of the flatMap() tasks complete, and I start seeing the same Connection Reset errors.
Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset
In Spark (Scala/Java), it will spill the data to disk, but in PySpark it will not.

On Wed, Aug 13, 2014 at 2:10 PM, Arpan Ghosh ar...@automatic.com wrote: So you are saying that, in spite of spark.shuffle.spill being set to true by default, version 1.0.2 does not spill data to disk during a groupByKey()?
Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset
For the hottest key, the Python worker will need about 1-2 GB of memory to do the groupByKey(). These configurations cannot help with the memory of the Python worker. So, two options: 1) use reduceByKey() or combineByKey() to reduce the memory consumption in the Python worker; 2) try master or the 1.1 branch, which has the spilling feature in Python. Davies

On Wed, Aug 13, 2014 at 4:08 PM, Arpan Ghosh ar...@automatic.com wrote: Here are the biggest keys:

[ (17634, 87874097), (8407, 38395833), (20092, 14403311), (9295, 4142636), (14359, 3129206), (13051, 2608708), (14133, 2073118), (4571, 2053514), (16175, 2021669), (5268, 1908557), (3669, 1687313), (14051, 1628416), (19660, 1619860), (10206, 1546037), (3740, 1527272), (426, 1522788),

Should I try to increase spark.shuffle.memoryFraction and decrease spark.storage.memoryFraction?
Re: error with pyspark
On Fri, Aug 8, 2014 at 9:12 AM, Baoqiang Cao bqcaom...@gmail.com wrote: Hi There, I ran into a problem and can’t find a solution. I was running bin/pyspark ../python/wordcount.py

You could use bin/spark-submit ../python/wordcount.py

The wordcount.py is here:

import sys
from operator import add
from pyspark import SparkContext

datafile = '/mnt/data/m1.txt'
sc = SparkContext()
outfile = datafile + '.freq'
lines = sc.textFile(datafile, 1)
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
output = counts.collect()
outf = open(outfile, 'w')
for (word, count) in output:
    outf.write(word.encode('utf-8') + '\t' + str(count) + '\n')
outf.close()

The error message is here:

14/08/08 16:01:59 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 0)
java.io.FileNotFoundException: /tmp/spark-local-20140808160150-d36b/12/shuffle_0_0_468 (Too many open files)

This message means that Spark (the JVM) has reached the max number of open files; there is an fd leak somewhere. Unfortunately, I can not reproduce this problem. What is the version of Spark?

at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:107)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:175)
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

The m1.txt is about 4G, and I have 120GB RAM and used -Xmx120GB. It is on Ubuntu. Any help please? Best, Baoqiang Cao Blog: http://baoqiang.org Email: bqcaom...@gmail.com
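A small sketch for checking the limit Davies mentions; note that resource only inspects (and raises, up to the hard cap) the limit of the current Python process, so the JVM's limit still has to be raised with ulimit in the shell that launches Spark:

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print 'open file limit: soft=%d hard=%d' % (soft, hard)
# raise the soft limit to the hard limit, for this process only
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))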
Re: PySpark + executor lost
What is the environment? YARN, Mesos, or standalone? It would be more helpful if you could show more of the logs.

On Wed, Aug 6, 2014 at 7:25 PM, Avishek Saha avishek.s...@gmail.com wrote: Hi, I get a lot of executor lost errors for saveAsTextFile with PySpark and Hadoop 2.4. For small datasets this error occurs, but since the dataset is small it eventually gets written to the file. For large datasets, it takes forever to write the final output. Any help is appreciated. Avishek
Re: PySpark, numpy arrays and binary data
On Thu, Aug 7, 2014 at 12:06 AM, Rok Roskar rokros...@gmail.com wrote: sure, but if you knew that a numpy array went in on one end, you could safely use it on the other end, no? Perhaps it would require an extension of the RDD class and overriding the collect() method.

Could you give a short example about how a numpy array is used in your project?

sure -- basically our main data structure is a container class (acts like a dictionary) that holds various arrays that represent particle data. Each particle has various properties: position, velocity, mass etc. You get at these individual properties by calling something like s['pos'], where 's' is the container object and 'pos' is the name of the array. A really common use case then is to select particles based on their properties and do some plotting, or take a slice of the particles, e.g. you might do:

r = np.sqrt((s['pos']**2).sum(axis=1))
ind = np.where(r < 5)
plot(s[ind]['x'], s[ind]['y'])

Internally, the various arrays are kept in a dictionary -- I'm hoping to write a class that keeps them in an RDD instead. To the user this would have to be transparent, i.e. if the user wants to get at the data for specific particles, she would just have to do s['pos'][1,5,10] for example, and the data would be fetched for her from the RDD just like it would be if she were simply using the usual single-machine version. This is why the writing to/from files when retrieving data from the RDD really is a no-go -- can you recommend how this can be circumvented?

An RDD is distributed by design, so accessing items in an RDD by key or index directly will not be easy. I think you can not map this interface onto an RDD, or the result will not be what the user expects: it would be very, very slow. In order to parallelize the computation, most of it should be done by transformations of RDDs. Finally, fetch the data from the RDD with collect(), then do the plotting. Can this kind of work flow work for your cases? Davies
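A sketch of that transform-then-collect flow; `particles` is a hypothetical RDD whose elements are dicts with a 'pos' array, borrowing the names from the post:

import numpy as np

def radius(p):
    return np.sqrt((np.asarray(p['pos']) ** 2).sum())

# filter on the cluster, collect only the small selection, then plot locally
nearby = particles.filter(lambda p: radius(p) < 5) \
                  .map(lambda p: (p['pos'][0], p['pos'][1])) \
                  .collect()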
Re: trouble with jsonRDD and jsonFile in pyspark
There is a PR to fix this: https://github.com/apache/spark/pull/1802

On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I concur that printSchema works; it just seems to be operations that use the data where trouble happens. Thanks for posting the bug. -Brad

On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote: I tried jsonRDD(...).printSchema() and it worked. Seems the problem is when we take the data back to the Python side; SchemaRDD#javaToPython failed on your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875 to track it. Thanks, Yin

On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I checked out and built master. Note that Maven had a problem building Kafka (in my case, at least); I was unable to fix this easily so I moved on since it seemed unlikely to have any influence on the problem at hand. Master improves functionality (including the example Nicholas just demonstrated) but unfortunately there still seems to be a bug related to using dictionaries as values. I've put some code below to illustrate the bug.

# dictionary as value works fine
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
[Row(key0=Row(key1=u'value'))]

# dictionary as value works fine, even when inner keys are varied
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
[Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]

# dictionary as value works fine when inner keys are missing and outer key is present
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

# dictionary as value FAILS when outer key is missing
print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
Py4JJavaError: An error occurred while calling o84.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException...

# dictionary as value FAILS when outer key is present with null value
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
Py4JJavaError: An error occurred while calling o98.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException...

# nested lists work even when outer key is missing
print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
[Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

Is anyone able to replicate this behavior? -Brad

On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com wrote: We try to keep master very stable, but this is where active development happens. YMMV, but a lot of people do run very close to master without incident (myself included). branch-1.0 has been cut for a while and we only merge bug fixes into it (this is more strict for non-alpha components like Spark core). For Spark SQL, this branch is pretty far behind as the project is very young and we are fixing bugs / adding features very rapidly compared with Spark core.
branch-1.1 was just cut and is being QAed for a release; at this point it's likely the same as master, but that will change as features start getting added to master in the coming weeks.

On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: collect() works, too.

sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
[Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

Can’t answer your question about branch stability, though. Spark is a very active project, so stuff is happening all the time. Nick

On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Nick, Can you check that the call to collect() works as well as printSchema()? I actually experience that printSchema() works fine, but then it crashes on collect(). In general, should I expect the master (which seems to be on branch-1.1) to be any more/less stable than branch-1.0? While it would be great to have this fixed, it would be good to know if I should expect lots of other instability. best, -Brad

On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: This looks to be fixed in master:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
Re: PySpark, numpy arrays and binary data
A numpy array can only support basic types, so we can not use it during collect() by default. Could you give a short example about how numpy arrays are used in your project?

On Wed, Aug 6, 2014 at 8:41 AM, Rok Roskar rokros...@gmail.com wrote: Hello, I'm interested in getting started with Spark to scale our scientific analysis package (http://pynbody.github.io) to larger data sets. The package is written in Python and makes heavy use of numpy/scipy and related frameworks. I've got a couple of questions that I have not been able to find easy answers to despite some research efforts... I hope someone here can clarify things for me a bit!

* is there a preferred way to read binary data off a local disk directly into an RDD? Our I/O routines are built to read data in chunks and each chunk could be read by a different process/RDD, but it's not clear to me how to accomplish this with the existing API. Since the idea is to process data sets that don't fit into a single node's memory, reading first and then distributing via sc.parallelize is obviously not an option.

If you already know how to partition the data, then you can use sc.parallelize() to distribute the description of your data, then read the data in parallel according to those descriptions. For example, if you partition your data into (path, start, length) tuples:

partitions = [(path1, start1, length), (path1, start2, length), ...]

def read_chunk(chunk):
    path, start, length = chunk
    f = open(path)
    f.seek(start)
    data = f.read(length)
    # process the data here and yield records

rdd = sc.parallelize(partitions, len(partitions)).flatMap(read_chunk)

* related to the first question -- when an RDD is created by parallelizing a numpy array, the array gets serialized and distributed. I see in the source that it actually gets written into a file first (!?) -- but surely the Py4J bottleneck for python array types (mentioned in the source comment) doesn't really apply to numpy arrays? Is it really necessary to dump the data onto disk first? Conversely, the collect() seems really slow and I suspect that this is due to the combination of disk I/O and python list creation. Are there any ways of getting around this if numpy arrays are being used? I'd be curious about any other best-practices tips anyone might have for running pyspark with numpy data...! Thanks! Rok
Re: pyspark inferSchema
On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}', '{"foo":"boom", "baz":[1,2,3]}']))
a.printSchema()
root
 |-- baz: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- foo: string (nullable = true)

It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive.

Before the upcoming 1.1 release, we did not support nested structures via inferSchema; a nested dictionary is treated as a MapType. This introduces an inconsistency: a top-level dictionary becomes a structure type (its fields can be accessed by name), but nested ones are MapType (accessed as maps). Deprecating top-level dictionaries is an attempt to resolve this inconsistency. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert your dicts into Rows:

ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

In order to get the correct schema, we would need another argument to specify the number of rows to be inferred, such as: inferSchema(rdd, sample=None). With sample=None, it would take the first row; otherwise it would sample the data to figure out the complete schema. Does this work for you?

Nick

On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and lists, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error:

Produces error:
srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}]))

Works fine:
srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}]))

To be fair, inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in order, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I am running the 1.0.1 release. -Brad
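To make the dict-to-Row conversion concrete, a sketch assuming rdd_of_dict holds flat dictionaries whose keys are valid Python identifiers:

from pyspark.sql import SQLContext, Row

sqlCtx = SQLContext(sc)
rows = rdd_of_dict.map(lambda d: Row(**d))
srdd = sqlCtx.inferSchema(rows)   # Rows carry field names, unlike plain dicts
srdd.printSchema()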
Re: java.lang.StackOverflowError
Could you create a reproducible script (and data) to allow us to investigate this? Also, union is a lazy transformation; rather than unioning in a loop, you could union all the files at once:

    rdds = [read(f, sc) for f in files]
    rdd = sc.union(rdds)

Davies

On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu chengi.liu...@gmail.com wrote:

Hi, I am doing some basic preprocessing in pyspark (local mode) as follows:

    files = [...]  # input files

    def read(filename, sc):
        # process file
        return rdd

    if __name__ == "__main__":
        conf = SparkConf()
        conf.setMaster('local')
        sc = SparkContext(conf=conf)
        sc.setCheckpointDir(root + "temp/")
        data = sc.parallelize([])
        for i, f in enumerate(files):
            data = data.union(read(f, sc))
            if i == 20:
                data.checkpoint()
                data.count()
            if i == 500:
                break
        # print data.count()
        # rdd_1 = read(files[0], sc)
        data.saveAsTextFile(root + "output/")

But I see this error:

    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
      File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o9564.saveAsTextFile.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
    java.io.Bits.putInt(Bits.java:93)
    java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)
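The single-union suggestion above, spelled out as a runnable sketch. The file paths and the read() body are hypothetical stand-ins; the point is that one flat sc.union() keeps the RDD lineage shallow, avoiding the deeply nested unions that overflow the stack when the task is serialized:

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir('/tmp/spark-checkpoints')        # hypothetical path

    files = ['/data/part-%04d' % i for i in range(500)]  # hypothetical inputs

    def read(filename, sc):
        # Stand-in for the original per-file processing.
        return sc.textFile(filename)

    rdds = [read(f, sc) for f in files]
    data = sc.union(rdds)   # one flat union instead of 500 nested ones
    data.checkpoint()       # optionally truncate the lineage entirely
    data.saveAsTextFile('/tmp/output')                   # hypothetical path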
Re: pyspark inferSchema
The sample argument to inferSchema is still not in master; I will try to add it if it makes sense.

On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:

Hi Davies,

Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is.

http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema

It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that.

best,
-Brad
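Until a sample argument lands, the workaround Nicholas described earlier in the thread, serializing the dictionaries back to JSON text so that jsonRDD() can scan the whole data set, should work on releases that already ship JSON support. A sketch, reusing the hypothetical sqlCtx and rdd_of_dict names from the earlier example; expensive, but it gets whole-dataset schema inference:

    import json

    # Round-trip through JSON text: jsonRDD() examines every record and
    # unifies the schema across the whole data set.
    srdd = sqlCtx.jsonRDD(rdd_of_dict.map(json.dumps))
    srdd.printSchema()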
Re: Last step of processing is using too much memory.
When you do groupBy(), Spark wants to load all the values for each partition's keys into memory for best performance, so you should specify the number of partitions carefully. In Spark master, and in the upcoming 1.1 release, PySpark can do an external groupBy(), meaning it dumps data to disk when there is not enough memory to hold it all; that will also help in this case.

On Fri, Jul 18, 2014 at 1:56 AM, Roch Denis rde...@exostatic.com wrote:

Well, for what it's worth, I found the issue after spending the whole night running experiments ;). Basically, I needed to give a higher number of partitions to groupByKey. I was simply using the default, which generated only 4 partitions, and so the whole thing blew up.
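A short sketch of the fix Roch describes: pass numPartitions to groupByKey() explicitly instead of relying on the default. The key space and sizes here are made up:

    # With only a few large partitions, all grouped values in a partition
    # must fit in memory at once; more partitions means each task holds a
    # smaller slice of the grouped data.
    pairs = sc.parallelize([(i % 1000, i) for i in range(1000000)])
    grouped = pairs.groupByKey(numPartitions=200)
    print(grouped.mapValues(lambda vs: sum(1 for _ in vs)).take(3))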
Re: How do you debug a PythonException?
The exception in Python means that the worker tried to read a command from the JVM but reached the end of the socket (the socket had been closed). So it's possible that another exception happened in the JVM. Could you change the log level in log4j, then check whether there is any problem inside the JVM?

Davies

On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

Any clues? This looks like a bug, but I can't report it without more precise information.

On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas nicholas.cham...@gmail.com wrote:

I'm in the PySpark shell and I'm trying to do this:

    a = sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json', minPartitions=sc.defaultParallelism * 3).cache()
    a.map(lambda x: len(x)).max()

My job dies with the following:

    14/07/30 01:46:28 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/root/spark/python/pyspark/worker.py", line 73, in main
        command = pickleSer._read_with_length(infile)
      File "/root/spark/python/pyspark/serializers.py", line 142, in _read_with_length
        length = read_int(stream)
      File "/root/spark/python/pyspark/serializers.py", line 337, in read_int
        raise EOFError
    EOFError
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-190-171-217.ec2.internal: remote Akka client disassociated

How do I debug this? I'm using 1.0.2-rc1 deployed to EC2.

Nick
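To turn up the JVM-side logging as suggested, edit conf/log4j.properties on the driver and workers (Spark ships a log4j.properties.template to copy from). A sketch that switches the root level from INFO to DEBUG; the appender settings are assumptions copied from the 1.x-era template:

    # conf/log4j.properties -- root logger raised from INFO to DEBUG
    log4j.rootCategory=DEBUG, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n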
Re: zip two RDD in pyspark
On Mon, Jul 28, 2014 at 12:58 PM, l lishu...@gmail.com wrote:

I have a file in S3 and I want to map each of its lines with an index. Here is my code:

    input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
    N = input_data.count()
    index = sc.parallelize(range(N), 6)
    index.zip(input_data).collect()

I think you cannot do zipWithIndex() this way, because the number of lines in each partition of input_data will be different from index's. You need to get the exact number of lines in each partition first, then generate the correct indices. That is easy to do with mapPartitions():

    nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
    starts = [sum(nums[:i]) for i in range(len(nums))]
    zipped = input_data.mapPartitionsWithIndex(lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))

    ...
    14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
    14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1) finished in 0.031 s
    14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1, took 0.03707 s
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/root/spark/python/pyspark/rdd.py", line 584, in collect
        return list(self._collect_iterator_through_file(bytesInJava))
      File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
        self.ctx._writeToFile(iterator, tempFile.name)
      File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
      File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.writeToFile.
    : java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
        at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
        at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
        at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:744)

As I see it, the job completed, but I don't understand what's happening with 'String cannot be cast to [B'. I tried to zip two ParallelCollectionRDDs and it works fine, but here I have a MappedRDD from textFile(). Not sure what's going on here.

Could you provide a script and dataset to reproduce this error? Maybe there are some corner cases during serialization.

Also, why does Python not have zipWithIndex()?

The feature set in PySpark is much smaller than in Spark (Scala); hopefully it will catch up in the next two releases.

Thanks for any help.
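For reference, the mapPartitions-based indexing above as a self-contained sketch; the tiny in-memory RDD is a stand-in for the S3 text file:

    from pyspark import SparkContext

    sc = SparkContext('local', 'zip-with-index-demo')
    input_data = sc.parallelize(['a', 'b', 'c', 'd', 'e'], 3)

    # Count the lines in each partition, then compute each partition's
    # starting offset in the global ordering.
    nums = input_data.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
    starts = [sum(nums[:i]) for i in range(len(nums))]

    # Emit (global_index, line) pairs without shuffling any data.
    zipped = input_data.mapPartitionsWithIndex(
        lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
    print(zipped.collect())  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]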