Re: pyspark - memory leak leading to OOM after submitting 100 jobs?
Well, dumb question: given the workflow outlined above, should Local Mode be expected to keep running? Or is the leak a known issue? I just wanted to check because I can't recall seeing this issue with a non-local master, though it's possible there were task failures that hid the issue.

If this issue looks new, what's the easiest way to record memory dumps or do profiling? Can I put something in my spark-defaults.conf? The code is open source and the run is reproducible, although the specific test currently requires a rather large (but public) dataset.

On Sun, Oct 20, 2019 at 6:24 PM Jungtaek Lim wrote:
>
> Honestly, I'd recommend you spend your time looking into the issue, e.g. by
> taking a memory dump at some interval and comparing the differences (and at
> least sharing these dump files with the community, redacted if necessary).
> Otherwise someone has to try to reproduce it without a reproducer, and may
> fail to reproduce it even after spending the time. A memory leak is not
> really easy to reproduce unless it leaks objects unconditionally.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Sun, Oct 20, 2019 at 7:18 PM Paul Wais wrote:
>>
>> Dear List,
>>
>> I've observed some sort of memory leak when using pyspark to run ~100
>> jobs in local mode. Each job is essentially a create RDD -> create DF
>> -> write DF sort of flow. The RDD and DFs go out of scope after each
>> job completes, hence I call this issue a "memory leak." Here's
>> pseudocode:
>>
>> ```
>> row_rdds = []
>> for i in range(100):
>>     row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
>>     row_rdds.append(row_rdd)
>>
>> for row_rdd in row_rdds:
>>     df = spark.createDataFrame(row_rdd)
>>     df.persist()
>>     print(df.count())
>>     df.write.save(...)  # Save parquet
>>     df.unpersist()
>>
>>     # Does not help:
>>     # del df
>>     # del row_rdd
>> ```
>>
>> In my real application:
>> * rows are much larger, perhaps 1MB each
>> * row_rdds are sized to fit available RAM
>>
>> I observe that after 100 or so iterations of the second loop (each of
>> which creates a "job" in the Spark WebUI), the following happens:
>> * pyspark workers have fairly stable resident and virtual RAM usage
>> * the java process eventually approaches the resident RAM cap (8GB standard)
>>   but virtual RAM usage keeps ballooning
>>
>> Eventually the machine runs out of RAM and the linux OOM killer kills
>> the java process, resulting in an "IndexError: pop from an empty
>> deque" error from py4j/java_gateway.py.
>>
>> Does anybody have any ideas about what's going on? Note that this is
>> local mode. I have personally run standalone masters and submitted a
>> ton of jobs and never seen something like this over time. Those were
>> very different jobs, but perhaps this issue is bespoke to local mode?
>>
>> Emphasis: I did try to del the pyspark objects and run python GC.
>> That didn't help at all.
>>
>> pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)
>>
>> 12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).
>>
>> Cheers,
>> -Paul
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
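For anyone hitting the same question about what to put in spark-defaults.conf: a minimal sketch for capturing heap dumps and GC activity is below. The config keys are standard Spark properties; the specific JVM flags and dump path are just one reasonable choice for Java 8, not something prescribed in this thread.

```
# Dump the JVM heap on OutOfMemoryError and log GC activity (Java 8 flags).
# In local mode the driver and executors share one JVM, so the driver setting
# is the one that matters; the executor setting is shown for completeness.
spark.driver.extraJavaOptions    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark-dumps -verbose:gc -XX:+PrintGCDetails
spark.executor.extraJavaOptions  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark-dumps -verbose:gc -XX:+PrintGCDetails
```

For the periodic dumps suggested above (so they can be diffed over time), `jmap -dump:live,format=b,file=dump-$(date +%s).hprof <pid>` against the local-mode JVM works without any Spark configuration at all.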
pyspark - memory leak leading to OOM after submitting 100 jobs?
Dear List,

I've observed some sort of memory leak when using pyspark to run ~100 jobs in local mode. Each job is essentially a create RDD -> create DF -> write DF sort of flow. The RDD and DFs go out of scope after each job completes, hence I call this issue a "memory leak." Here's pseudocode:

```
row_rdds = []
for i in range(100):
    row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
    row_rdds.append(row_rdd)

for row_rdd in row_rdds:
    df = spark.createDataFrame(row_rdd)
    df.persist()
    print(df.count())
    df.write.save(...)  # Save parquet
    df.unpersist()

    # Does not help:
    # del df
    # del row_rdd
```

In my real application:
* rows are much larger, perhaps 1MB each
* row_rdds are sized to fit available RAM

I observe that after 100 or so iterations of the second loop (each of which creates a "job" in the Spark WebUI), the following happens:
* pyspark workers have fairly stable resident and virtual RAM usage
* the java process eventually approaches the resident RAM cap (8GB standard) but virtual RAM usage keeps ballooning

Eventually the machine runs out of RAM and the linux OOM killer kills the java process, resulting in an "IndexError: pop from an empty deque" error from py4j/java_gateway.py.

Does anybody have any ideas about what's going on? Note that this is local mode. I have personally run standalone masters and submitted a ton of jobs and never seen something like this over time. Those were very different jobs, but perhaps this issue is bespoke to local mode?

Emphasis: I did try to del the pyspark objects and run python GC. That didn't help at all.

pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)

12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).

Cheers,
-Paul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Avro support broken?
Dear List,

Has anybody gotten avro support to work in pyspark? I see multiple reports of it being broken on Stack Overflow, and I added my own repro to this ticket: https://issues.apache.org/jira/browse/SPARK-27623?focusedCommentId=16878896&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16878896

Cheers,
-Paul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Perf impact of BlockManager byte[] copies
Dear List,

I'm investigating some problems related to native code integration with Spark, and while picking through BlockManager I noticed that data (de)serialization currently issues lots of array copies. Specifically:

 - Deserialization: BlockManager marshals all deserialized bytes through a spark.util.ByteBufferInputStream, which necessitates copying data into an intermediate temporary byte[]. The temporary byte[] might be reused between deserialization of T instances, but nevertheless the bytes must be copied (and likely in a Java loop).

 - Serialization: BlockManager buffers all serialized bytes into a java.io.ByteArrayOutputStream, which maintains an internal byte[] buffer and grows/re-copies the buffer like a vector as the buffer fills. BlockManager then retrieves the internal byte[] buffer, wraps it in a ByteBuffer, and sends it off to be stored (e.g. in MemoryStore, DiskStore, Tachyon, etc).

When an individual T is somewhat large (e.g. a feature vector, an image, etc), or blocks are megabytes in size, these copies become expensive (especially for large written blocks), right? Does anybody have any measurements of /how/ expensive they are? If not, is there serialization benchmark code (e.g. for KryoSerializer) that might be helpful here?

As part of my investigation, I've found that one might be able to sidestep these issues by extending Spark's SerializerInstance API to offer I/O on ByteBuffers (in addition to {Input,Output}Streams). An extension including a ByteBuffer API would furthermore have many benefits for native code. A major downside of this API addition is that it wouldn't interoperate (nontrivially) with compression, so shuffles wouldn't benefit. Nevertheless, BlockManager could probably deduce when use of this ByteBuffer API is possible and leverage it.

Cheers,
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
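In the absence of a ready-made benchmark, a rough micro-benchmark along the following lines would at least put a number on the per-record serialization cost. This is only a sketch: the Record type, payload size, and iteration counts are invented, while KryoSerializer and SerializerInstance are the actual Spark classes being asked about.

```
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// A stand-in record: most of its memory is a raw byte array, similar to a
// feature vector or an image payload.
case class Record(id: Long, payload: Array[Byte])

object SerDeBench {
  def main(args: Array[String]): Unit = {
    val ser = new KryoSerializer(new SparkConf()).newInstance()
    val record = Record(42L, Array.fill[Byte](1 << 20)(7.toByte))  // ~1MB payload

    // Warm up the JIT and Kryo's internal buffers before timing.
    (1 to 100).foreach { _ => ser.deserialize[Record](ser.serialize(record)) }

    val n = 1000
    val t0 = System.nanoTime()
    var buf: ByteBuffer = null
    (1 to n).foreach { _ => buf = ser.serialize(record) }
    val t1 = System.nanoTime()
    (1 to n).foreach { _ => ser.deserialize[Record](buf.duplicate()) }
    val t2 = System.nanoTime()

    println(f"serialize:   ${(t1 - t0) / 1e6 / n}%.3f ms per ~1MB record")
    println(f"deserialize: ${(t2 - t1) / 1e6 / n}%.3f ms per ~1MB record")
  }
}
```

Comparing the numbers against a plain System.arraycopy of the same payload gives a feel for how much of the cost is the copy itself versus Kryo's framing.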
Re: Support for SQL on unions of tables (merge tables?)
Thanks Cheng! For the list, I talked with Michael Armbrust at a recent Spark meetup and his comments were: * For a union of tables, use a view and the Hive metastore * SQLContext might have the directory-traversing logic I need in it already * The union() of sequence files I saw was slow because Spark was probably trying to shuffle the whole union. A similar Spark SQL join will also be slow (or break) unless one runs statistics so that the smaller table can be broadcasted (e.g. see https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options ) I have never used Hive, so I'll have to investigate further. On Tue, Jan 20, 2015 at 1:15 PM, Cheng Lian lian.cs@gmail.com wrote: I think you can resort to a Hive table partitioned by date https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables On 1/11/15 9:51 PM, Paul Wais wrote: Dear List, What are common approaches for addressing over a union of tables / RDDs? E.g. suppose I have a collection of log files in HDFS, one log file per day, and I want to compute the sum of some field over a date range in SQL. Using log schema, I can read each as a distinct SchemaRDD, but I want to union them all and query against one 'table'. If this data were in MySQL, I could have a table for each day of data and use a MyISAM merge table to union these tables together and just query against the merge table. What's nice here is that MySQL persists the merge table, and the merge table is r/w, so one can just update the merge table once per day. (What's not nice is that merge tables scale poorly, backup admin is a pain, and oh hey I'd like to use Spark not MySQL). One naive and untested idea (that achieves implicit persistence): scan an HDFS directory for log files, create one RDD per file, union() the RDDs, then create a Schema RDD from that union(). A few specific questions: * Any good approaches to a merge / union table? (Other than the naive idea above). Preferably with some way to persist that table / RDD between Spark runs. (How does Impala approach this problem?) * Has anybody tried joining against such a union of tables / RDDs on a very large amount of data? When I've tried (non-spark-sql) union()ing Sequence Files, and then join()ing them against another RDD, Spark seems to try to compute the full union before doing any join() computation (and eventually OOMs the cluster because the union of Sequence Files is so big). I haven't tried something similar with Spark SQL. * Are there any plans related to this in the Spark roadmap? (This feature would be a nice compliment to, say, persistent RDD indices for interactive querying). * Related question: are there plans to use Parquet Index Pages to make Spark SQL faster? E.g. log indices over date ranges would be relevant here. All the best, -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
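For the archive, Cheng's partitioned-table suggestion boils down to something like the sketch below, written against the Spark 1.2-era HiveContext. The table name, columns, and paths are made up; the HiveQL statements are standard partitioned-external-table DDL.

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object PartitionedLogs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitioned-logs"))
    val hive = new HiveContext(sc)

    // One external table over all per-day log directories; each day is a partition.
    hive.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS logs (userid STRING, bytes BIGINT)
                PARTITIONED BY (day STRING)
                ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
                LOCATION 'hdfs:///data/logs'""")

    // Register each day's directory as a partition, e.g. once per day from cron.
    hive.sql("ALTER TABLE logs ADD IF NOT EXISTS PARTITION (day='2015-01-20') " +
             "LOCATION 'hdfs:///data/logs/2015-01-20'")

    // A date-range query now only reads the matching partitions.
    hive.sql("SELECT SUM(bytes) FROM logs WHERE day BETWEEN '2015-01-01' AND '2015-01-20'")
      .collect().foreach(println)
  }
}
```

This plays the same role as the MyISAM merge table described in the quoted post: the metastore persists the 'union' across Spark runs, and adding a day of data is just an ALTER TABLE ... ADD PARTITION.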
Re: spark 1.2 three times slower than spark 1.1, why?
To force one instance per executor, you could explicitly subclass FlatMapFunction and have it lazy-create your parser in the subclass constructor. You might also want to try RDD#mapPartitions() (instead of RDD#flatMap()) if you want one instance per partition. This approach worked well for me when I had a flat map function that used non-serializable native code / objects.

FWIW RDD#flatMap() does not appear to have changed from 1.1 to 1.2 (tho master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs 1.2 test.

On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote:

> The LogParser instance is not serializable, and thus cannot be a broadcast. What's worse, it contains an LRU cache, which is essential to the performance, and we would like to share it among all the tasks on the same node. If that is the case, what's the recommended way to share a variable among all the tasks within the same executor?
>
> 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:
>
>> Maybe some change related to serializing the closure causes LogParser to no longer be a singleton, so it is initialized for every task. Could you change it to a Broadcast?
>>
>> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote:
>>
>>> Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program in spark 1.1 has successfully processed a whole year of data, quite stable. The main script is as below:
>>>
>>>   sc.textFile(inputPath)
>>>     .flatMap(line => LogParser.parseLine(line))
>>>     .groupByKey(new HashPartitioner(numPartitions))
>>>     .mapPartitionsWithIndex(...)
>>>     .foreach(_ => {})
>>>
>>> where LogParser is a singleton which may take some time to initialize and is shared across the executor. The flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but it didn't help. Could somebody explain possible causes, or what we should test or change to find it out?
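To make the two suggestions above concrete, here is a sketch of both patterns. The LogParser name is taken from the thread; its body, and everything else here, is illustrative only.

```
import org.apache.spark.rdd.RDD

// Stand-in for the LogParser in the thread: expensive to construct, holds an
// LRU cache, and is not serializable, so it must be created on the executor.
class LogParser {
  def parseLine(line: String): Seq[String] = Seq(line)  // stub
}

// Pattern 1: one parser per executor JVM. A lazy val inside an object is
// initialized on first use in each executor process and shared by its tasks.
object LogParserHolder {
  lazy val parser = new LogParser
}

object ParsePatterns {
  // Pattern 2: one parser per partition via mapPartitions.
  def parsePerPartition(lines: RDD[String]): RDD[String] =
    lines.mapPartitions { iter =>
      val parser = new LogParser  // constructed on the executor, once per partition
      iter.flatMap(parser.parseLine)
    }

  // Pattern 1 in use: the closure only references the holder object, so
  // nothing non-serializable is shipped from the driver.
  def parsePerExecutor(lines: RDD[String]): RDD[String] =
    lines.flatMap(line => LogParserHolder.parser.parseLine(line))
}
```

Either pattern sidesteps the question of how the 1.1 vs. 1.2 closure handling treats the singleton, because construction is explicitly deferred to the executor.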
Support for SQL on unions of tables (merge tables?)
Dear List, What are common approaches for addressing over a union of tables / RDDs? E.g. suppose I have a collection of log files in HDFS, one log file per day, and I want to compute the sum of some field over a date range in SQL. Using log schema, I can read each as a distinct SchemaRDD, but I want to union them all and query against one 'table'. If this data were in MySQL, I could have a table for each day of data and use a MyISAM merge table to union these tables together and just query against the merge table. What's nice here is that MySQL persists the merge table, and the merge table is r/w, so one can just update the merge table once per day. (What's not nice is that merge tables scale poorly, backup admin is a pain, and oh hey I'd like to use Spark not MySQL). One naive and untested idea (that achieves implicit persistence): scan an HDFS directory for log files, create one RDD per file, union() the RDDs, then create a Schema RDD from that union(). A few specific questions: * Any good approaches to a merge / union table? (Other than the naive idea above). Preferably with some way to persist that table / RDD between Spark runs. (How does Impala approach this problem?) * Has anybody tried joining against such a union of tables / RDDs on a very large amount of data? When I've tried (non-spark-sql) union()ing Sequence Files, and then join()ing them against another RDD, Spark seems to try to compute the full union before doing any join() computation (and eventually OOMs the cluster because the union of Sequence Files is so big). I haven't tried something similar with Spark SQL. * Are there any plans related to this in the Spark roadmap? (This feature would be a nice compliment to, say, persistent RDD indices for interactive querying). * Related question: are there plans to use Parquet Index Pages to make Spark SQL faster? E.g. log indices over date ranges would be relevant here. All the best, -Paul
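For reference, the 'naive and untested idea' above would look roughly like the following with the Spark 1.1/1.2-era SchemaRDD API. The directory layout, the LogLine schema, and the tab-separated format are invented for illustration.

```
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class LogLine(day: String, userid: String, bytes: Long)

object UnionOfDailyLogs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-of-daily-logs"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Scan the HDFS directory and build one RDD per per-day log file.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val files = fs.listStatus(new Path("hdfs:///data/logs")).map(_.getPath.toString)
    val perDay = files.map { f =>
      sc.textFile(f).map { line =>
        val Array(day, userid, bytes) = line.split("\t")
        LogLine(day, userid, bytes.toLong)
      }
    }

    // Union them into one logical 'table' and query it as such.
    val all = sc.union(perDay.toSeq)
    all.registerTempTable("logs")
    sqlContext.sql(
      "SELECT day, SUM(bytes) FROM logs WHERE day >= '2015-01-01' GROUP BY day")
      .collect().foreach(println)
  }
}
```

Nothing here is persisted between runs, which is exactly the gap the questions above are about; the directory scan simply re-creates the union on every run.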
Re: Native / C/C++ code integration
More thoughts. I took a deeper look at BlockManager, RDD, and friends. Suppose one wanted to get native code access to un-deserialized blocks. This task looks very hard. An RDD behaves much like a Scala iterator of deserialized values, and interop with BlockManager is all on deserialized data. One would probably need to rewrite much of RDD, CacheManager, etc. in native code; an RDD subclass (e.g. PythonRDD) probably wouldn't work. So exposing raw blocks to native code looks intractable.

I wonder how fast Java/Kryo SerDe of byte arrays is. E.g. suppose we have an RDD<T> where T is immutable and most of the memory for a single T is a byte array. What is the overhead of SerDe-ing T? (Does Java/Kryo copy the underlying memory?) If the overhead is small, then native access to raw blocks wouldn't really yield any advantage.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-C-C-code-integration-tp18347p18640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Native / C/C++ code integration
Dear List,

Has anybody had experience integrating C/C++ code into Spark jobs? I have done some work on this topic using JNA. I wrote a FlatMapFunction that processes all partition entries using a C++ library. This approach works well, but there are some tradeoffs:
 * Shipping the native dylib with the app jar and loading it at runtime requires a bit of work (on top of normal JNA usage)
 * Native code doesn't respect the executor heap limits. Under heavy memory pressure, the native code can sometimes ENOMEM sporadically.
 * While JNA can map Strings, structs, and Java primitive types, the user still needs to deal with more complex objects. E.g. re-serialize protobuf/thrift objects, or provide some other encoding for moving data between Java and C/C++.
 * C++ static initialization is not thread-safe before C++11, so the user sometimes needs to take care when running inside multi-threaded executors
 * Avoiding memory copies can be a little tricky

One other alternative approach that comes to mind is pipe(). However, PipedRDD requires copying data over pipes, does not support binary data (?), and native code errors that crash the subprocess don't bubble up to the Spark job as nicely as with JNA.

Is there a way to expose raw, in-memory partition/block data to native code? Has anybody else attacked this problem a different way?

All the best,
-Paul

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-C-C-code-integration-tp18347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
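For those curious what the JNA route looks like in practice, below is a minimal sketch. The library name ("imageproc") and its function are hypothetical; Library, Native.loadLibrary, and the byte[]-to-pointer mapping are standard JNA.

```
import com.sun.jna.{Library, Native}
import org.apache.spark.rdd.RDD

// JNA interface to a hypothetical native library exposing
//   int32_t score_image(const char* bytes, int32_t len);
trait ImageProcLib extends Library {
  def score_image(bytes: Array[Byte], len: Int): Int
}

object ImageProcLib {
  // Loaded lazily on each executor JVM. The dylib must already be resolvable
  // on the executor (e.g. shipped alongside the app jar and pointed to via
  // spark.executor.extraLibraryPath, or baked into the machine image).
  lazy val instance: ImageProcLib =
    Native.loadLibrary("imageproc", classOf[ImageProcLib]).asInstanceOf[ImageProcLib]
}

object NativeScoring {
  // One native call per record; JNA passes Array[Byte] as a pointer, so the
  // main cost is JNA's marshalling of the array across the JNI boundary.
  def scoreAll(images: RDD[Array[Byte]]): RDD[Int] =
    images.mapPartitions { iter =>
      val lib = ImageProcLib.instance
      iter.map(img => lib.score_image(img, img.length))
    }
}
```

mapPartitions keeps the library lookup out of the per-record path, which matters once the per-call cost is dominated by marshalling rather than the native work itself.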
Do Spark executors restrict native heap vs JVM heap?
Thanks Sean! My novice understanding is that the 'native heap' is the address space not allocated to the JVM heap, but I wanted to check to see if I'm missing something. I found out my issue appeared to be actual memory pressure on the executor machine. There was space for the JVM heap but not much more.

On Thu, Oct 30, 2014 at 12:49 PM, Sean Owen so...@cloudera.com wrote:

> No, but the JVM also does not allocate memory for native code on the heap. I don't think the heap has any bearing on whether your native code can allocate more memory, except that of course the heap is also taking memory.
>
> On Oct 30, 2014 6:43 PM, Paul Wais pw...@yelp.com wrote:
>
>> Dear Spark List,
>>
>> I have a Spark app that runs native code inside map functions. I've noticed that the native code sometimes sets errno to ENOMEM indicating a lack of available memory. However, I've verified that the /JVM/ has plenty of heap space available -- Runtime.getRuntime().freeMemory() shows gigabytes free and the native code needs only megabytes. Does spark limit the /native/ heap size somehow? Am poking through the executor code now but don't see anything obvious.
>>
>> Best Regards,
>> -Paul Wais
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
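One way to see this kind of pressure from inside a task is to log both the JVM's view of its heap and the kernel's view of the whole process; the gap between them is roughly what native code and the JVM's own non-heap allocations are using. This is a Linux-only sketch (it reads /proc); the /proc field names are standard, everything else is illustrative.

```
import scala.io.Source

object MemoryReport {
  // One-line summary of JVM heap usage vs. OS-level resident/virtual size of
  // the whole process (JVM heap + native allocations + mapped libraries).
  def report(): String = {
    val rt = Runtime.getRuntime
    val heapUsedMb = (rt.totalMemory - rt.freeMemory) / (1024 * 1024)
    val heapMaxMb = rt.maxMemory / (1024 * 1024)

    val src = Source.fromFile("/proc/self/status")
    val vmFields = try {
      src.getLines()
        .filter(l => l.startsWith("VmRSS") || l.startsWith("VmSize"))
        .mkString(", ")
    } finally src.close()

    s"JVM heap: $heapUsedMb / $heapMaxMb MB; $vmFields"
  }
}

// e.g. inside a map function, just before calling into native code:
//   System.err.println(MemoryReport.report())
```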
Re: Any issues with repartition?
Looks like an OOM issue? Have you tried persisting your RDDs to allow disk writes?

I've seen a lot of similar crashes in a Spark app that reads from HDFS and does joins. I.e. I've seen "java.io.IOException: Filesystem closed", "Executor lost", "FetchFailed", etc etc with non-deterministic crashes. I've tried persisting RDDs, tuning other params, and verifying that the Executor JVMs don't come close to their max allocated memory during operation.

Looking through user@ tonight, there are a ton of email threads with similar crashes and no answers. It looks like a lot of people are struggling with OOMs. Could one of the Spark committers please comment on this thread, or one of the other unanswered threads with similar crashes? Is this simply how Spark behaves if Executors OOM? What can the user do other than increase memory or reduce RDD size? (And how can one deduce how much of either is needed?)

One general workaround for OOMs could be to programmatically break the job input (i.e. from HDFS, input from #parallelize() ) into chunks, and only create/process RDDs related to one chunk at a time. However, this approach has the limitations of Spark Streaming and no formal library support. What might be nice is that if tasks fail, Spark could try to re-partition in order to avoid OOMs.

On Fri, Oct 3, 2014 at 2:55 AM, jamborta jambo...@gmail.com wrote:

> I have two nodes with 96G ram 16 cores, my setup is as follows:
>
> conf = (SparkConf()
>         .setMaster("yarn-cluster")
>         .set("spark.executor.memory", "30G")
>         .set("spark.cores.max", 32)
>         .set("spark.executor.instances", 2)
>         .set("spark.executor.cores", 8)
>         .set("spark.akka.timeout", 1)
>         .set("spark.akka.askTimeout", 100)
>         .set("spark.akka.frameSize", 500)
>         .set("spark.cleaner.ttl", 86400)
>         .set("spark.tast.maxFailures", 16)
>         .set("spark.worker.timeout", 150))
>
> thanks a lot,
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15674.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
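A sketch of the chunking workaround mentioned above. The directory layout, chunk size, and storage level are assumptions; the point is simply that only one chunk's RDDs are live and cached at any time.

```
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object ChunkedIngest {
  // Process a large HDFS directory in fixed-size groups of files so that only
  // one chunk's worth of data is cached at once.
  def run(sc: SparkContext, inputDir: String, outputDir: String,
          filesPerChunk: Int = 50): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val files = fs.listStatus(new Path(inputDir)).map(_.getPath.toString).sorted

    files.grouped(filesPerChunk).zipWithIndex.foreach { case (chunk, i) =>
      // textFile accepts a comma-separated list of paths.
      val rdd = sc.textFile(chunk.mkString(","))
      rdd.persist(StorageLevel.MEMORY_AND_DISK)  // allow spilling instead of OOM
      // ... joins / aggregations for this chunk only ...
      rdd.saveAsTextFile(s"$outputDir/chunk-$i")
      rdd.unpersist()  // release cached blocks before starting the next chunk
    }
  }
}
```

This trades shuffle-level parallelism across the whole dataset for a bounded working set, which is the same trade-off noted above for Spark Streaming.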
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Well it looks like this is indeed a protobuf issue. Poked a little more with Kryo. Since protobuf messages are serializable, I tried just making Kryo use the JavaSerializer for my messages. The resulting stack trace made it look like protobuf GeneratedMessageLite is actually using the classloader that loaded it, which I believe would be the root loader? * https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529 * See note: http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220 So I guess protobuf java serialization is sensitive to the class loader. I wonder if Kenton ever saw this one coming :) I do have a solution, though (see way below) Here's the code and stack trace: SparkConf sparkConf = new SparkConf(); sparkConf.setAppName(myapp); sparkConf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); sparkConf.set(spark.kryo.registrator, MyKryoRegistrator); ... public class MyKryoRegistrator implements KryoRegistrator { public void registerClasses(Kryo kryo) { kryo.register(MyProtoMessage.class, new JavaSerializer()); } } ... 14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2) com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.RuntimeException: Unable to find proto buffer class at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Derp, one caveat to my solution: I guess Spark doesn't use Kryo for Function serde :( On Fri, Sep 19, 2014 at 12:44 AM, Paul Wais pw...@yelp.com wrote: Well it looks like this is indeed a protobuf issue. Poked a little more with Kryo. Since protobuf messages are serializable, I tried just making Kryo use the JavaSerializer for my messages. The resulting stack trace made it look like protobuf GeneratedMessageLite is actually using the classloader that loaded it, which I believe would be the root loader? * https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529 * See note: http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220 So I guess protobuf java serialization is sensitive to the class loader. I wonder if Kenton ever saw this one coming :) I do have a solution, though (see way below) Here's the code and stack trace: SparkConf sparkConf = new SparkConf(); sparkConf.setAppName(myapp); sparkConf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); sparkConf.set(spark.kryo.registrator, MyKryoRegistrator); ... public class MyKryoRegistrator implements KryoRegistrator { public void registerClasses(Kryo kryo) { kryo.register(MyProtoMessage.class, new JavaSerializer()); } } ... 14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2) com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.RuntimeException: Unable to find proto buffer class at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method
"Unable to find proto buffer class" error with RDD<protobuf>
Dear List,

I'm writing an application where I have RDDs of protobuf messages. When I run the app via bin/spark-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to ser/deserialize the messages correctly.

However, if I run WITHOUT --driver-class-path path/to/my/uber.jar, or I try --master spark://my.master:7077, then I run into errors that make it look like my protobuf message classes are not on the classpath:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class
    com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
    ...

Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077)? I've tried poking through the shell scripts and SparkSubmit.scala and unfortunately I haven't been able to grok exactly what Spark is doing with the remote/local JVMs.

Cheers,
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
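For later readers: the workaround that usually gets reported for this class of error is to put the application jar on the JVM classpath at launch time on both the driver and the executors, rather than relying only on the jar that is added at runtime via --jars. A sketch (the paths are placeholders; spark.driver.extraClassPath and spark.executor.extraClassPath are standard Spark properties):

```
# spark-defaults.conf (or pass each with --conf on the spark-submit command line)
# Note: the jar must exist at this path on each worker for the executor setting to help.
spark.driver.extraClassPath    /path/to/uber.jar
spark.executor.extraClassPath  /path/to/uber.jar
```

combined with a submit command along the lines of:

```
spark-submit --master spark://my.master:7077 --class MyClass \
  --jars /path/to/uber.jar /path/to/uber.jar
```

This does not explain why the runtime-added class loader isn't consulted here; the later messages in this thread dig into that.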
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Well, it looks like Spark is just not loading my code into the driver/executors. E.g., with bars being a JavaRDD<MyMessage>:

  List<String> foo = bars.map(
    new Function<MyMessage, String>() {
      {
        System.err.println("classpath: " + System.getProperty("java.class.path"));
        CodeSource src = com.google.protobuf.GeneratedMessageLite.class
            .getProtectionDomain().getCodeSource();
        if (src != null) {
          URL jar = src.getLocation();
          System.err.println(
            "aaacom.google.protobuf.GeneratedMessageLite from jar: " + jar.toString());
        }
      }

      @Override
      public String call(MyMessage v1) throws Exception {
        return v1.getString();
      }
    }).collect();

prints:

  classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
  com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar

I do see after those lines:

  14/09/18 23:28:09 INFO Executor: Adding file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader

This is with:

  spark-submit --master local --class MyClass --jars uber.jar uber.jar

My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would come from there. I'm using spark 1.1 and hadoop 2.3; hadoop 2.3 should use protobuf 2.5 [1] and even shade it properly. I read claims in this list that Spark shades protobuf correctly since 0.9.?, and looking thru the pom.xml on github it looks like Spark includes protobuf 2.5 in the hadoop 2.3 profile.

I guess I'm still at: what's the deal with getting Spark to distribute and load code from my jar correctly?

[1] http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml

On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:

> Dear List,
>
> I'm writing an application where I have RDDs of protobuf messages. When I run the app via bin/spark-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to ser/deserialize the messages correctly.
>
> However, if I run WITHOUT --driver-class-path path/to/my/uber.jar, or I try --master spark://my.master:7077, then I run into errors that make it look like my protobuf message classes are not on the classpath:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class
>     com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
>     sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     java.lang.reflect.Method.invoke(Method.java:606)
>     java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
>     ...
>
> Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077)? I've tried poking through the shell scripts and SparkSubmit.scala and unfortunately I haven't been able to grok exactly what Spark is doing with the remote/local JVMs.
>
> Cheers,
> -Paul
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Ah, can one NOT create an RDD of any arbitrary Serializable type? It looks like I might be getting bitten by the same java.io.ObjectInputStream uses root class loader only bugs mentioned in: * http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html * https://github.com/apache/spark/pull/181 * http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote: Well, it looks like Spark is just not loading my code into the driver/executors E.g.: ListString foo = JavaRDDMyMessage bars.map( new Function MyMessage, String() { { System.err.println(classpath: + System.getProperty(java.class.path)); CodeSource src = com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource(); if (src2 != null) { URL jar = src2.getLocation(); System.err.println(aaacom.google.protobuf.GeneratedMessageLite from jar: + jar.toString()); } @Override public String call(MyMessage v1) throws Exception { return v1.getString(); } }).collect(); prints: classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar I do see after those lines: 14/09/18 23:28:09 INFO Executor: Adding file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader This is with: spart-submit --master local --class MyClass --jars uber.jar uber.jar My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would come from there. I'm using spark 1.1 and hadoop 2.3; hadoop 2.3 should use protobuf 2.5[1] and even shade it properly. I read claims in this list that Spark shades protobuf correctly since 0.9.? and looking thru the pom.xml on github it looks like Spark includes protobuf 2.5 in the hadoop 2.3 profile. I guess I'm still at What's the deal with getting Spark to distribute and load code from my jar correctly? [1] http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote: Dear List, I'm writing an application where I have RDDs of protobuf messages. When I run the app via bin/spar-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to ser/deserialize the messages correctly. 
However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I try --master spark://my.master:7077 , then I run into errors that make it look like my protobuf message classes are not on the classpath: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807) ... Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077 ) ? I've tried poking through the shell scripts and SparkSubmit.scala and unfortunately I haven't been able to grok exactly what Spark is doing with the remote/local JVMs. Cheers, -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: "Unable to find proto buffer class" error with RDD<protobuf>
hmm would using kyro help me here? On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote: Ah, can one NOT create an RDD of any arbitrary Serializable type? It looks like I might be getting bitten by the same java.io.ObjectInputStream uses root class loader only bugs mentioned in: * http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html * https://github.com/apache/spark/pull/181 * http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com javascript:; wrote: Well, it looks like Spark is just not loading my code into the driver/executors E.g.: ListString foo = JavaRDDMyMessage bars.map( new Function MyMessage, String() { { System.err.println(classpath: + System.getProperty(java.class.path)); CodeSource src = com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource(); if (src2 != null) { URL jar = src2.getLocation(); System.err.println(aaacom.google.protobuf.GeneratedMessageLite from jar: + jar.toString()); } @Override public String call(MyMessage v1) throws Exception { return v1.getString(); } }).collect(); prints: classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar I do see after those lines: 14/09/18 23:28:09 INFO Executor: Adding file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader This is with: spart-submit --master local --class MyClass --jars uber.jar uber.jar My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would come from there. I'm using spark 1.1 and hadoop 2.3; hadoop 2.3 should use protobuf 2.5[1] and even shade it properly. I read claims in this list that Spark shades protobuf correctly since 0.9.? and looking thru the pom.xml on github it looks like Spark includes protobuf 2.5 in the hadoop 2.3 profile. I guess I'm still at What's the deal with getting Spark to distribute and load code from my jar correctly? [1] http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com javascript:; wrote: Dear List, I'm writing an application where I have RDDs of protobuf messages. When I run the app via bin/spar-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to ser/deserialize the messages correctly. 
However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I try --master spark://my.master:7077 , then I run into errors that make it look like my protobuf message classes are not on the classpath: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807) ... Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077 ) ? I've tried poking through the shell scripts and SparkSubmit.scala and unfortunately I haven't been able to grok exactly what Spark is doing with the remote/local JVMs. Cheers, -Paul
Re: "Unable to find proto buffer class" error with RDD<protobuf>
). I think the root problem might be related to this change: https://github.com/apache/spark/commit/cc3648774e9a744850107bb187f2828d447e0a48#diff-7b43397a89d8249663cbd13374a48db0R42 That change did not appear to touch ParallelCollectionRDD, which I believe is using the root classloader (would explain why --driver-class-path fixes the problem): https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L74 If uber.jar is on the classpath, then the root classloader would have the code, hence why --driver-class-path fixes the bug. On Thu, Sep 18, 2014 at 5:42 PM, Paul Wais pw...@yelp.com wrote: hmm would using kyro help me here? On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote: Ah, can one NOT create an RDD of any arbitrary Serializable type? It looks like I might be getting bitten by the same java.io.ObjectInputStream uses root class loader only bugs mentioned in: * http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html * https://github.com/apache/spark/pull/181 * http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote: Well, it looks like Spark is just not loading my code into the driver/executors E.g.: ListString foo = JavaRDDMyMessage bars.map( new Function MyMessage, String() { { System.err.println(classpath: + System.getProperty(java.class.path)); CodeSource src = com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource(); if (src2 != null) { URL jar = src2.getLocation(); System.err.println(aaacom.google.protobuf.GeneratedMessageLite from jar: + jar.toString()); } @Override public String call(MyMessage v1) throws Exception { return v1.getString(); } }).collect(); prints: classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar I do see after those lines: 14/09/18 23:28:09 INFO Executor: Adding file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader This is with: spart-submit --master local --class MyClass --jars uber.jar uber.jar My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would come from there. I'm using spark 1.1 and hadoop 2.3; hadoop 2.3 should use protobuf 2.5[1] and even shade it properly. I read claims in this list that Spark shades protobuf correctly since 0.9.? and looking thru the pom.xml on github it looks like Spark includes protobuf 2.5 in the hadoop 2.3 profile. I guess I'm still at What's the deal with getting Spark to distribute and load code from my jar correctly? [1] http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote: Dear List, I'm writing an application where I have RDDs of protobuf messages. When I run the app via bin/spar-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to ser/deserialize the messages correctly. 
However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I try --master spark://my.master:7077 , then I run into errors that make it look like my protobuf message classes are not on the classpath: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807) ... Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077 ) ? I've tried poking through the shell scripts and SparkSubmit.scala
Re: Spark 1.1 / cdh4 stuck using old hadoop client?
Thanks Christian! I tried compiling from source but am still getting the same hadoop client version error when reading from HDFS. Will have to poke deeper... perhaps I've got some classpath issues. FWIW I compiled using: $ MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package and hadoop 2.3 / cdh5 from http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz On Mon, Sep 15, 2014 at 6:49 PM, Christian Chua cc8...@icloud.com wrote: Hi Paul. I would recommend building your own 1.1.0 distribution. ./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests I downloaded the Pre-build for Hadoop 2.4 binary, and it had this strange behavior where spark-submit --master yarn-cluster ... will work, but spark-submit --master yarn-client ... will fail. But on the personal build obtained from the command above, both will then work. -Christian On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote: Dear List, I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source) ... When invoking JavaSparkContext#newAPIHadoopFile(). (With args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- Pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916 ) This error indicates to me that Spark is using an old hadoop client to do reads. Oddly I'm able to do /writes/ ok, i.e. I'm able to write via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster. Do I need to explicitly build spark for modern hadoop?? I previously had an hdfs cluster running hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted on spark's site: * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz What distro of hadoop is used at Data Bricks? Are there distros of Spark 1.1 and hadoop that should work together out-of-the-box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..) Thanks for any help anybody can give me here! -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1 / cdh4 stuck using old hadoop client?
Hi Sean, Great catch! Yes I was including Spark as a dependency and it was making its way into my uber jar. Following the advice I just found at Stackoverflow[1], I marked Spark as a provided dependency and that appeared to fix my Hadoop client issue. Thanks for your help!!! Perhaps they maintainers might consider setting this in the Quickstart guide pom.xml ( http://spark.apache.org/docs/latest/quick-start.html ) In summary, here's what worked: * Hadoop 2.3 cdh5 http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz * Spark 1.1 for Hadoop 2.3 http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.3.tgz pom.xml snippets: https://gist.github.com/ypwais/ff188611d4806aa05ed9 [1] http://stackoverflow.com/questions/24747037/how-to-define-a-dependency-scope-in-maven-to-include-a-library-in-compile-run Thanks everybody!! -Paul On Tue, Sep 16, 2014 at 3:55 AM, Sean Owen so...@cloudera.com wrote: From the caller / application perspective, you don't care what version of Hadoop Spark is running on on the cluster. The Spark API you compile against is the same. When you spark-submit the app, at runtime, Spark is using the Hadoop libraries from the cluster, which are the right version. So when you build your app, you mark Spark as a 'provided' dependency. Therefore in general, no, you do not build Spark for yourself if you are a Spark app creator. (Of course, your app would care if it were also using Hadoop libraries directly. In that case, you will want to depend on hadoop-client, and the right version for your cluster, but still mark it as provided.) The version Spark is built against only matters when you are deploying Spark's artifacts on the cluster to set it up. Your error suggests there is still a version mismatch. Either you deployed a build that was not compatible, or, maybe you are packaging a version of Spark with your app which is incompatible and interfering. For example, the artifacts you get via Maven depend on Hadoop 1.0.4. I suspect that's what you're doing -- packaging Spark(+Hadoop1.0.4) with your app, when it shouldn't be packaged. Spark works out of the box with just about any modern combo of HDFS and YARN. On Tue, Sep 16, 2014 at 2:28 AM, Paul Wais pw...@yelp.com wrote: Dear List, I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source) ... When invoking JavaSparkContext#newAPIHadoopFile(). (With args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- Pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916 ) This error indicates to me that Spark is using an old hadoop client to do reads. Oddly I'm able to do /writes/ ok, i.e. I'm able to write via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster. Do I need to explicitly build spark for modern hadoop?? I previously had an hdfs cluster running hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). 
I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted on spark's site: * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz What distro of hadoop is used at Data Bricks? Are there distros of Spark 1.1 and hadoop that should work together out-of-the-box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..) Thanks for any help anybody can give me here! -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
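For reference, the 'provided' scope fix described above looks roughly like this in the application's pom.xml. This is a sketch: the artifact versions should match the cluster, and the hadoop-client dependency is only needed if the app uses Hadoop APIs directly. The linked gist above has the fuller snippets that were actually used.

```
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.0</version>
  <!-- Do not bundle Spark (and its transitive Hadoop client) into the uber
       jar; the cluster's own Spark/Hadoop jars are used at runtime. -->
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.3.0-cdh5.0.0</version>
  <scope>provided</scope>
</dependency>
```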
Spark 1.1 / cdh4 stuck using old hadoop client?
Dear List, I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source) ... When invoking JavaSparkContext#newAPIHadoopFile(). (With args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- Pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916 ) This error indicates to me that Spark is using an old hadoop client to do reads. Oddly I'm able to do /writes/ ok, i.e. I'm able to write via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster. Do I need to explicitly build spark for modern hadoop?? I previously had an hdfs cluster running hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted on spark's site: * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz What distro of hadoop is used at Data Bricks? Are there distros of Spark 1.1 and hadoop that should work together out-of-the-box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..) Thanks for any help anybody can give me here! -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Release date for new pyspark
Thanks all! (And thanks Matei for the developer link!) I was able to build using maven[1] but `./sbt/sbt assembly` results in build errors. (Not familiar enough with the build to know why; in the past sbt worked for me and maven did not). I was able to run the master version of pyspark, which was what I wanted, though I discovered a bug when trying to read spark-pickled data from HDFS. (Looks similar to https://spark-project.atlassian.net/browse/SPARK-1034 from my naive point of view). For the curious: Code: conf = SparkConf() conf.set('spark.local.dir', '/nail/tmp') conf.set('spark.executor.memory', '28g') conf.set('spark.app.name', 'test') sc = SparkContext(conf=conf) sc.parallelize(range(10)).saveAsPickleFile(hdfs://host:9000/test_pickle) unpickled_rdd = sc.pickleFile(hdfs://host:9000/test_pickle) print unpickled_rdd.takeSample(False, 3) Traceback (most recent call last): File /path/to/my/home/spark-master/tast.py, line 33, in module print unpickled_rdd.takeSample(False, 3) File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 391, in takeSample initialCount = self.count() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 791, in count return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 782, in sum return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 703, in reduce vals = self.mapPartitions(func).collect() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 667, in collect bytesInJava = self._jrdd.collect().iterator() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 1600, in _jrdd class_tag) File /path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 669, in __call__ File /path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 304, in get_return_value py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonRDD. Trace: py4j.Py4JException: Constructor org.apache.spark.api.python.PythonRDD([class org.apache.spark.rdd.FlatMappedRDD, class [B, class java.util.HashMap, class java.util.ArrayList, class java.lang.Boolean, class java.lang.String, class java.util.ArrayList, class org.apache.spark.Accumulator, class scala.reflect.ManifestFactory$$anon$2]) does not exist at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184) at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202) at py4j.Gateway.invoke(Gateway.java:213) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:662) [1] mvn -Phadoop-2.3 -Dhadoop.verson=2.3.0 -DskipTests clean package On Wed, Jul 16, 2014 at 8:39 PM, Michael Armbrust mich...@databricks.com wrote: You should try cleaning and then building. We have recently hit a bug in the scala compiler that sometimes causes non-clean builds to fail. On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, we try to have a regular 3 month release cycle; see https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the current window. Matei On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote: You should expect master to compile and run: patches aren't merged unless they build and pass tests on Jenkins. 
You shouldn't expect new features to be added to stable code in maintenance releases (e.g. 1.0.1). AFAIK, we're still on track with Spark 1.1.0 development, which means that it should be released sometime in the second half of next month (or shortly thereafter). On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote: Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
Release date for new pyspark
Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais