driver memory management
Hi All, I am interested in calling collect() on a large RDD so that I can run a learning algorithm on it. I've noticed that when I don't increase SPARK_DRIVER_MEMORY I can run out of memory. I've also noticed that it looks like the same fraction of memory is reserved for storage on the driver as on the worker nodes, and that the web UI doesn't show any storage usage on the driver. Since that memory is reserved for storage, it seems possible that it is not available for collecting my RDD. Is there a way to configure the memory management (spark.storage.memoryFraction, spark.shuffle.memoryFraction) for the driver separately from the workers? Is there any reason to leave space for shuffle or storage on the driver? It seems like I never see either of these used on the web UI, although I may not be interpreting the UI correctly or my jobs may not trigger the use case. For context, I am using PySpark (so much of my processing happens outside of the memory allocated to Java) and running the Spark 1.1.0 release binaries. best, -Brad
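For reference, these knobs are ordinary configuration properties; a minimal PySpark sketch of where they are set (the values are illustrative, and whether the fractions can differ between driver and executors is exactly the open question above):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("memory-fraction-sketch")
        .set("spark.executor.memory", "15g")
        .set("spark.storage.memoryFraction", "0.3")   # default is 0.6
        .set("spark.shuffle.memoryFraction", "0.2"))  # default is 0.2
sc = SparkContext(conf=conf)

# The driver heap itself cannot be raised from inside the program once the
# JVM is up; it is set via spark-submit --driver-memory or spark.driver.memory
# in spark-defaults.conf before launch.
collected = sc.parallelize(range(1000)).collect()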
Re: java.io.IOException Error in task deserialization
I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused me greater grief (namely key not found errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data attached to the closure? I've only seen this with Spark 1.1 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744)
Re: java.io.IOException Error in task deserialization
FWIW I suspect that each count operation is an opportunity for you to trigger the bug, and each filter operation increases the likelihood of setting up the bug. I normally don't come across this error until my job has been running for an hour or two and had a chance to build up longer lineages for some RDDs. It sounds like your data is a bit smaller and it's more feasible for you to build up longer lineages more quickly. If you can reduce your number of filter operations (for example by combining some into a single function) that may help. It may also help to introduce persistence or checkpointing at intermediate stages so that the lineages that have to get replayed aren't as long. On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote: No, for me as well it is non-deterministic. It happens in a piece of code that does many filters and counts on a small set of records (~1k-10k). The original set is persisted in memory and we have a Kryo serializer set for it. The task itself takes in just a few filtering parameters. This with the same setting has sometimes completed successfully and sometimes failed during this step. Arun On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused me greater grief (namely key not found errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data attached to the closure? I've only seen this with Spark 1.1 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744)
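For anyone trying the persist/checkpoint suggestion above, here is a minimal PySpark sketch; the RDD names, input path, and checkpoint directory are illustrative rather than taken from Arun's job:

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="lineage-truncation-sketch")
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  # any directory visible to all workers

records = sc.textFile("hdfs:///data/records")         # stand-in for the persisted input set
filtered = records.filter(lambda line: "bad" not in line)

# Persisting lets the many subsequent filter/count actions reuse this result
# rather than replaying the full lineage each time.
filtered.persist(StorageLevel.MEMORY_AND_DISK)

# Checkpointing writes the data out and truncates the lineage, so retried
# tasks only have a short dependency chain to deserialize.
filtered.checkpoint()
print filtered.count()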
Re: java.lang.NegativeArraySizeException in pyspark
What is the error? Could you file a JIRA for it? It turns out there are actually three separate errors (indicated below), one of which *silently returns the wrong value to the user*. Should I file a separate JIRA for each one? What level should I mark these as (critical, major, etc.)? I'm not sure that all of these are bugs as much as feature requests, since it looks like the design of FramedSerializer includes some size constraints (https://github.com/apache/spark/blob/master/python/pyspark/serializers.py "Serializer that writes objects as a stream of (length, data) pairs, where C{length} is a 32-bit integer and data is C{length} bytes."). Attempting to reproduce the bug in isolation in an IPython notebook I've observed the following. Note that I'm running Python 2.7.3 on all machines and using the Spark 1.1.0 binaries.

**BLOCK 1** [no problem]
import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

def check_unserialized(size):
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'broadcast_bug')

**BLOCK 2** [no problem]
check_pre_serialized(20)
serialized length: 9374656
length recovered from broadcast variable: 9374656
correct value recovered: True

**BLOCK 3** [no problem]
check_unserialized(20)
correct value recovered: True

**BLOCK 4** [no problem]
check_pre_serialized(27)
serialized length: 1499501632
length recovered from broadcast variable: 1499501632
correct value recovered: True

**BLOCK 5** [no problem]
check_unserialized(27)
correct value recovered: True

**BLOCK 6** [ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]
check_pre_serialized(28)
...
/home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
    354
    355     def dumps(self, obj):
--> 356         return cPickle.dumps(obj, 2)
    357
    358     loads = cPickle.loads
SystemError: error return without exception set

**BLOCK 7** [no problem]
check_unserialized(28)
correct value recovered: True

**BLOCK 8** [ERROR 2: no error occurs and an *incorrect result* is returned]
check_pre_serialized(29)
serialized length: 6331339840
length recovered from broadcast variable: 2036372544
correct value recovered: False

**BLOCK 9** [ERROR 3: unhandled error from zlib.compress inside sc.broadcast]
check_unserialized(29)
...
/home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
    418
    419     def dumps(self, obj):
--> 420         return zlib.compress(self.serializer.dumps(obj), 1)
    421
    422     def loads(self, obj):
OverflowError: size does not fit in an int

**BLOCK 10** [ERROR 1]
check_pre_serialized(30)
...same as above...

**BLOCK 11** [ERROR 3]
check_unserialized(30)
...same as above...

On Thu, Sep 25, 2014 at 2:55 PM, Davies Liu dav...@databricks.com wrote: On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for your help. I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models that the size did not fit in an int (recall that broadcasts use 32 bit ints to store size), What is the error? Could you file a JIRA for it?
that it was in fact over 2G. I don't know why the previous tests (described above) where duplicated portions of self.all_models worked (it could have been an error in either my debugging or notes), but splitting the self.all_models into a separate broadcast variable for each element worked. I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is you're completely right that using broadcast is the correct way to code this. In 1.1, you could use broadcast.unpersist() to release it, also the performance of Python Broadcast was much improved in 1.1. best, -Brad On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote: Or maybe there is a bug related to the base64 in py4j, could you dumps the serialized bytes of closure to verify this? You could add a line in spark/python/pyspark/rdd.py: ser = CloudPickleSerializer() pickled_command = ser.dumps(command) + print len(pickled_command), repr(pickled_command) On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here's more details about my code. The object (self) contains pointers
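A hedged sketch of the per-element broadcast approach Brad describes (splitting self.all_models into one broadcast per model); the model list, scoring helper, and RDD below are placeholders rather than the original code:

from pyspark import SparkContext

sc = SparkContext(appName="per-model-broadcast-sketch")

# all_models, score_one, and testing_feature_rdd stand in for the objects in
# this thread. One broadcast per model keeps each pickled payload well under
# the 32-bit length limits that the single large broadcast ran into.
model_bvars = [sc.broadcast(m) for m in all_models]

def predict(sample):
    scores = [float(score_one(bv.value, sample)) for bv in model_bvars]
    return (sample, scores)

results = testing_feature_rdd.map(predict).collect()

# Spark 1.1 adds unpersist() so the broadcast blocks can be released.
for bv in model_bvars:
    bv.unpersist()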
Re: java.lang.NegativeArraySizeException in pyspark
Hi Davies, Thanks for your help. I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models that the size did not fit in an int (recall that broadcasts use 32 bit ints to store size), suggesting that it was in fact over 2G. I don't know why the previous tests (described above) where duplicated portions of self.all_models worked (it could have been an error in either my debugging or notes), but splitting the self.all_models into a separate broadcast variable for each element worked. I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is you're completely right that using broadcast is the correct way to code this. best, -Brad On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote: Or maybe there is a bug related to the base64 in py4j, could you dumps the serialized bytes of closure to verify this? You could add a line in spark/python/pyspark/rdd.py: ser = CloudPickleSerializer() pickled_command = ser.dumps(command) + print len(pickled_command), repr(pickled_command) On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here's more details about my code. The object (self) contains pointers to the spark_context (which seems to generate errors during serialization) so I strip off the extra state using the outer lambda function and just pass the value self.all_models into the map. all_models is a list of length 9 where each element contains 3 numbers (ints or floats, can't remember) and then one LinearSVC object. The classifier was trained over ~2.5M features, so the object isn't small, but probably shouldn't be 150M either. Additionally, the call ran OK when I use either 2x the first 5 objects or 2x the last 5 objects (another reason why it seems unlikely the bug was size related). def _predict_all_models(all_models, sample): scores = [] for _, (_, _, classifier) in all_models: score = classifier.decision_function(sample[VALUE][RECORD]) scores.append(float(score)) return (sample[VALUE][LABEL], scores) # fails #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models) # works #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5]) #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:]) I've since written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works. later, -brad On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote: The traceback said that the serialized closure cannot be parsed (base64) correctly by py4j. The string in Java cannot be longer than 2G, so the serialized closure cannot longer than 1.5G (there are overhead in base64), is it possible that your data used in the map function is so big? If it's, you should use broadcast for it. In master of Spark, we will use broadcast automatically if the closure is too big. (but use broadcast explicitly is always better). On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I have. I've pasted the full traceback at the end of this email. I have isolated the line of code in my script which causes the exception to occur. 
Although the exception seems to occur deterministically, it is very unclear why the different variants of the line would cause the exception to occur. Unfortunately, I am only able to reproduce the bug in the context of a large data processing job, and the line of code which must change to reproduce the bug has little meaning out of context. The bug occurs when I call map on an RDD with a function that references some state outside of the RDD (which is presumably bundled up and distributed with the function). The output of the function is a tuple where the first element is an int and the second element is a list of floats (same positive length every time, as verified by an 'assert' statement). Given that: -It's unclear why changes in the line would cause an exception -The exception comes from within pyspark code -The exception has to do with negative array sizes (and I couldn't have created a negative sized array anywhere in my python code) I suspect this is a bug in pyspark. Has anybody else observed or reported this bug? best, -Brad Traceback (most recent call last): File /home/bmiller1/pipeline/driver.py, line 214, in module main() File /home
java.util.NoSuchElementException: key not found
Hi All, I suspect I am experiencing a bug. I've noticed that while running larger jobs, they occasionally die with the exception java.util.NoSuchElementException: key not found xyz, where xyz denotes the ID of some particular task. I've excerpted the log from one job that died in this way below and attached the full log for reference. I suspect that my bug is the same as SPARK-2002 (linked below). Is there any reason to suspect otherwise? Is there any known workaround other than not coalescing? https://issues.apache.org/jira/browse/SPARK-2002 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3CCAMwrk0=d1dww5fdbtpkefwokyozltosbbjqamsqqjowlzng...@mail.gmail.com%3E Note that I have been coalescing SchemaRDDs using srdd = SchemaRDD(srdd._jschema_rdd.coalesce(partitions, False, None), sqlCtx), the workaround described in this thread. http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/%3ccanr-kkciei17m43-yz5z-pj00zwpw3ka_u7zhve2y7ejw1v...@mail.gmail.com%3E ... 14/09/15 21:43:14 INFO scheduler.TaskSetManager: Starting task 78.0 in stage 551.0 (TID 78738, bennett.research.intel-research.net, PROCESS_LOCAL, 1056 bytes) ... 14/09/15 21:43:15 INFO storage.BlockManagerInfo: Added taskresult_78738 in memory on bennett.research.intel-research.net:38074 (size: 13.0 MB, free: 1560.8 MB) ... 14/09/15 21:43:15 ERROR scheduler.TaskResultGetter: Exception while getting task result java.util.NoSuchElementException: key not found: 78738 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.TaskSetManager.handleTaskGettingResult(TaskSetManager.scala:500) at org.apache.spark.scheduler.TaskSchedulerImpl.handleTaskGettingResult(TaskSchedulerImpl.scala:348) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:52) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) I am running the pre-compiled 1.1.0 binaries. best, -Brad
Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
Hi Andrew, I agree with Nicholas. That was a nice, concise summary of the meaning of the locality customization options, indicators and default Spark behaviors. I haven't combed through the documentation end-to-end in a while, but I'm also not sure that information is presently represented somewhere and it would be great to persist it somewhere besides the mailing list. best, -Brad On Fri, Sep 12, 2014 at 12:12 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Andrew, This email was pretty helpful. I feel like this stuff should be summarized in the docs somewhere, or perhaps in a blog post. Do you know if it is? Nick On Thu, Jun 5, 2014 at 6:36 PM, Andrew Ash and...@andrewash.com wrote: The locality is how close the data is to the code that's processing it. PROCESS_LOCAL means data is in the same JVM as the code that's running, so it's really fast. NODE_LOCAL might mean that the data is in HDFS on the same node, or in another executor on the same node, so is a little slower because the data has to travel across an IPC connection. RACK_LOCAL is even slower -- data is on a different server so needs to be sent over the network. Spark switches to lower locality levels when there's no unprocessed data on a node that has idle CPUs. In that situation you have two options: wait until the busy CPUs free up so you can start another task that uses data on that server, or start a new task on a farther away server that needs to bring data from that remote place. What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The main tunable option is how long the scheduler waits before starting to move data rather than code. Those are the spark.locality.* settings here: http://spark.apache.org/docs/latest/configuration.html If you want to prevent this from happening entirely, you can set the values to ridiculously high numbers. The documentation also mentions that 0 has special meaning, so you can try that as well. Good luck! Andrew On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When these happen, things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)?
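A hedged example of turning those knobs up, assuming Spark 1.1 where the waits are given in milliseconds (the values themselves are arbitrary):

from pyspark import SparkConf, SparkContext

# Make the scheduler wait much longer for a local slot before shipping
# data to a non-local executor.
conf = (SparkConf()
        .setAppName("locality-wait-sketch")
        .set("spark.locality.wait", "30000")
        .set("spark.locality.wait.node", "30000")
        .set("spark.locality.wait.rack", "30000"))
sc = SparkContext(conf=conf)

The trade-off is idle CPUs while the scheduler waits, which is exactly the choice Andrew describes above.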
Re: coalesce on SchemaRDD in pyspark
Hi Davies, Thanks for the quick fix. I'm sorry to send out a bug report on release day - 1.1.0 really is a great release. I've been running the 1.1 branch for a while and there's definitely lots of good stuff. For the workaround, I think you may have meant: srdd2 = SchemaRDD(srdd._jschema_rdd.coalesce(N, False, None), sqlCtx) Note: _schema_rdd -> _jschema_rdd and false -> False. That workaround seems to work fine (in that I've observed the correct number of partitions in the web UI, although I haven't tested it beyond that). Thanks! -Brad On Thu, Sep 11, 2014 at 11:30 PM, Davies Liu dav...@databricks.com wrote: This is a bug, I have created an issue to track it: https://issues.apache.org/jira/browse/SPARK-3500 Also, there is a PR to fix it: https://github.com/apache/spark/pull/2369 Before the next bugfix release, you can work around this by: srdd = sqlCtx.jsonRDD(rdd) srdd2 = SchemaRDD(srdd._schema_rdd.coalesce(N, false, None), sqlCtx) On Thu, Sep 11, 2014 at 6:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm having some trouble with the coalesce and repartition functions for SchemaRDD objects in pyspark. When I run: sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])).coalesce(1) I get this error: Py4JError: An error occurred while calling o94.coalesce. Trace: py4j.Py4JException: Method coalesce([class java.lang.Integer, class java.lang.Boolean]) does not exist For context, I have a dataset stored in a parquet file, and I'm using SQLContext to make several queries against the data. I then register the results of these queries as new tables in the SQLContext. Unfortunately each new table has the same number of partitions as the original (despite being much smaller). Hence my interest in coalesce and repartition. Has anybody else encountered this bug? Is there an alternate workflow I should consider? I am running the 1.1.0 binaries released today. best, -Brad
coalesce on SchemaRDD in pyspark
Hi All, I'm having some trouble with the coalesce and repartition functions for SchemaRDD objects in pyspark. When I run: sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])).coalesce(1) I get this error: Py4JError: An error occurred while calling o94.coalesce. Trace: py4j.Py4JException: Method coalesce([class java.lang.Integer, class java.lang.Boolean]) does not exist For context, I have a dataset stored in a parquet file, and I'm using SQLContext to make several queries against the data. I then register the results of these queries as new tables in the SQLContext. Unfortunately each new table has the same number of partitions as the original (despite being much smaller). Hence my interest in coalesce and repartition. Has anybody else encountered this bug? Is there an alternate workflow I should consider? I am running the 1.1.0 binaries released today. best, -Brad
Re: TimeStamp selection with SparkSQL
My approach may be partly influenced by my limited experience with SQL and Hive, but I just converted all my dates to seconds-since-epoch and then selected samples from specific time ranges using integer comparisons. On Thu, Sep 4, 2014 at 6:38 PM, Cheng, Hao hao.ch...@intel.com wrote: There are two SQL dialects: one is a very basic SQL support and the other is Hive QL. In most cases I think people prefer using HQL, which also means you have to use HiveContext instead of SQLContext. In this particular query you showed, it seems datetime is of type Date; unfortunately, neither of those SQL dialects supports Date, only Timestamp. Cheng Hao *From:* Benjamin Zaitlen [mailto:quasi...@gmail.com] *Sent:* Friday, September 05, 2014 5:37 AM *To:* user@spark.apache.org *Subject:* TimeStamp selection with SparkSQL I may have missed this but is it possible to select on datetime in a SparkSQL query? jan1 = sqlContext.sql("SELECT * FROM Stocks WHERE datetime = '2014-01-01'") Additionally, is there a guide as to what SQL is valid? The guide says, "Note that Spark SQL currently uses a very basic SQL parser." It would be great to post what is currently supported. --Ben
Re: TimeStamp selection with SparkSQL
Preprocessing (after loading the data into HDFS). I started with data in JSON format in text files (stored in HDFS), then loaded the data into parquet files with a bit of preprocessing, and now I always retrieve the data by creating a SchemaRDD from the parquet file and using the SchemaRDD to back a table in a SQLContext. On Fri, Sep 5, 2014 at 9:53 AM, Benjamin Zaitlen quasi...@gmail.com wrote: Hi Brad, When you do the conversion is this a Hive/Spark job or is it a pre-processing step before loading into HDFS? ---Ben On Fri, Sep 5, 2014 at 10:29 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: My approach may be partly influenced by my limited experience with SQL and Hive, but I just converted all my dates to seconds-since-epoch and then selected samples from specific time ranges using integer comparisons. On Thu, Sep 4, 2014 at 6:38 PM, Cheng, Hao hao.ch...@intel.com wrote: There are two SQL dialects: one is a very basic SQL support and the other is Hive QL. In most cases I think people prefer using HQL, which also means you have to use HiveContext instead of SQLContext. In this particular query you showed, it seems datetime is of type Date; unfortunately, neither of those SQL dialects supports Date, only Timestamp. Cheng Hao *From:* Benjamin Zaitlen [mailto:quasi...@gmail.com] *Sent:* Friday, September 05, 2014 5:37 AM *To:* user@spark.apache.org *Subject:* TimeStamp selection with SparkSQL I may have missed this but is it possible to select on datetime in a SparkSQL query? jan1 = sqlContext.sql("SELECT * FROM Stocks WHERE datetime = '2014-01-01'") Additionally, is there a guide as to what SQL is valid? The guide says, "Note that Spark SQL currently uses a very basic SQL parser." It would be great to post what is currently supported. --Ben
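A minimal PySpark sketch of the seconds-since-epoch approach described above; the record fields, table name, and date format are made up for illustration, and it assumes Spark 1.1 (registerTempTable):

import calendar, time
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="epoch-filter-sketch")
sqlCtx = SQLContext(sc)

def to_epoch(datestr):
    # '2014-01-01' -> seconds since the epoch (UTC)
    return calendar.timegm(time.strptime(datestr, "%Y-%m-%d"))

# 'records' stands in for the deserialized JSON records; convert the date
# field to an integer during preprocessing.
rows = records.map(lambda r: {"symbol": r["symbol"],
                              "ts": to_epoch(r["date"]),
                              "price": r["price"]})
srdd = sqlCtx.inferSchema(rows)
srdd.registerTempTable("stocks")

# Time-range selection then becomes a plain integer comparison.
jan1 = sqlCtx.sql("SELECT * FROM stocks WHERE ts >= %d AND ts < %d"
                  % (to_epoch("2014-01-01"), to_epoch("2014-01-02")))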
Re: Spark webUI - application details page
How did you specify the HDFS path? When I put spark.eventLog.dir hdfs://crosby.research.intel-research.net:54310/tmp/spark-events in my spark-defaults.conf file, I receive the following error: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext. : java.io.IOException: Call to crosby.research.intel-research.net/10.212.84.53:54310 failed on local exception: java.io.EOFException -Brad On Thu, Aug 28, 2014 at 12:26 PM, SK skrishna...@gmail.com wrote: I was able to recently solve this problem for standalone mode. For this mode, I did not use a history server. Instead, I set spark.eventLog.dir (in conf/spark-defaults.conf) to a directory in HDFS (basically this directory should be in a place that is writable by the master and accessible globally to all the nodes). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p13055.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
/tmp/spark-events permissions problem
Hi All, Yesterday I restarted my cluster, which had the effect of clearing /tmp. When I brought Spark back up and ran my first job, /tmp/spark-events was re-created and the job ran fine. I later learned that other users were receiving errors when trying to create a spark context. It turned out the reason was that only my user was able to create subdirectories within /tmp/spark-events. I believe /tmp/spark-events originally had ownership bmiller1:bmiller1 (where bmiller1 is my username) with permissions 770. Once I modified the permission to allow other users to create subdirectories other users were again able to launch jobs. Note that I think this may be related to some problems I am having viewing application history (see link). http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-td3490.html#a13130 Has anybody else experienced a problem with permissions on the spark.eventLog.dir directory? best, -Brad
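One possible workaround (not from the thread, just a sketch under the assumption that all users should share a single log directory): give spark.eventLog.dir's directory the same permissions as /tmp itself, i.e. world-writable with the sticky bit, so each user can create their own application subdirectories:

sudo mkdir -p /tmp/spark-events
sudo chmod 1777 /tmp/spark-events

Alternatively, pointing spark.eventLog.dir at a per-user directory or an HDFS path sidesteps the shared-ownership problem entirely.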
Re: Spark webUI - application details page
Hi All, @Andrew Thanks for the tips. I just built the master branch of Spark last night, but am still having problems viewing history through the standalone UI. I dug into the Spark job events directories as you suggested, and I see at a minimum 'SPARK_VERSION_1.0.0' and 'EVENT_LOG_1'; for applications that call 'sc.stop()' I also see 'APPLICATION_COMPLETE'. The version and application complete files are empty; the event log file contains the information one would need to repopulate the web UI. The following may be helpful in debugging this: -Each job directory (e.g. '/tmp/spark-events/testhistoryjob-1409246088110') and the files within are owned by the user who ran the job with permissions 770. This prevents the 'spark' user from accessing the contents. -When I make a directory and its contents accessible to the spark user, the history server (invoked as 'sbin/start-history-server.sh /tmp/spark-events') is able to display the history, but the standalone web UI still produces the following error: 'No event logs found for application HappyFunTimes in file:///tmp/spark-events/testhistoryjob-1409246088110. Did you specify the correct logging directory?' -In case it matters, I'm running PySpark. Do you know what may be causing this? When you attempt to reproduce locally, who do you observe owns the files in /tmp/spark-events? best, -Brad On Tue, Aug 26, 2014 at 8:51 AM, SK skrishna...@gmail.com wrote: I have already tried setting up the history server and accessing it on master-url:18080 as per the link. But the page does not list any completed applications. As I mentioned in my previous mail, I am running Spark in standalone mode on the cluster (as well as on my local machine). According to the link, it appears that the history server is required only in Mesos or YARN mode, not in standalone mode. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12834.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark webUI - application details page
Hi Andrew, I'm running something close to the present master (I compiled several days ago) but am having some trouble viewing history. I set spark.eventLog.enabled to true, but continually receive the error message (via the web UI) Application history not found...No event logs found for application ml-pipeline in file:/tmp/spark-events/ml-pipeline-1408117588599. I tried two fixes: -I manually set spark.eventLog.dir to a path beginning with file:///, believing that perhaps the problem was an invalid protocol specification. -I inspected /tmp/spark-events manually and noticed that each job directory (and the files therein) were owned by the user who launched the job and were not world readable. Since I run Spark from a dedicated Spark user, I set the files world readable but I still receive the same Application history not found error. Is there a configuration step I may be missing? -Brad On Thu, Aug 14, 2014 at 7:33 PM, Andrew Or and...@databricks.com wrote: Hi SK, Not sure if I understand you correctly, but here is how the user normally uses the event logging functionality: After setting spark.eventLog.enabled and optionally spark.eventLog.dir, the user runs his/her Spark application and calls sc.stop() at the end of it. Then he/she goes to the standalone Master UI (under http://master-url:8080 by default) and clicks on the application under the Completed Applications table. This will link to the Spark UI of the finished application in its completed state, under a path that looks like http://master-url:8080/history/app-Id. It won't be on http://localhost:4040 anymore because the port is now freed for new applications to bind their SparkUIs to. To access the file that stores the raw statistics, go to the file specified in spark.eventLog.dir. This is by default /tmp/spark-events, though in Spark 1.0.1 it may be in HDFS under the same path. I could be misunderstanding what you mean by the stats being buried in the console output, because the events are not logged to the console but to a file in spark.eventLog.dir. For all of this to work, of course, you have to run Spark in standalone mode (i.e. with master set to spark://master-url:7077). In other modes, you will need to use the history server instead. Does this make sense? Andrew 2014-08-14 18:08 GMT-07:00 SK skrishna...@gmail.com: More specifically, as indicated by Patrick above, in 1.0+, apps will have persistent state so that the UI can be reloaded. Is there a way to enable this feature in 1.0.1? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12157.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
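For reference, the settings and commands involved in this thread, pulled together in one place (the paths are illustrative and this assumes the standalone-mode setup discussed above):

In spark-defaults.conf:
spark.eventLog.enabled   true
spark.eventLog.dir       file:///tmp/spark-events

To serve the same logs through the history server, pass the directory at start-up:
sbin/start-history-server.sh file:///tmp/spark-events

The directory must be readable by whichever user runs the master or history server, which is the permissions issue described elsewhere in this thread.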
SPARK_DRIVER_MEMORY
Hi All, I have a Spark job for which I need to increase the amount of memory allocated to the driver to collect a large-ish (200M) data structure. Formerly, I accomplished this by setting SPARK_MEM before invoking my job (which effectively set memory on the driver) and then setting spark.executor.memory before creating my spark context. This was a bit awkward since it wasn't clear exactly what SPARK_MEM was meant to do (although in practice it affected only the driver). Since the release of 1.0.0, I've started receiving messages saying to set spark.executor.memory or SPARK_DRIVER_MEMORY. This definitely helps clear things up, but still feels a bit awkward since it seems that most configuration can now be done from within the program (indeed there are very few environment variables now listed on the Spark configuration page). Furthermore, SPARK_DRIVER_MEMORY doesn't seem to appear anywhere in the web documentation. Is there a better way to set SPARK_DRIVER_MEMORY, or some documentation that I'm missing? Is there a guiding principle that would help in figuring out which configuration parameters are set through environment variables and which are set programmatically, or somewhere to look in the source for an exhaustive list of environment variable configuration options? best, -Brad
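For later readers: by Spark 1.0/1.1 there are a few equivalent ways to raise driver memory, sketched below with an illustrative value and a placeholder script name (my_job.py):

On the command line:
spark-submit --driver-memory 4g my_job.py

In conf/spark-defaults.conf:
spark.driver.memory   4g

Via the environment, e.g. in conf/spark-env.sh, as the warning message suggests:
export SPARK_DRIVER_MEMORY=4g

Setting spark.driver.memory programmatically on a SparkConf generally has no effect in client mode because the driver JVM is already running by the time the conf is read, which is why the flag and environment-variable routes exist.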
SPARK_LOCAL_DIRS
Hi All, I'm having some trouble setting the disk spill directory for spark. The following approaches set spark.local.dir (according to the Environment tab of the web UI) but produce the indicated warnings: *In spark-env.sh:* export SPARK_JAVA_OPTS=-Dspark.local.dir=/spark/spill *Associated warning:* 14/08/14 10:10:39 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). 14/08/14 10:10:39 WARN SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.local.dir=/spark/spill'). This is deprecated in Spark 1.0+. Please instead use... *In spark-defaults.conf:* spark.local.dir /spark/spill *Associated warning:* 14/08/14 10:09:04 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). The following does not produce any warnings, but also produces no sign of actually setting spark.local.dir: *In spark-env.sh:* export SPARK_LOCAL_DIRS=/spark/spill Does anybody know whether SPARK_LOCAL_DIRS actually works as advertised, or if I am perhaps using it incorrectly? best, -Brad
trouble with saveAsParquetFile
Hi All, I'm having a bit of trouble with nested data structures in pyspark with saveAsParquetFile. I'm running master (as of yesterday) with this pull request added: https://github.com/apache/spark/pull/1802.

*# these all work*
sqlCtx.jsonRDD(sc.parallelize(['{"record": null}'])).saveAsParquetFile('/tmp/test0')
sqlCtx.jsonRDD(sc.parallelize(['{"record": []}'])).saveAsParquetFile('/tmp/test1')
sqlCtx.jsonRDD(sc.parallelize(['{"record": {"children": null}}'])).saveAsParquetFile('/tmp/test2')
sqlCtx.jsonRDD(sc.parallelize(['{"record": {"children": []}}'])).saveAsParquetFile('/tmp/test3')
sqlCtx.jsonRDD(sc.parallelize(['{"record": [{"children": "foobar"}]}'])).saveAsParquetFile('/tmp/test4')

*# this FAILS*
sqlCtx.jsonRDD(sc.parallelize(['{"record": [{"children": null}]}'])).saveAsParquetFile('/tmp/test5')
Py4JJavaError: An error occurred while calling o706.saveAsParquetFile. : java.lang.RuntimeException: *Unsupported datatype NullType*

*# this FAILS*
sqlCtx.jsonRDD(sc.parallelize(['{"record": [{"children": []}]}'])).saveAsParquetFile('/tmp/test6')
Py4JJavaError: An error occurred while calling o719.saveAsParquetFile. : java.lang.RuntimeException: *Unsupported datatype NullType*

Based on the documentation and the examples that work, it seems like the failing examples are probably meant to be supported features. I was unable to find an open issue for this. Does anybody know if there is an open issue, or whether an issue should be created? best, -Brad
Re: trouble with saveAsParquetFile
Thanks Yin! best, -Brad On Thu, Aug 7, 2014 at 1:39 PM, Yin Huai yh...@databricks.com wrote: Hi Brad, It is a bug. I have filed https://issues.apache.org/jira/browse/SPARK-2908 to track it. It will be fixed soon. Thanks, Yin On Thu, Aug 7, 2014 at 10:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm having a bit of trouble with nested data structures in pyspark with saveAsParquetFile. I'm running master (as of yesterday) with this pull request added: https://github.com/apache/spark/pull/1802. *# these all work* sqlCtx.jsonRDD(sc.parallelize(['{record: null}'])).saveAsParquetFile('/tmp/test0') sqlCtx.jsonRDD(sc.parallelize(['{record: []}'])).saveAsParquetFile('/tmp/test1') sqlCtx.jsonRDD(sc.parallelize(['{record: {children: null}}'])).saveAsParquetFile('/tmp/test2') sqlCtx.jsonRDD(sc.parallelize(['{record: {children: []}}'])).saveAsParquetFile('/tmp/test3') sqlCtx.jsonRDD(sc.parallelize(['{record: *[{children: foobar}]* }'])).saveAsParquetFile('/tmp/test4') *# this FAILS* sqlCtx.jsonRDD(sc.parallelize(['{record: *[{children: null}]* }'])).saveAsParquetFile('/tmp/test5') Py4JJavaError: An error occurred while calling o706.saveAsParquetFile. : java.lang.RuntimeException: *Unsupported datatype NullType* *# this FAILS* sqlCtx.jsonRDD(sc.parallelize(['{record: *[{children: []}]* }'])).saveAsParquetFile('/tmp/test6') Py4JJavaError: An error occurred while calling o719.saveAsParquetFile. : java.lang.RuntimeException: *Unsupported datatype NullType* Based on the documentation and the examples that work, it seems like the failing examples are probably meant to be supported features. I was unable to find an open issue for this. Does anybody know if there is an open issue, or whether an issue should be created? best, -Brad
pyspark inferSchema
Hi All, I have a data set where each record is serialized using JSON, and I'm interested in using SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and lists, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair, inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in order, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I am running the 1.0.1 release. -Brad
Re: pyspark inferSchema
Hi Nick, Thanks for the great response. I actually already investigated jsonRDD and jsonFile, although I did not realize they provide more complete schema inference. I did however have other problems with jsonRDD and jsonFile, but I will now describe in a separate thread with an appropriate subject. I did notice that when I run your example code, I do not receive the exact same output. For example, I see: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType Notice the difference in the schema. Are you running the 1.0.1 release, or a more bleeding-edge version from the repository? best, -Brad On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated https://issues.apache.org/jira/browse/SPARK-2010 in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad
trouble with jsonRDD and jsonFile in pyspark
Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? -Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322) net.razorvine.pickle.Pickler.dispatch(Pickler.java:286) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392) net.razorvine.pickle.Pickler.dispatch(Pickler.java:195) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.dump(Pickler.java:95) net.razorvine.pickle.Pickler.dumps(Pickler.java:80) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) scala.collection.Iterator$anon$11.next(Iterator.scala:328) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at
Re: pyspark inferSchema
Got it. Thanks! On Tue, Aug 5, 2014 at 11:53 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Notice the difference in the schema. Are you running the 1.0.1 release, or a more bleeding-edge version from the repository? Yep, my bad. I’m running off master at commit 184048f80b6fa160c89d5bb47b937a0a89534a95. Nick
Re: trouble with jsonRDD and jsonFile in pyspark
Nick: Thanks for both the original JIRA bug report and the link. Michael: This is on the 1.0.1 release. I'll update to master and follow-up if I have any problems. best, -Brad On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com wrote: Is this on 1.0.1? I'd suggest running this on master or the 1.1-RC which should be coming out this week. Pyspark did not have good support for nested data previously. If you still encounter issues using a more recent version, please file a JIRA. Thanks! On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? -Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322) net.razorvine.pickle.Pickler.dispatch(Pickler.java:286) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392) net.razorvine.pickle.Pickler.dispatch(Pickler.java:195) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.dump(Pickler.java:95) net.razorvine.pickle.Pickler.dumps(Pickler.java:80) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) scala.collection.Iterator$anon$11.next(Iterator.scala:328) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028
Re: pyspark inferSchema
Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. 
This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair, inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined from the first instance. However, I don't believe that items in an RDD are guaranteed to remain in order, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I am running the 1.0.1 release. -Brad
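As a concrete reference for the Row conversion Davies suggests above, a minimal sketch follows; it assumes a running SparkContext `sc` and a pyspark.sql.Row that accepts keyword arguments (Davies' reply implies this, though it may require a newer build than 1.0.1), and it does not by itself resolve the empty-collection typing issue described in this thread.

from pyspark.sql import SQLContext, Row

sqlCtx = SQLContext(sc)
rdd_of_dict = sc.parallelize([{'foo': 'bar', 'baz': [1, 2, 3]},
                              {'foo': 'boom', 'baz': []}])

# Each dictionary becomes a Row whose fields are the dictionary keys,
# so the top level is inferred as a struct rather than a MapType.
srdd = sqlCtx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
srdd.printSchema()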
Re: pyspark inferSchema
Assuming updating to master fixes the bug I was experiencing with jsonRDD and jsonFile, then pushing sample to master will probably not be necessary. We believe that the link below was the bug I experienced, and I've been told it is fixed in master. https://issues.apache.org/jira/browse/SPARK-2376 best, -brad On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote: This sample argument of inferSchema is still no in master, if will try to add it if it make sense. On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? 
Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad
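For completeness, the serialize-back-to-JSON route Nick mentions above (expensive, but it gets jsonRDD's whole-dataset inference) looks roughly like the sketch below; it assumes a running SparkContext `sc` and a Spark build where SQLContext.jsonRDD is available.

import json
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
dicts = sc.parallelize([{'foo': 'bar', 'baz': []},
                        {'foo': 'boom', 'baz': [1, 2, 3]}])

# Re-serialize each dictionary to a JSON string so jsonRDD can scan the
# whole dataset and infer a schema that covers every record.
srdd = sqlCtx.jsonRDD(dicts.map(json.dumps))
srdd.printSchema()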
Re: trouble with jsonRDD and jsonFile in pyspark
Hi All, I've built and deployed the current head of branch-1.0, but it seems to have only partly fixed the bug. This code now runs as expected with the indicated output: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[1,2,3]}', '{foo:[4,5,6]}'])) srdd.printSchema() root |-- foo: ArrayType[IntegerType] srdd.collect() [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}] This code still crashes: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])) srdd.printSchema() root |-- foo: ArrayType[ArrayType(IntegerType)] srdd.collect() Py4JJavaError: An error occurred while calling o63.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on host kunitz.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I may be able to see if this is fixed in master, but since it's not fixed in 1.0.3 it seems unlikely to be fixed in master either. I previously tried master as well, but ran into a build problem that did not occur with the 1.0 branch. Can anybody else verify that the second example still crashes (and is meant to work)? If so, would it be best to modify JIRA-2376 or start a new bug? https://issues.apache.org/jira/browse/SPARK-2376 best, -Brad On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Nick: Thanks for both the original JIRA bug report and the link. Michael: This is on the 1.0.1 release. I'll update to master and follow-up if I have any problems. best, -Brad On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com wrote: Is this on 1.0.1? I'd suggest running this on master or the 1.1-RC which should be coming out this week. Pyspark did not have good support for nested data previously. If you still encounter issues using a more recent version, please file a JIRA. Thanks! On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? 
-Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322
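For anyone following along, the "parse the JSON myself and use inferSchema" route mentioned above is roughly the following sketch; it assumes a running SparkContext `sc`, and it inherits the caveat from the inferSchema thread that the first row must determine the type of any collection.

import json
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
json_strings = sc.parallelize(['{"foo": "bar", "baz": [1,2,3]}',
                               '{"foo": "boom", "baz": [1,2,3]}'])

# Deserialize on the Python side and let inferSchema build the SchemaRDD.
srdd = sqlCtx.inferSchema(json_strings.map(json.loads))
print srdd.collect()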
Re: trouble with jsonRDD and jsonFile in pyspark
Hi All, I checked out and built master. Note that Maven had a problem building Kafka (in my case, at least); I was unable to fix this easily so I moved on since it seemed unlikely to have any influence on the problem at hand. Master improves functionality (including the example Nicholas just demonstrated) but unfortunately there still seems to be a bug related to using dictionaries as values. I've put some code below to illustrate the bug. *# dictionary as value works fine* print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value}}'])).collect() [Row(key0=Row(key1=u'value'))] *# dictionary as value works fine, even when inner keys are varied* print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}', '{key0: {key2: value2}}'])).collect() [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))] *# dictionary as value works fine when inner keys are missing and outer key is present* print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0: {key1: value1}}'])).collect() [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))] *# dictionary as value FAILS when outer key is missing* * print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1: value1}}'])).collect()* Py4JJavaError: An error occurred while calling o84.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException... *# dictionary as value FAILS when outer key is present with null value* * print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0: {key1: value1}}'])).collect()* Py4JJavaError: An error occurred while calling o98.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException... *# nested lists work even when outer key is missing* print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0, item1], [item2, item3]]}'])).collect() [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])] Is anyone able to replicate this behavior? -Brad On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com wrote: We try to keep master very stable, but this is where active development happens. YMMV, but a lot of people do run very close to master without incident (myself included). branch-1.0 has been cut for a while and we only merge bug fixes into it (this is more strict for non-alpha components like spark core.). For Spark SQL, this branch is pretty far behind as the project is very young and we are fixing bugs / adding features very rapidly compared with Spark core. branch-1.1 was just cut and is being QAed for a release, at this point its likely the same as master, but that will change as features start getting added to master in the coming weeks. On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: collect() works, too. sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])).collect() [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])] Can’t answer your question about branch stability, though. Spark is a very active project, so stuff is happening all the time. Nick On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Nick, Can you check that the call to collect() works as well as printSchema()? 
I actually experience that printSchema() works fine, but then it crashes on collect(). In general, should I expect the master (which seems to be on branch-1.1) to be any more/less stable than branch-1.0? While it would be great to have this fixed, it would be good to know if I should expect lots of other instability. best, -Brad On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: This looks to be fixed in master: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}' ]) ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])) MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])).printSchema() root |-- foo: array (nullable = true) ||-- element: array (containsNull = false) |||-- element: integer (containsNull = false) Nick On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I've built and deployed the current head of branch-1.0, but it seems to have only partly fixed the bug. This code now runs as expected with the indicated output: srdd
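Until the null/missing outer key case is fixed, one possible (untested) workaround is to normalize the JSON on the Python side before calling jsonRDD, under the assumption that treating a missing or null outer key as an empty object is acceptable for the data; fill_key0 below is a hypothetical helper, and `sc`/`sqlCtx` are assumed to already exist.

import json

def fill_key0(line):
    # Ensure the outer key is always present with a dict value so the
    # failing cases above never reach the NullPointerException path.
    record = json.loads(line)
    if record.get('key0') is None:
        record['key0'] = {}
    return json.dumps(record)

raw = sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])
print sqlCtx.jsonRDD(raw.map(fill_key0)).collect()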
Re: pyspark inferSchema
I've followed up in a thread more directly related to jsonRDD and jsonFile, but it seems like after building from the current master I'm still having some problems with nested dictionaries. http://apache-spark-user-list.1001560.n3.nabble.com/trouble-with-jsonRDD-and-jsonFile-in-pyspark-tp11461p11517.html On Tue, Aug 5, 2014 at 12:56 PM, Yin Huai yh...@databricks.com wrote: Yes, 2376 has been fixed in master. Can you give it a try? Also, for inferSchema, because Python is dynamically typed, I agree with Davies to provide a way to scan a subset (or entire) of the dataset to figure out the proper schema. We will take a look it. Thanks, Yin On Tue, Aug 5, 2014 at 12:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Assuming updating to master fixes the bug I was experiencing with jsonRDD and jsonFile, then pushing sample to master will probably not be necessary. We believe that the link below was the bug I experienced, and I've been told it is fixed in master. https://issues.apache.org/jira/browse/SPARK-2376 best, -brad On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote: This sample argument of inferSchema is still no in master, if will try to add it if it make sense. On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. 
This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema
Re: trouble with jsonRDD and jsonFile in pyspark
I concur that printSchema works; it just seems to be operations that use the data where trouble happens. Thanks for posting the bug. -Brad On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote: I tried jsonRDD(...).printSchema() and it worked. Seems the problem is when we take the data back to the Python side, SchemaRDD#javaToPython failed on your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875 to track it. Thanks, Yin On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I checked out and built master. Note that Maven had a problem building Kafka (in my case, at least); I was unable to fix this easily so I moved on since it seemed unlikely to have any influence on the problem at hand. Master improves functionality (including the example Nicholas just demonstrated) but unfortunately there still seems to be a bug related to using dictionaries as values. I've put some code below to illustrate the bug. *# dictionary as value works fine* print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value}}'])).collect() [Row(key0=Row(key1=u'value'))] *# dictionary as value works fine, even when inner keys are varied* print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}', '{key0: {key2: value2}}'])).collect() [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))] *# dictionary as value works fine when inner keys are missing and outer key is present* print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0: {key1: value1}}'])).collect() [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))] *# dictionary as value FAILS when outer key is missing* * print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1: value1}}'])).collect()* Py4JJavaError: An error occurred while calling o84.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException... *# dictionary as value FAILS when outer key is present with null value* * print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0: {key1: value1}}'])).collect()* Py4JJavaError: An error occurred while calling o98.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException... *# nested lists work even when outer key is missing* print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0, item1], [item2, item3]]}'])).collect() [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])] Is anyone able to replicate this behavior? -Brad On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com wrote: We try to keep master very stable, but this is where active development happens. YMMV, but a lot of people do run very close to master without incident (myself included). branch-1.0 has been cut for a while and we only merge bug fixes into it (this is more strict for non-alpha components like spark core.). For Spark SQL, this branch is pretty far behind as the project is very young and we are fixing bugs / adding features very rapidly compared with Spark core. branch-1.1 was just cut and is being QAed for a release, at this point its likely the same as master, but that will change as features start getting added to master in the coming weeks. 
On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: collect() works, too. sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])).collect() [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])] Can’t answer your question about branch stability, though. Spark is a very active project, so stuff is happening all the time. Nick On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Nick, Can you check that the call to collect() works as well as printSchema()? I actually experience that printSchema() works fine, but then it crashes on collect(). In general, should I expect the master (which seems to be on branch-1.1) to be any more/less stable than branch-1.0? While it would be great to have this fixed, it would be good to know if I should expect lots of other instability. best, -Brad On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: This looks to be fixed in master: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}' ]) ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6
Re: Announcing Spark 1.0.1
Hi All, Congrats to the entire Spark team on the 1.0.1 release. In checking out the new features, I noticed that it looks like the python API docs have been updated, but the title and the header at the top of the page still say Spark 1.0.0. Clearly not a big deal... I just wouldn't want anyone to get confused and miss out. http://spark.apache.org/docs/1.0.1/api/python/index.html best, -Brad On Fri, Jul 11, 2014 at 8:44 PM, Henry Saputra henry.sapu...@gmail.com wrote: Congrats to the Spark community ! On Friday, July 11, 2014, Patrick Wendell pwend...@gmail.com wrote: I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.1 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes[1] to read about this release or download[2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
odd caching behavior or accounting
Hi All, I am resending this message because I suspect the original may have been blocked from the mailing list due to attachments. Note that the mail does appear on the apache archives http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3CCANR-kKeO3mxL1QuX0fnz0DEPkU4FFbXO2W_5CdmtrzYKUfhaBg%40mail.gmail.com%3E but not on nabble, the online archive linked from the Spark website http://apache-spark-user-list.1001560.n3.nabble.com/. The text of the original message appears below; the PDF http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/raw/%3ccanr-kkeo3mxl1qux0fnz0depku4ffbxo2w_5cdmtrzykufh...@mail.gmail.com%3e/2 and PNG http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/raw/%3ccanr-kkeo3mxl1qux0fnz0depku4ffbxo2w_5cdmtrzykufh...@mail.gmail.com%3e/3 files originally attached are now available as linked from the apache archive. best, -Brad
-- Forwarded message -- From: Brad Miller bmill...@eecs.berkeley.edu Date: Mon, Jun 30, 2014 at 10:20 AM Subject: odd caching behavior or accounting To: user@spark.apache.org
Hi All, I've recently noticed some caching behavior which I did not understand and which may or may not indicate a bug. In short, the web UI seemed to indicate that some blocks were being added to the cache despite already being in cache. As documentation, I have attached two UI screenshots. The PNG captures enough of the screen to demonstrate the problem; the PDF is the printout of the full page. Notice that:
-block rdd_21_1001 is in the cache twice, both times on letang.research.intel-research.net; many other blocks also occur twice on a variety of hosts. I've not confirmed that the duplicate block is *always* on the same host, but it seems to appear that way.
-the stated storage level is Memory Deserialized 1x Replicated
-the top left states that the cached partitions and total partitions are 4000, but in the table where partitions are enumerated there are 4534.
Although not reflected in this screenshot, I believe I have seen this behavior occur even when double caching of blocks causes eviction of blocks from other RDDs. I am running the Spark 1.0.0 release and using pyspark. best, -Brad
pyspark bug with unittest and scikit-learn
Hi All, I am attempting to develop some unit tests for a program using pyspark and scikit-learn and I've come across some weird behavior. I receive the following warning during some tests: python/pyspark/serializers.py:327: DeprecationWarning: integer argument expected, got float. Although it's only a warning, and my test still passes (i.e. Spark still seems to work), it would be nice to know why it's happening and if it actually indicates a problem since this can probably happen outside unit testing as well. Note that the warning occurs when I invoke the test as SPARK_HOME=/home/spark/spark-1.0.0-bin-hadoop1 PYTHONPATH=/home/spark/spark-1.0.0-bin-hadoop1/python python -m unittest -v -b crash_test. Doing any one of the following three things causes the warning to go away:
-invoking as python crash_test.py rather than python -m unittest -v -b crash_test
-commenting out import sklearn.metrics
-changing lambda x: foo(x) to lambda x: x
Note that I am running the following software:
-Spark 1.0.0
-Python 2.7.3
-scikit-learn 0.14.1
-Ubuntu 12.04
*Exact Warning (actually occurs 3 times):*
/home/spark/spark-1.0.0-bin-hadoop1/python/pyspark/serializers.py:327: DeprecationWarning: integer argument expected, got float
  stream.write(struct.pack("!q", value))
*crash_test.py:*
import unittest
from pyspark import SparkContext
import sklearn.metrics

def foo(x):
    return x

def setUpModule():
    global sc
    sc = SparkContext('local')
    print sc.parallelize(range(4)).map(lambda x: foo(x)).collect()

class CrashTest(unittest.TestCase):
    def test(self):
        pass

if __name__ == '__main__':
    unittest.main()

I'm glad to know if anybody else has experienced a similar problem, or has insight into what may be happening or if it is significant. best, -Brad
pyspark join crash
Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
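Since the same join succeeds at 2000 output partitions, the most direct mitigation is to request more partitions explicitly; a small sketch is below (rdd_a and rdd_b are placeholders for the two key-value RDDs being joined).

# The second argument to join controls the number of partitions in the
# result, so each reduce task handles a smaller slice of the shuffle.
joined = rdd_a.join(rdd_b, 2000)

# Alternatively, repartition the inputs by key before joining.
joined = rdd_a.partitionBy(2000).join(rdd_b.partitionBy(2000))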
Re: pyspark join crash
Hi Matei, Thanks for the reply and creating the JIRA. I hear what you're saying, although to be clear I want to still state that it seems like each reduce task is loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task. I base this hypothesis on the following:
-My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled
-There are 130K records
-The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
-There are 3 cores per node (each running one reduce task at a time)
-Each node has 32G of memory
Note that I am attempting to join the dataset to itself and I ran this experiment after caching the dataset in memory with serialization and compression enabled. Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G from each machine on average. The behavior I observe is that the kernel kills jobs on many of the nodes at nearly the exact same time right after the read phase starts; it seems likely this would occur on every node except that the master begins detecting failures and stops the job (and I observe memory spiking on all machines). Indeed, I observe a large memory spike at each node. When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average. I began suspecting the behavior of loading entire blocks based on the logging from the workers (i.e. BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks). If it is definitely not the case that entire blocks are loaded from the writers, then it would seem like there is some significant overhead which is chewing through lots of memory (perhaps similar to the problem with python broadcast variables chewing through memory https://spark-project.atlassian.net/browse/SPARK-1065). -Brad
On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia matei.zaha...@gmail.com wrote: In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks. I've created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it's something we've been meaning to look at soon. Matei
On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for.
Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
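The back-of-the-envelope figures in this thread can be checked directly; the sketch below simply re-runs the arithmetic using the numbers Brad quotes (100G uncompressed, self-join, 3 reduce tasks per node, ~60M average block size, ~130 blocks fetched per task), so the values are estimates rather than measurements.

# Approximate uncompressed sizes, in GB.
dataset_gb = 100.0
joined_gb = 2 * dataset_gb          # the dataset joined to itself
tasks_per_node = 3

# With 200 output partitions: ~1 GB per partition, ~3 GB per node.
print joined_gb / 200 * tasks_per_node

# If each reduce task pulls whole ~60 MB blocks for ~130 input
# partitions, that is ~7.8 GB per task, ~23 GB per node.
print 130 * 0.060 * tasks_per_node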
Re: Spark - ready for prime time?
I would echo much of what Andrew has said. I manage a small/medium sized cluster (48 cores, 512G ram, 512G disk space dedicated to spark, data storage in separate HDFS shares). I've been using spark since 0.7, and as with Andrew I've observed significant and consistent improvements in stability (and in the PySpark API) since then. I have run into some trouble with mesos, and I have run into some trouble when working with data which is large relative to the size of my cluster (e.g. 100G), but overall it's worked well and our group is continuing to build on top of spark. On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote: The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs. I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. Andrew On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com wrote: I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly). On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: I. Is it too much magic? Lots of things just work right in Spark and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built in behavior is not quite right for us? Are we to expect hard to track down issues originating from the black box behind the magic? I think is goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your customizations to go with (not against) the grain of the architecture. II. Is it mature enough? E.g. we've created a pull request which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark being already used in professional settings? Can one already trust it being reasonably bug free and reliable? There are lots of ways to use Spark; and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath so we've never run into the issue your pull request addresses. We definitely use and rely on Spark on a professional basis. We have 5+ spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliability since. I would characterize our use of Spark as a better Hadoop, in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop but we don't require/rely on its memory caching features. 
In fact, for most of our jobs it would simplify our lives if Spark wouldn't cache so many things in memory since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such). So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion... 1. Memory management The general direction of these questions is whether it's possible to take RDD caching related memory management more into our own hands as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases. A. Somehow prioritize cached RDDs, E.g. mark some essential that one really wants to keep. I'm fine with going down in flames if I mark too much data essential. B. Memory reflection: can you pragmatically get the memory size of a cached rdd and memory sizes available in total/per executor? If we could do this we could indirectly avoid automatic evictions of things we might really want to keep in memory. C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory I
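For readers experimenting with the caching behavior discussed in this thread, the MEMORY_AND_DISK storage level mentioned earlier is requested at persist time; a minimal PySpark sketch follows (the HDFS path is a placeholder), keeping in mind the earlier caveat that OOMs can still occur under memory pressure.

from pyspark import StorageLevel

rdd = sc.textFile('hdfs:///path/to/data')  # placeholder input
# Partitions that do not fit in memory are written to disk rather than
# dropped and recomputed.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
print rdd.count()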
Re: Spark - ready for prime time?
4. Shuffle on disk Is it true - I couldn't find it in official docs, but did see this mentioned in various threads - that shuffle _always_ hits disk? (Disregarding OS caches.) Why is this the case? Are you planning to add a function to do shuffle in memory or are there some intrinsic reasons for this to be impossible? I don't think it's true... as far as I'm concerned Spark doesn't peek into the OS and force it to disregard buffer caches. In general, for large shuffles, all shuffle files do not fit into memory, so we kind of have to write them out to disk. There is an undocumented option to sync writing shuffle files to disk every time we write a block, but that is by default false and not many people use it (for obvious reasons). I believe I recently had the experience that for the map portion of the shuffle all shuffle files seemed to be written into the file system (albeit potentially on buffer caches). The size of the shuffle files on hosts matched the size of the shuffle write metric shown in the UI (pyspark branch-0.9 as of Monday), so there didn't seem to be any effort to keep the shuffle files in memory. On Thu, Apr 10, 2014 at 12:43 PM, Andrew Or and...@databricks.com wrote: Here are answers to a subset of your questions: 1. Memory management The general direction of these questions is whether it's possible to take RDD caching related memory management more into our own hands as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases. A. Somehow prioritize cached RDDs, E.g. mark some essential that one really wants to keep. I'm fine with going down in flames if I mark too much data As far as I am aware, there is currently no other eviction policies for RDD blocks other than LRU. Your suggestion of prioritizing RDDs is an interesting one and I'm sure other users would like that as well. B. Memory reflection: can you pragmatically get the memory size of a cached rdd and memory sizes available in total/per executor? If we could do this we could indirectly avoid automatic evictions of things we might really want to keep in memory. All this information should be displayed on the UI under the Storage tab. C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory I started to see evictions while there were still plenty of space on workers. This resulted in lengthy recomputations. Can this be avoided somehow? The amount of space used for RDD storage is only a fraction of the total amount of memory available to the JVM. More specifically, it is governed by `spark.storage.memoryFraction`, which is by default 60%. This may explain why evictions seem to occur pre-maturely sometimes. In the future, we should probably add a table that contains information about evicted RDDs on the UI, so it's easier to track them. Right now evicted RDD's disappear from the face of the planet completely, sometimes leaving the user somewhat confounded. Though with off-heap storage (Tachyon) this may become less relevant. D. Broadcasts. Is it possible to get rid of a broadcast manually, without waiting for the LRU eviction taking care of it? Can you tell the size of a broadcast programmatically? In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is explicitly added! 
Under the storage tab of the UI, we could probably also have a Broadcast table in the future, seeing that there are users interested in this feature. 3. Recalculation of cached rdds I see the following scenario happening. I load two RDDs A,B from disk, cache them and then do some jobs on them, at the very least a count on each. After these jobs are done I see on the storage panel that 100% of these RDDs are cached in memory. Then I create a third RDD C which is created by multiple joins and maps from A and B, also cache it and start a job on C. When I do this I still see A and B completely cached and also see C slowly getting more and more cached. This is all fine and good, but in the meanwhile I see stages running on the UI that point to code which is used to load A and B. How is this possible? Am I misunderstanding how cached RDDs should behave? And again the general question - how can one debug such issues? From the fractions of RDDs cached in memory, it seems to me that your application is running as expected. If you also cache C, then it will slowly add more blocks to storage, possibly evicting A and B if there is memory pressure. It's entirely possible that there is a bug on finding the call site on the stages page (there were a few PRs that made changes to this recently). 4. Shuffle on disk Is it true - I couldn't find it in official docs, but did see this mentioned in various threads - that shuffle _always_ hits disk? (Disregarding OS
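As a concrete reference for the memoryFraction discussion above, these properties are set on the configuration before the SparkContext is created; the sketch below targets the 1.0-era PySpark API, and the values are examples only, not recommendations.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName('memory-fraction-example')
        # Fraction of executor heap reserved for cached RDD blocks
        # (defaults to 0.6, per the reply above).
        .set('spark.storage.memoryFraction', '0.5')
        # Fraction reserved for shuffle aggregation buffers.
        .set('spark.shuffle.memoryFraction', '0.3'))
sc = SparkContext(conf=conf)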
Re: trouble with join on large RDDs
I set SPARK_MEM in the driver process by setting spark.executor.memory to 10G. Each machine had 32G of RAM and a dedicated 32G spill volume. I believe all of the units are in pages, and the page size is the standard 4K. There are 15 slave nodes in the cluster and the sizes of the datasets I'm trying to join are about 2.5G and 25G when serialized and compressed in the RDD cache. I appreciate that Python lacks the type of heap size controls available in Java, but I lack any clear sense of how the different computational tasks are partitioned between Java and Python in pyspark (so it's unclear to me how much freedom Python should have to chew through tons of memory). A couple of questions this raises for me:
-Are there any parameters I could tune differently to try to prevent this crashing behavior?
-Do we know why this doesn't spill to disk (as Patrick Wendell mentions that shuffle spill is for aggregations which occur during the reduce phase)?
-Do we have any hunch about what computation is occurring when the crash occurs?
I'd definitely appreciate the insight of others, and am willing to run experiments and send results/errors/logs out. Also, I'm physically located in Soda Hall (Berkeley), so if anyone nearby is interested in examining this first hand I am glad to meet up. best, -Brad
On Wed, Apr 9, 2014 at 4:21 AM, Andrew Ash and...@andrewash.com wrote: A JVM can easily be limited in how much memory it uses with the -Xmx parameter, but Python doesn't have memory limits built in in such a first-class way. Maybe the memory limits aren't making it to the python executors. What was your SPARK_MEM setting? The JVM below seems to be using 603201 (pages?) and the 3 large python processes each are using ~180 (pages?). I'm unsure the units that the OOM killer's RSS column is in. Could be either pages (4kb each) or bytes.
Apr 8 11:19:19 bennett kernel: [86368.978326] [ 2348] 1002 234812573 2102 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978329] [ 2349] 1002 234912573 2101 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978332] [ 2350] 1002 235012573 2101 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978336] [ 5115] 1002 511512571 2101 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978339] [ 5116] 1002 511612571 2101 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978341] [ 5117] 1002 511712571 2101 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978344] [ 7725] 1002 772512570 2098 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978347] [ 7726] 1002 772612570 2098 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978350] [ 7727] 1002 772712570 2098 220 0 python Apr 8 11:19:19 bennett kernel: [86368.978353] [10324] 1002 1032412570 2098 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978356] [10325] 1002 1032512570 2098 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978359] [10326] 1002 1032612570 2098 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978362] [12668] 1002 12668 603201 47932 1900 0 java Apr 8 11:19:19 bennett kernel: [86368.978366] [13295] 1002 1329512570 2100 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978368] [13296] 1002 1329612570 2100 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978371] [13297] 1002 1329712570 2100 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978375] [15192] 1002 1519212570 2098 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978377] [15193] 1002 1519312570 2098 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978379] [15195] 1002 1519512570 2098 230 0 python Apr 8 11:19:19 bennett kernel: [86368.978381] [15198] 1002 15198 1845471 181846335730 0 python Apr 8 11:19:19 bennett kernel: [86368.978383] [15200] 1002 15200 1710479 168649233160 0 python Apr 8 11:19:19 bennett kernel: [86368.978384] [15201] 1002 15201 1788470 176234434630 0 python Apr 8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process 15198 (python) score 221 or sacrifice child Apr 8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python) total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I poked around a bit more to (1) confirm my suspicions that the crash was related to memory consumption
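For reference, the spark.executor.memory setting discussed above can be applied from the driver before the context is constructed; this sketch mirrors the setSystemProperty pattern used elsewhere in these threads (the master URL and application name are placeholders).

from pyspark import SparkContext

# Must be set before the SparkContext is created; '10g' mirrors the
# value mentioned above.
SparkContext.setSystemProperty('spark.executor.memory', '10g')
sc = SparkContext('spark://master-host:7077', 'join-debug')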
pyspark broadcast error
Hi All, When I run the program shown below, I receive the error shown below. I am running the current version of branch-0.9 from github. Note that I do not receive the error when I replace 2 ** 29 with 2 ** X, where X < 29. More interestingly, I do not receive the error when X = 30, and when X > 30 the code either crashes with Memory Error or Py4JNetworkError: An error occurred while trying to connect to the Java server. I am aware that there are some bugs (https://spark-project.atlassian.net/browse/SPARK-1065) related to memory consumption with pyspark and broadcasting, but the behavior with X = 29 seemed different and I was wondering if anybody had any insight. -Brad
*Program*
from pyspark import SparkContext

SparkContext.setSystemProperty('spark.executor.memory', '25g')
sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'FeatureExtraction')
meg_512 = range((2 ** 29) / 8)
tmp_broad = sc.broadcast(meg_512)
*Error*
--- Py4JError Traceback (most recent call last) <ipython-input-1-db8033dee301> in <module>() 3 sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'FeatureExtraction') 4 meg_1024 = range((2 ** 29) / 8) 5 tmp_broad = sc.broadcast(meg_1024) /home/spark/spark-branch-0.9/python/pyspark/context.py in broadcast(self, value) 277 pickleSer = PickleSerializer() 278 pickled = pickleSer.dumps(value) --> 279 jbroadcast = self._jsc.broadcast(bytearray(pickled)) 280 return Broadcast(jbroadcast.id(), value, jbroadcast, 281 self._pickled_broadcast_vars) /home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, --> 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 302 raise Py4JError( 303 'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'. --> 304 format(target_id, '.', name, value)) 305 else: 306 raise Py4JError( Py4JError: An error occurred while calling o7.broadcast. Trace: java.lang.NegativeArraySizeException at py4j.Base64.decode(Base64.java:292) at py4j.Protocol.getBytes(Protocol.java:167) at py4j.Protocol.getObject(Protocol.java:276) at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81) at py4j.commands.CallCommand.execute(CallCommand.java:77) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:701)
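To make the size thresholds in this report concrete, the nominal payload scales as shown below; this is a rough sketch only, since the actual pickled and Base64-encoded size will differ from the 8-bytes-per-element estimate implied by the variable name.

# range((2 ** X) / 8) elements at a nominal 8 bytes each is 2 ** X bytes.
for X in (28, 29, 30, 31):
    n_elements = (2 ** X) / 8
    approx_mb = (2 ** X) / (1024.0 ** 2)
    print X, n_elements, '%.0f MB' % approx_mb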