Re: pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-31 Thread Paul Wais
Well, dumb question:

Given the workflow outlined above, should Local Mode keep running?  Or
is the leak a known issue?  I just wanted to check because I can't
recall seeing this issue with a non-local master, though it's possible
there were task failures that hid the issue.

If this issue looks new, what's the easiest way to record memory dumps
or do profiling?  Can I put something in my spark-defaults.conf ?
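
For example, would something like the following be the right approach?  (A
sketch using the standard JVM heap-dump flags; the dump paths are made up,
and the equivalent lines could go in spark-defaults.conf as
spark.driver.extraJavaOptions / spark.executor.extraJavaOptions.)

```
# Sketch: ask the driver / executor JVMs to write a heap dump when they hit
# an OOM.  Paths are placeholders; in local mode the driver setting is the
# one that matters since everything runs in a single JVM.
from pyspark.sql import SparkSession

spark = (
  SparkSession.builder
    .config("spark.driver.extraJavaOptions",
            "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver.hprof")
    .config("spark.executor.extraJavaOptions",
            "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/executor.hprof")
    .getOrCreate())
```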

The code is open source and the run is reproducible, although the
specific test currently requires a rather large (but public) dataset.

On Sun, Oct 20, 2019 at 6:24 PM Jungtaek Lim wrote:
>
> Honestly I'd recommend you spend your time looking into the issue, e.g. by
> taking a memory dump at some interval and comparing the differences (at least
> share these dump files with the community, redacted if necessary). Otherwise
> someone has to try to reproduce it without a reproducer, and may fail to
> reproduce it even after spending their time. A memory leak is not really easy
> to reproduce unless it leaks objects unconditionally.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Sun, Oct 20, 2019 at 7:18 PM Paul Wais  wrote:
>>
>> Dear List,
>>
>> I've observed some sort of memory leak when using pyspark to run ~100
>> jobs in local mode.  Each job is essentially a create RDD -> create DF
>> -> write DF sort of flow.  The RDD and DFs go out of scope after each
>> job completes, hence I call this issue a "memory leak."  Here's
>> pseudocode:
>>
>> ```
>> row_rdds = []
>> for i in range(100):
>>   row_rdd = spark.sparkContext.parallelize([{'a': j} for j in range(1000)])
>>   row_rdds.append(row_rdd)
>>
>> for row_rdd in row_rdds:
>>   df = spark.createDataFrame(row_rdd)
>>   df.persist()
>>   print(df.count())
>>   df.write.save(...) # Save parquet
>>   df.unpersist()
>>
>>   # Does not help:
>>   # del df
>>   # del row_rdd
>> ```
>>
>> In my real application:
>>  * rows are much larger, perhaps 1MB each
>>  * row_rdds are sized to fit available RAM
>>
>> I observe that after 100 or so iterations of the second loop (each of
>> which creates a "job" in the Spark WebUI), the following happens:
>>  * pyspark workers have fairly stable resident and virtual RAM usage
>>  * java process eventually approaches resident RAM cap (8GB standard)
>> but virtual RAM usage keeps ballooning.
>>
>> Eventually the machine runs out of RAM and the linux OOM killer kills
>> the java process, resulting in an "IndexError: pop from an empty
>> deque" error from py4j/java_gateway.py .
>>
>>
>> Does anybody have any ideas about what's going on?  Note that this is
>> local mode.  I have personally run standalone masters and submitted a
>> ton of jobs and never seen something like this over time.  Those were
>> very different jobs, but perhaps this issue is bespoke to local mode?
>>
>> Emphasis: I did try to del the pyspark objects and run python GC.
>> That didn't help at all.
>>
>> pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)
>>
>> 12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).
>>
>> Cheers,
>> -Paul
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-20 Thread Paul Wais
Dear List,

I've observed some sort of memory leak when using pyspark to run ~100
jobs in local mode.  Each job is essentially a create RDD -> create DF
-> write DF sort of flow.  The RDD and DFs go out of scope after each
job completes, hence I call this issue a "memory leak."  Here's
pseudocode:

```
row_rdds = []
for i in range(100):
  row_rdd = spark.sparkContext.parallelize([{'a': j} for j in range(1000)])
  row_rdds.append(row_rdd)

for row_rdd in row_rdds:
  df = spark.createDataFrame(row_rdd)
  df.persist()
  print(df.count())
  df.write.save(...) # Save parquet
  df.unpersist()

  # Does not help:
  # del df
  # del row_rdd
```

In my real application:
 * rows are much larger, perhaps 1MB each
 * row_rdds are sized to fit available RAM

I observe that after 100 or so iterations of the second loop (each of
which creates a "job" in the Spark WebUI), the following happens:
 * pyspark workers have fairly stable resident and virtual RAM usage
 * java process eventually approaches resident RAM cap (8GB standard)
but virtual RAM usage keeps ballooning.

Eventually the machine runs out of RAM and the linux OOM killer kills
the java process, resulting in an "IndexError: pop from an empty
deque" error from py4j/java_gateway.py .


Does anybody have any ideas about what's going on?  Note that this is
local mode.  I have personally run standalone masters and submitted a
ton of jobs and never seen something like this over time.  Those were
very different jobs, but perhaps this issue is bespoke to local mode?

Emphasis: I did try to del the pyspark objects and run python GC.
That didn't help at all.

pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)

12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).

Cheers,
-Paul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Avro support broken?

2019-07-04 Thread Paul Wais
Dear List,

Has anybody gotten avro support to work in pyspark?  I see multiple
reports of it being broken on Stackoverflow and added my own repro to
this ticket: 
https://issues.apache.org/jira/browse/SPARK-27623?focusedCommentId=16878896&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16878896
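
For reference, the basic pattern in question (a sketch; it assumes the
external spark-avro package is supplied, e.g. via
--packages org.apache.spark:spark-avro_2.11:2.4.3, and the paths are made up):

```
# The "avro" format comes from the external spark-avro package; it is not
# bundled with a plain pip install of pyspark.
df = spark.read.format("avro").load("hdfs:///path/to/input.avro")
df.write.format("avro").save("hdfs:///path/to/output")
```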

Cheers,
-Paul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Perf impact of BlockManager byte[] copies

2015-02-27 Thread Paul Wais
Dear List,

I'm investigating some problems related to native code integration
with Spark, and while picking through BlockManager I noticed that data
(de)serialization currently issues lots of array copies.
Specifically:

- Deserialization: BlockManager marshals all deserialized bytes
through a spark.util. ByteBufferInputStream, which necessitates
copying data into an intermediate temporary byte[] .  The temporary
byte[] might be reused between deserialization of T instances, but
nevertheless the bytes must be copied (and likely in a Java loop).

- Serialization: BlockManager buffers all serialized bytes into a
java.io.ByteArrayOutputStream, which maintains an internal byte[]
buffer and grows/re-copies the buffer like a vector as the buffer
fills.  BlockManager then retrieves the internal byte[] buffer, wraps
it in a ByteBuffer, and sends it off to be stored (e.g. in
MemoryStore, DiskStore, Tachyon, etc).

When an individual T is somewhat large (e.g. a feature vector, an
image, etc), or blocks are megabytes in size, these copies become
expensive (especially for large written blocks), right?  Does anybody
have any measurements of /how/ expensive they are?  If not, is there
serialization benchmark code (e.g. for KryoSerializer ) that might be
helpful here?


As part of my investigation, I've found that one might be able to
sidestep these issues by extending Spark's SerializerInstance API to
offer I/O on ByteBuffers (in addition to {Input,Output}Streams).  An
extension including a ByteBuffer API would furthermore have many
benefits for native code.  A major downside of this API addition is
that it wouldn't interoperate (nontrivially) with compression, so
shuffles wouldn't benefit.  Nevertheless, BlockManager could probably
deduce when use of this ByteBuffer API is possible and leverage it.

Cheers,
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Support for SQL on unions of tables (merge tables?)

2015-01-21 Thread Paul Wais
Thanks Cheng!

For the list, I talked with Michael Armbrust at a recent Spark meetup
and his comments were:
 * For a union of tables, use a view and the Hive metastore
 * SQLContext might have the directory-traversing logic I need in it already
 * The union() of sequence files I saw was slow because Spark was
probably trying to shuffle the whole union.  A similar Spark SQL join
will also be slow (or break) unless one runs statistics so that the
smaller table can be broadcasted (e.g. see
https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
)

I have never used Hive, so I'll have to investigate further.
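
In the meantime, here's roughly how I understand the per-day-table + UNION ALL
approach would look (an untested sketch against the Spark 1.2-era SQLContext
API; the paths, the Parquet assumption, and the 'bytes' field are all made up):

```
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
days = ["2015_01_01", "2015_01_02", "2015_01_03"]

# register one table per day of logs
for day in days:
    day_logs = sqlContext.parquetFile("hdfs:///logs/%s" % day)
    day_logs.registerTempTable("logs_" + day)

# stitch the per-day tables into one logical "table" and query it
union_sql = " UNION ALL ".join("SELECT * FROM logs_" + d for d in days)
sqlContext.sql(union_sql).registerTempTable("logs")
total = sqlContext.sql("SELECT SUM(bytes) FROM logs")
```

A Hive view (or a date-partitioned Hive table) over the same data would make
the union persistent between Spark runs, which I gather is the point of the
metastore suggestion.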


On Tue, Jan 20, 2015 at 1:15 PM, Cheng Lian lian.cs@gmail.com wrote:
 I think you can resort to a Hive table partitioned by date
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables


 On 1/11/15 9:51 PM, Paul Wais wrote:


 Dear List,

 What are common approaches for addressing over a union of tables / RDDs?
 E.g. suppose I have a collection of log files in HDFS, one log file per day,
 and I want to compute the sum of some field over a date range in SQL.  Using
 log schema, I can read each as a distinct SchemaRDD, but I want to union
 them all and query against one 'table'.

 If this data were in MySQL, I could have a table for each day of data and
 use a MyISAM merge table to union these tables together and just query
 against the merge table.  What's nice here is that MySQL persists the merge
 table, and the merge table is r/w, so one can just update the merge table
 once per day.  (What's not nice is that merge tables scale poorly, backup
 admin is a pain, and oh hey I'd like to use Spark not MySQL).

 One naive and untested idea (that achieves implicit persistence): scan an
 HDFS directory for log files, create one RDD per file, union() the RDDs,
 then create a Schema RDD from that union().

 A few specific questions:
  * Any good approaches to a merge / union table? (Other than the naive
 idea above).  Preferably with some way to persist that table / RDD between
 Spark runs.  (How does Impala approach this problem?)

  * Has anybody tried joining against such a union of tables / RDDs on a
 very large amount of data?  When I've tried (non-spark-sql) union()ing
 Sequence Files, and then join()ing them against another RDD, Spark seems to
 try to compute the full union before doing any join() computation (and
 eventually OOMs the cluster because the union of Sequence Files is so big).
 I haven't tried something similar with Spark SQL.

  * Are there any plans related to this in the Spark roadmap?  (This
 feature would be a nice complement to, say, persistent RDD indices for
 interactive querying).

  * Related question: are there plans to use Parquet Index Pages to make
 Spark SQL faster?  E.g. log indices over date ranges would be relevant here.

 All the best,
 -Paul




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Paul Wais
To force one instance per executor, you could explicitly subclass
FlatMapFunction and have it lazy-create your parser in the subclass
constructor.  You might also want to try RDD#mapPartitions() (instead of
RDD#flatMap() if you want one instance per partition.  This approach worked
well for me when I had a flat map function that used non-serializable
native code / objects.

FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master
has a slight refactor).  Agree it's worth checking the number of partitions
in your 1.1 vs 1.2 test.
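
For illustration, here's the analogous lazy-singleton + mapPartitions pattern
sketched in PySpark (the original discussion is Java/Scala; LogParser below is
just a stand-in for the expensive, non-serializable parser, and sc/input_path
are assumed):

```
# Cache the parser in a module-level variable so each worker process builds
# it at most once, and pay the lookup cost once per partition, not per record.
_parser = None

def _get_parser():
    global _parser
    if _parser is None:
        _parser = LogParser()          # expensive, non-serializable init
    return _parser

def parse_partition(lines):
    parser = _get_parser()
    for line in lines:
        for record in parser.parse_line(line):
            yield record

parsed = sc.textFile(input_path).mapPartitions(parse_partition)
```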



On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
 data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialized
 and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or change
 to
  find it out





Support for SQL on unions of tables (merge tables?)

2015-01-11 Thread Paul Wais
Dear List,

What are common approaches for addressing over a union of tables / RDDs?
E.g. suppose I have a collection of log files in HDFS, one log file per
day, and I want to compute the sum of some field over a date range in SQL.
Using log schema, I can read each as a distinct SchemaRDD, but I want to
union them all and query against one 'table'.

If this data were in MySQL, I could have a table for each day of data and
use a MyISAM merge table to union these tables together and just query
against the merge table.  What's nice here is that MySQL persists the merge
table, and the merge table is r/w, so one can just update the merge table
once per day.  (What's not nice is that merge tables scale poorly, backup
admin is a pain, and oh hey I'd like to use Spark not MySQL).

One naive and untested idea (that achieves implicit persistence): scan an
HDFS directory for log files, create one RDD per file, union() the RDDs,
then create a Schema RDD from that union().
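
Concretely, the naive idea would be something like this (untested sketch
against Spark 1.x PySpark; the file listing and the single 'raw' field are
made up for illustration):

```
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)
paths = ["hdfs:///logs/2015-01-%02d.log" % d for d in range(1, 12)]

# one RDD per log file, then a single union over all of them
per_file = [sc.textFile(p) for p in paths]
all_lines = sc.union(per_file)

# parse each line into a Row and infer a schema over the whole union
rows = all_lines.map(lambda line: Row(raw=line))
logs = sqlContext.inferSchema(rows)
logs.registerTempTable("logs")
```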

A few specific questions:
 * Any good approaches to a merge / union table? (Other than the naive idea
above).  Preferably with some way to persist that table / RDD between Spark
runs.  (How does Impala approach this problem?)

 * Has anybody tried joining against such a union of tables / RDDs on a
very large amount of data?  When I've tried (non-spark-sql) union()ing
Sequence Files, and then join()ing them against another RDD, Spark seems to
try to compute the full union before doing any join() computation (and
eventually OOMs the cluster because the union of Sequence Files is so big).
I haven't tried something similar with Spark SQL.

 * Are there any plans related to this in the Spark roadmap?  (This feature
would be a nice complement to, say, persistent RDD indices for interactive
querying).

 * Related question: are there plans to use Parquet Index Pages to make
Spark SQL faster?  E.g. log indices over date ranges would be relevant here.

All the best,
-Paul


Re: Native / C/C++ code integration

2014-11-11 Thread Paul Wais
More thoughts.  I took a deeper look at BlockManager, RDD, and friends. 
Suppose one wanted to get native code access to un-deserialized blocks. 
This task looks very hard.  An RDD behaves much like a Scala iterator of
deserialized values, and interop with BlockManager is all on deserialized
data.  One would probably need to rewrite much of RDD, CacheManager, etc in
native code; an RDD subclass (e.g. PythonRDD) probably wouldn't work.

So exposing raw blocks to native code looks intractable.  I wonder how fast
Java/Kryo can SerDe byte arrays.  E.g. suppose we have an RDD[T] where T
is immutable and most of the memory for a single T is a byte array.  What is
the overhead of SerDe-ing T?  (Does Java/Kryo copy the underlying memory?)
If the overhead is small, then native access to raw blocks wouldn't really
yield any advantage.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Native-C-C-code-integration-tp18347p18640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Native / C/C++ code integration

2014-11-07 Thread Paul Wais
Dear List,

Has anybody had experience integrating C/C++ code into Spark jobs?  

I have done some work on this topic using JNA.  I wrote a FlatMapFunction
that processes all partition entries using a C++ library.  This approach
works well, but there are some tradeoffs:
 * Shipping the native dylib with the app jar and loading it at runtime
requires a bit of work (on top of normal JNA usage)
 * Native code doesn't respect the executor heap limits.  Under heavy memory
pressure, the native code can sometimes ENOMEM sporadically.
 * While JNA can map Strings, structs, and Java primitive types, the user
still needs to deal with more complex objects.  E.g. re-serialize
protobuf/thrift objects, or provide some other encoding for moving data
between Java and C/C++.
 * C++ static initialization is not thread-safe before C++11, so the user
sometimes needs to take care when running inside multi-threaded executors
 * Avoiding memory copies can be a little tricky

One other alternative approach comes to mind is pipe().  However, PipedRDD
requires copying data over pipes, does not support binary data (?), and
native code errors that crash the subprocess don't bubble up to the Spark
job as nicely as with JNA.
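
(For reference, the pipe() pattern looks roughly like the following, shown in
PySpark for brevity with a toy command; each record goes to the subprocess as
a text line on stdin and comes back as a text line on stdout, which is exactly
where the binary-data limitation shows up.)

```
# Toy example: shell out to `tr` for each partition's records.
upper = sc.parallelize(["foo", "bar", "baz"]).pipe("tr 'a-z' 'A-Z'")
print(upper.collect())   # ['FOO', 'BAR', 'BAZ']
```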

Is there a way to expose raw, in-memory partition/block data to native code?

Has anybody else attacked this problem a different way?

All the best,
-Paul 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Native-C-C-code-integration-tp18347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Do Spark executors restrict native heap vs JVM heap?

2014-11-02 Thread Paul Wais
Thanks Sean! My novice understanding is that the 'native heap' is the
address space not allocated to the JVM heap, but I wanted to check to see
if I'm missing something.  I found out my issue appeared to be actual
memory pressure on the executor machine.  There was space for the JVM heap
but not much more.

On Thu, Oct 30, 2014 at 12:49 PM, Sean Owen so...@cloudera.com wrote:
 No, but, the JVM also does not allocate memory for native code on the heap.
 I don't think heap has any bearing on whether your native code can't allocate
 more memory, except that of course the heap is also taking memory.

 On Oct 30, 2014 6:43 PM, Paul Wais pw...@yelp.com wrote:

 Dear Spark List,

 I have a Spark app that runs native code inside map functions.  I've
 noticed that the native code sometimes sets errno to ENOMEM indicating
 a lack of available memory.  However, I've verified that the /JVM/ has
 plenty of heap space available-- Runtime.getRuntime().freeMemory()
 shows gigabytes free and the native code needs only megabytes.  Does
 spark limit the /native/ heap size somehow?  Am poking through the
 executor code now but don't see anything obvious.

 Best Regards,
 -Paul Wais

 -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org




Re: Any issues with repartition?

2014-10-08 Thread Paul Wais
Looks like an OOM issue?  Have you tried persisting your RDDs to allow
disk writes?
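
For example, a storage level that is allowed to spill to disk instead of
holding everything in memory (a one-line sketch; `rdd` stands for whatever
RDD you are repartitioning):

```
from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk when RAM is tight
```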

I've seen a lot of similar crashes in a Spark app that reads from HDFS
and does joins.  I.e. I've seen "java.io.IOException: Filesystem
closed", "Executor lost", "FetchFailed", etc. etc. with
non-deterministic crashes.  I've tried persisting RDDs, tuning other
params, and verifying that the Executor JVMs don't come close to their
max allocated memory during operation.

Looking through user@ tonight, there are a ton of email threads with
similar crashes and no answers.  It looks like a lot of people are
struggling with OOMs.

Could one of the Spark committers please comment on this thread, or
one of the other unanswered threads with similar crashes?  Is this
simply how Spark behaves if Executors OOM?  What can the user do other
than increase memory or reduce RDD size?  (And how can one deduce how
much of either is needed?)

One general workaround for OOMs could be to programmatically break the
job input (i.e. from HDFS, input from #parallelize() ) into chunks,
and only create/process RDDs related to one chunk at a time.  However,
this approach has the limitations of Spark Streaming and no formal
library support.  What might be nice is that if tasks fail, Spark
could try to re-partition in order to avoid OOMs.
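
A sketch of that chunking idea (untested; `paths` and `process_chunk` are
placeholders for the input listing and the per-chunk work):

```
def process_in_chunks(sc, paths, chunk_size=10):
    # Only one chunk's RDD is materialized at a time, so peak memory is
    # bounded by the chunk size rather than by the whole input.
    for start in range(0, len(paths), chunk_size):
        chunk = paths[start:start + chunk_size]
        rdd = sc.textFile(",".join(chunk))  # textFile accepts a comma-separated path list
        rdd.persist()
        process_chunk(rdd)                  # user-supplied work for this chunk
        rdd.unpersist()
```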



On Fri, Oct 3, 2014 at 2:55 AM, jamborta jambo...@gmail.com wrote:
 I have two nodes with 96G ram 16 cores, my setup is as follows:

 conf = (SparkConf()
 .setMaster("yarn-cluster")
 .set("spark.executor.memory", "30G")
 .set("spark.cores.max", 32)
 .set("spark.executor.instances", 2)
 .set("spark.executor.cores", 8)
 .set("spark.akka.timeout", 1)
 .set("spark.akka.askTimeout", 100)
 .set("spark.akka.frameSize", 500)
 .set("spark.cleaner.ttl", 86400)
 .set("spark.tast.maxFailures", 16)
 .set("spark.worker.timeout", 150)

 thanks a lot,




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15674.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find proto buffer class error with RDD<protobuf>

2014-09-19 Thread Paul Wais
Well it looks like this is indeed a protobuf issue.  Poked a little more
with Kryo.  Since protobuf messages are serializable, I tried just making
Kryo use the JavaSerializer for my messages.  The resulting stack trace
made it look like protobuf GeneratedMessageLite is actually using the
classloader that loaded it, which I believe would be the root loader?

 *
https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775
 *
http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186
 *
http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529
 * See note:
http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220

So I guess protobuf java serialization is sensitive to the class loader.  I
wonder if Kenton ever saw this one coming :)  I do have a solution, though
(see way below)


Here's the code and stack trace:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("myapp");
sparkConf.set("spark.serializer",
    "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");

...

public class MyKryoRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(MyProtoMessage.class, new JavaSerializer());
    }
}

...

14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2)
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34)
at
com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
at
org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
at
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at
com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at

Re: Unable to find proto buffer class error with RDD<protobuf>

2014-09-19 Thread Paul Wais
Derp, one caveat to my solution:  I guess Spark doesn't use Kryo for
Function serde :(

On Fri, Sep 19, 2014 at 12:44 AM, Paul Wais pw...@yelp.com wrote:
 Well it looks like this is indeed a protobuf issue.  Poked a little more
 with Kryo.  Since protobuf messages are serializable, I tried just making
 Kryo use the JavaSerializer for my messages.  The resulting stack trace made
 it look like protobuf GeneratedMessageLite is actually using the classloader
 that loaded it, which I believe would be the root loader?

  *
 https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775
  *
 http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186
  *
 http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529
  * See note:
 http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220

 So I guess protobuf java serialization is sensitive to the class loader.  I
 wonder if Kenton ever saw this one coming :)  I do have a solution, though
 (see way below)


 Here's the code and stack trace:

 SparkConf sparkConf = new SparkConf();
 sparkConf.setAppName("myapp");
 sparkConf.set("spark.serializer",
     "org.apache.spark.serializer.KryoSerializer");
 sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");

 ...

 public class MyKryoRegistrator implements KryoRegistrator {
     public void registerClasses(Kryo kryo) {
         kryo.register(MyProtoMessage.class, new JavaSerializer());
     }
 }

 ...

 14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2)
 com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
 at
 com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34)
 at
 com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
 at
 org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
 at
 org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
 at
 org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
 at
 org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.RuntimeException: Unable to find proto buffer class
 at
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method

Unable to find proto buffer class error with RDD<protobuf>

2014-09-18 Thread Paul Wais
Dear List,

I'm writing an application where I have RDDs of protobuf messages.
When I run the app via bin/spark-submit with --master local
--driver-class-path path/to/my/uber.jar, Spark is able to
ser/deserialize the messages correctly.

However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
try --master spark://my.master:7077 , then I run into errors that make
it look like my protobuf message classes are not on the classpath:

Exception in thread main org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
java.lang.RuntimeException: Unable to find proto buffer class

com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
...

Why do I need --driver-class-path in the local scenario?  And how can
I ensure my classes are on the classpath no matter how my app is
submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
?  I've tried poking through the shell scripts and SparkSubmit.scala
and unfortunately I haven't been able to grok exactly what Spark is
doing with the remote/local JVMs.

Cheers,
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find proto buffer class error with RDD<protobuf>

2014-09-18 Thread Paul Wais
Well, it looks like Spark is just not loading my code into the
driver/executors E.g.:

// bars is a JavaRDD<MyMessage>
List<String> foo = bars.map(
    new Function<MyMessage, String>() {

        // instance initializer: log the classpath and where
        // GeneratedMessageLite was actually loaded from
        {
            System.err.println("classpath: " +
                System.getProperty("java.class.path"));

            CodeSource src =
                com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
            if (src != null) {
                URL jar = src.getLocation();
                System.err.println("com.google.protobuf.GeneratedMessageLite"
                    + " from jar: " + jar.toString());
            }
        }

        @Override
        public String call(MyMessage v1) throws Exception {
            return v1.getString();
        }
    }).collect();

prints:
classpath: 
::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
com.google.protobuf.GeneratedMessageLite from jar:
file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar

I do see after those lines:
14/09/18 23:28:09 INFO Executor: Adding
file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
loader


This is with:

spark-submit --master local --class MyClass --jars uber.jar uber.jar


My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
should use protobuf 2.5[1] and even shade it properly.  I read claims
in this list that Spark shades protobuf correctly since 0.9.? and
looking thru the pom.xml on github it looks like Spark includes
protobuf 2.5 in the hadoop 2.3 profile.


I guess I'm still at "What's the deal with getting Spark to distribute
and load code from my jar correctly?"


[1] 
http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml

On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
 Dear List,

 I'm writing an application where I have RDDs of protobuf messages.
 When I run the app via bin/spark-submit with --master local
 --driver-class-path path/to/my/uber.jar, Spark is able to
 ser/deserialize the messages correctly.

 However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
 try --master spark://my.master:7077 , then I run into errors that make
 it look like my protobuf message classes are not on the classpath:

 Exception in thread main org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
 recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
 java.lang.RuntimeException: Unable to find proto buffer class
 
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 
 java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
 ...

 Why do I need --driver-class-path in the local scenario?  And how can
 I ensure my classes are on the classpath no matter how my app is
 submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
 ?  I've tried poking through the shell scripts and SparkSubmit.scala
 and unfortunately I haven't been able to grok exactly what Spark is
 doing with the remote/local JVMs.

 Cheers,
 -Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find proto buffer class error with RDD<protobuf>

2014-09-18 Thread Paul Wais
Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
looks like I might be getting bitten by the same
java.io.ObjectInputStream uses root class loader only bugs mentioned
in:

* 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
* https://github.com/apache/spark/pull/181

* 
http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
* https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I




On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote:
 Well, it looks like Spark is just not loading my code into the
 driver/executors E.g.:

 ListString foo = JavaRDDMyMessage bars.map(
 new Function MyMessage, String() {

 {
 System.err.println(classpath:  +
 System.getProperty(java.class.path));

 CodeSource src =
 com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
 if (src2 != null) {
URL jar = src2.getLocation();
System.err.println(aaacom.google.protobuf.GeneratedMessageLite
 from jar:  + jar.toString());
 }

 @Override
 public String call(MyMessage v1) throws Exception {
 return v1.getString();
 }
 }).collect();

 prints:
 classpath: 
 ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
 com.google.protobuf.GeneratedMessageLite from jar:
 file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar

 I do see after those lines:
 14/09/18 23:28:09 INFO Executor: Adding
 file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
 loader


 This is with:

 spark-submit --master local --class MyClass --jars uber.jar uber.jar


 My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
 come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
 should use protobuf 2.5[1] and even shade it properly.  I read claims
 in this list that Spark shades protobuf correctly since 0.9.? and
 looking thru the pom.xml on github it looks like Spark includes
 protobuf 2.5 in the hadoop 2.3 profile.


 I guess I'm still at What's the deal with getting Spark to distribute
 and load code from my jar correctly?


 [1] 
 http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml

 On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
 Dear List,

 I'm writing an application where I have RDDs of protobuf messages.
 When I run the app via bin/spar-submit with --master local
 --driver-class-path path/to/my/uber.jar, Spark is able to
 ser/deserialize the messages correctly.

 However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
 try --master spark://my.master:7077 , then I run into errors that make
 it look like my protobuf message classes are not on the classpath:

 Exception in thread main org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
 recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
 java.lang.RuntimeException: Unable to find proto buffer class
 
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 
 java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
 ...

 Why do I need --driver-class-path in the local scenario?  And how can
 I ensure my classes are on the classpath no matter how my app is
 submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
 ?  I've tried poking through the shell scripts and SparkSubmit.scala
 and unfortunately I haven't been able to grok exactly what Spark is
 doing with the remote/local JVMs.

 Cheers,
 -Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find proto buffer class error with RDD<protobuf>

2014-09-18 Thread Paul Wais
hmm would using Kryo help me here?

On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote:

 Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
 looks like I might be getting bitten by the same
 java.io.ObjectInputStream uses root class loader only bugs mentioned
 in:

 *
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
 * https://github.com/apache/spark/pull/181

 *
 http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
 * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I




 On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote:
  Well, it looks like Spark is just not loading my code into the
  driver/executors E.g.:
 
  ListString foo = JavaRDDMyMessage bars.map(
  new Function MyMessage, String() {
 
  {
  System.err.println(classpath:  +
  System.getProperty(java.class.path));
 
  CodeSource src =
 
 com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
  if (src2 != null) {
 URL jar = src2.getLocation();
 
 System.err.println(aaacom.google.protobuf.GeneratedMessageLite
  from jar:  + jar.toString());
  }
 
  @Override
  public String call(MyMessage v1) throws Exception {
  return v1.getString();
  }
  }).collect();
 
  prints:
  classpath:
 ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
  com.google.protobuf.GeneratedMessageLite from jar:
  file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
 
  I do see after those lines:
  14/09/18 23:28:09 INFO Executor: Adding
  file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
  loader
 
 
  This is with:
 
  spart-submit --master local --class MyClass --jars uber.jar  uber.jar
 
 
  My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
  come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
  should use protobuf 2.5[1] and even shade it properly.  I read claims
  in this list that Spark shades protobuf correctly since 0.9.? and
  looking thru the pom.xml on github it looks like Spark includes
  protobuf 2.5 in the hadoop 2.3 profile.
 
 
  I guess I'm still at What's the deal with getting Spark to distribute
  and load code from my jar correctly?
 
 
  [1]
 http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml
 
  On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
  Dear List,
 
  I'm writing an application where I have RDDs of protobuf messages.
  When I run the app via bin/spar-submit with --master local
  --driver-class-path path/to/my/uber.jar, Spark is able to
  ser/deserialize the messages correctly.
 
  However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
  try --master spark://my.master:7077 , then I run into errors that make
  it look like my protobuf message classes are not on the classpath:
 
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
  recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
  java.lang.RuntimeException: Unable to find proto buffer class
 
  
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
  
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:606)
 
  java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
  ...
 
  Why do I need --driver-class-path in the local scenario?  And how can
  I ensure my classes are on the classpath no matter how my app is
  submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
  ?  I've tried poking through the shell scripts and SparkSubmit.scala
  and unfortunately I haven't been able to grok exactly what Spark is
  doing with the remote/local JVMs.
 
  Cheers,
  -Paul



Re: Unable to find proto buffer class error with RDD<protobuf>

2014-09-18 Thread Paul Wais


I think the root problem might be related to this change:
https://github.com/apache/spark/commit/cc3648774e9a744850107bb187f2828d447e0a48#diff-7b43397a89d8249663cbd13374a48db0R42

That change did not appear to touch ParallelCollectionRDD, which I
believe is using the root classloader (would explain why
--driver-class-path fixes the problem):
https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L74

If uber.jar is on the classpath, then the root classloader would have
the code, hence why --driver-class-path fixes the bug.




On Thu, Sep 18, 2014 at 5:42 PM, Paul Wais pw...@yelp.com wrote:
 hmm would using Kryo help me here?


 On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote:

 Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
 looks like I might be getting bitten by the same
 java.io.ObjectInputStream uses root class loader only bugs mentioned
 in:

 *
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
 * https://github.com/apache/spark/pull/181

 *
 http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
 * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I




 On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote:
  Well, it looks like Spark is just not loading my code into the
  driver/executors E.g.:
 
  ListString foo = JavaRDDMyMessage bars.map(
  new Function MyMessage, String() {
 
  {
  System.err.println(classpath:  +
  System.getProperty(java.class.path));
 
  CodeSource src =
 
  com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
  if (src2 != null) {
 URL jar = src2.getLocation();
 
  System.err.println(aaacom.google.protobuf.GeneratedMessageLite
  from jar:  + jar.toString());
  }
 
  @Override
  public String call(MyMessage v1) throws Exception {
  return v1.getString();
  }
  }).collect();
 
  prints:
  classpath:
  ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
  com.google.protobuf.GeneratedMessageLite from jar:
  file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
 
  I do see after those lines:
  14/09/18 23:28:09 INFO Executor: Adding
  file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
  loader
 
 
  This is with:
 
  spart-submit --master local --class MyClass --jars uber.jar  uber.jar
 
 
  My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
  come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
  should use protobuf 2.5[1] and even shade it properly.  I read claims
  in this list that Spark shades protobuf correctly since 0.9.? and
  looking thru the pom.xml on github it looks like Spark includes
  protobuf 2.5 in the hadoop 2.3 profile.
 
 
  I guess I'm still at What's the deal with getting Spark to distribute
  and load code from my jar correctly?
 
 
  [1]
  http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml
 
  On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
  Dear List,
 
  I'm writing an application where I have RDDs of protobuf messages.
  When I run the app via bin/spar-submit with --master local
  --driver-class-path path/to/my/uber.jar, Spark is able to
  ser/deserialize the messages correctly.
 
  However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
  try --master spark://my.master:7077 , then I run into errors that make
  it look like my protobuf message classes are not on the classpath:
 
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
  recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
  java.lang.RuntimeException: Unable to find proto buffer class
 
  com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:606)
 
  java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
  ...
 
  Why do I need --driver-class-path in the local scenario?  And how can
  I ensure my classes are on the classpath no matter how my app is
  submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
  ?  I've tried poking through the shell scripts and SparkSubmit.scala

Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-16 Thread Paul Wais
Thanks Christian!  I tried compiling from source but am still getting the
same hadoop client version error when reading from HDFS.  Will have to poke
deeper... perhaps I've got some classpath issues.  FWIW I compiled using:

$ MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" \
  mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

and hadoop 2.3 / cdh5 from
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz





On Mon, Sep 15, 2014 at 6:49 PM, Christian Chua cc8...@icloud.com wrote:

 Hi Paul.

 I would recommend building your own 1.1.0 distribution.

 ./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn
 -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests



 I downloaded the Pre-built for Hadoop 2.4 binary, and it had this
 strange behavior where

 spark-submit --master yarn-cluster ...

 will work, but

 spark-submit --master yarn-client ...

 will fail.


 But on the personal build obtained from the command above, both will then
 work.


 -Christian




 On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,

 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:

 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...

 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:

 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )


 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.


 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).


 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


 What distro of hadoop is used at Data Bricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

 Thanks for any help anybody can give me here!
 -Paul

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-16 Thread Paul Wais
Hi Sean,

Great catch! Yes I was including Spark as a dependency and it was
making its way into my uber jar.  Following the advice I just found at
Stackoverflow[1],  I marked Spark as a provided dependency and that
appeared to fix my Hadoop client issue.  Thanks for your help!!!
Perhaps the maintainers might consider setting this in the Quickstart
guide pom.xml ( http://spark.apache.org/docs/latest/quick-start.html )

In summary, here's what worked:
 * Hadoop 2.3 cdh5
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz
 * Spark 1.1 for Hadoop 2.3
http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.3.tgz

pom.xml snippets: https://gist.github.com/ypwais/ff188611d4806aa05ed9

[1] 
http://stackoverflow.com/questions/24747037/how-to-define-a-dependency-scope-in-maven-to-include-a-library-in-compile-run

Thanks everybody!!
-Paul


On Tue, Sep 16, 2014 at 3:55 AM, Sean Owen so...@cloudera.com wrote:
 From the caller / application perspective, you don't care what version
 of Hadoop Spark is running on on the cluster. The Spark API you
 compile against is the same. When you spark-submit the app, at
 runtime, Spark is using the Hadoop libraries from the cluster, which
 are the right version.

 So when you build your app, you mark Spark as a 'provided' dependency.
 Therefore in general, no, you do not build Spark for yourself if you
 are a Spark app creator.

 (Of course, your app would care if it were also using Hadoop libraries
 directly. In that case, you will want to depend on hadoop-client, and
 the right version for your cluster, but still mark it as provided.)

 The version Spark is built against only matters when you are deploying
 Spark's artifacts on the cluster to set it up.

 Your error suggests there is still a version mismatch. Either you
 deployed a build that was not compatible, or, maybe you are packaging
 a version of Spark with your app which is incompatible and
 interfering.

 For example, the artifacts you get via Maven depend on Hadoop 1.0.4. I
 suspect that's what you're doing -- packaging Spark(+Hadoop1.0.4) with
 your app, when it shouldn't be packaged.

 Spark works out of the box with just about any modern combo of HDFS and YARN.

 On Tue, Sep 16, 2014 at 2:28 AM, Paul Wais pw...@yelp.com wrote:
 Dear List,

 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:

 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
 ...

 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:
 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )


 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.


 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).


 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
  * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
  * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


 What distro of hadoop is used at Databricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

 Thanks for any help anybody can give me here!
 -Paul

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-15 Thread Paul Wais
Dear List,

I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
reading SequenceFiles.  In particular, I'm seeing:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException:
Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...

When invoking JavaSparkContext#newAPIHadoopFile().  (With args
validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
BytesWritable.class, new Job().getConfiguration() -- Pretty close to
the unit test here:
https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
)


This error indicates to me that Spark is using an old hadoop client to
do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.
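
For concreteness, here is a minimal, self-contained sketch of the write
that works and the read that fails (the class name, paths, sample
records, and app name are placeholders made up for illustration; the
read mirrors the unit-test call linked above):

```
import java.util.Arrays;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SequenceFileRoundTrip {
  public static void main(String[] args) throws Exception {
    // Master is expected to come from spark-submit.
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("seqfile-roundtrip"));

    // Placeholder path -- substitute a real location on your cluster.
    String outputUri = "hdfs://namenode:9000/tmp/seqfile-roundtrip";

    // Writing through the new (mapreduce) API -- this is the part that works.
    JavaPairRDD<Text, BytesWritable> pairs = sc
        .parallelize(Arrays.asList(1, 2, 3))
        .mapToPair(new PairFunction<Integer, Text, BytesWritable>() {
          public Tuple2<Text, BytesWritable> call(Integer i) {
            return new Tuple2<Text, BytesWritable>(
                new Text("key-" + i),
                new BytesWritable(new byte[] { i.byteValue() }));
          }
        });
    pairs.saveAsNewAPIHadoopFile(outputUri, Text.class, BytesWritable.class,
        SequenceFileOutputFormat.class);

    // Reading through the new API with the args listed above -- this is the
    // call that fails with "Server IPC version 7 cannot communicate with
    // client version 4" when an old hadoop-client ends up on the classpath.
    JavaPairRDD<Text, BytesWritable> readBack = sc.newAPIHadoopFile(
        outputUri,
        SequenceFileInputFormat.class,  // org.apache.hadoop.mapreduce.lib.input
        Text.class,
        BytesWritable.class,
        new Job().getConfiguration());
    System.out.println("records read: " + readBack.count());

    sc.stop();
  }
}
```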


Do I need to explicitly build spark for modern hadoop??  I previously
had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
error (server is using version 9, client is using version 4).


I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


What distro of hadoop is used at Databricks?  Are there distros of
Spark 1.1 and hadoop that should work together out-of-the-box?
(Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

Thanks for any help anybody can give me here!
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Release date for new pyspark

2014-07-17 Thread Paul Wais
Thanks all!  (And thanks Matei for the developer link!)  I was able to
build using maven[1] but `./sbt/sbt assembly` results in build errors.
(Not familiar enough with the build to know why; in the past sbt
worked for me and maven did not).

I was able to run the master version of pyspark, which was what I
wanted, though I discovered a bug when trying to read spark-pickled
data from HDFS.  (Looks similar to
https://spark-project.atlassian.net/browse/SPARK-1034 from my naive
point of view).  For the curious:

Code:

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.local.dir', '/nail/tmp')
conf.set('spark.executor.memory', '28g')
conf.set('spark.app.name', 'test')

sc = SparkContext(conf=conf)

sc.parallelize(range(10)).saveAsPickleFile('hdfs://host:9000/test_pickle')
unpickled_rdd = sc.pickleFile('hdfs://host:9000/test_pickle')
print unpickled_rdd.takeSample(False, 3)

Traceback (most recent call last):
  File "/path/to/my/home/spark-master/tast.py", line 33, in <module>
    print unpickled_rdd.takeSample(False, 3)
  File "/path/to/my/home/spark-master/python/pyspark/rdd.py", line 391, in takeSample
    initialCount = self.count()
  File "/path/to/my/home/spark-master/python/pyspark/rdd.py", line 791, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/path/to/my/home/spark-master/python/pyspark/rdd.py", line 782, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/path/to/my/home/spark-master/python/pyspark/rdd.py", line 703, in reduce
    vals = self.mapPartitions(func).collect()
  File "/path/to/my/home/spark-master/python/pyspark/rdd.py", line 667, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/path/to/my/home/spark-master/python/pyspark/rdd.py", line 1600, in _jrdd
    class_tag)
  File "/path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 669, in __call__
  File "/path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling
None.org.apache.spark.api.python.PythonRDD. Trace:
py4j.Py4JException: Constructor
org.apache.spark.api.python.PythonRDD([class
org.apache.spark.rdd.FlatMappedRDD, class [B, class java.util.HashMap,
class java.util.ArrayList, class java.lang.Boolean, class
java.lang.String, class java.util.ArrayList, class
org.apache.spark.Accumulator, class
scala.reflect.ManifestFactory$$anon$2]) does not exist
at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184)
at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202)
at py4j.Gateway.invoke(Gateway.java:213)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:662)


[1] mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

On Wed, Jul 16, 2014 at 8:39 PM, Michael Armbrust
mich...@databricks.com wrote:
 You should try cleaning and then building.  We have recently hit a bug in
 the scala compiler that sometimes causes non-clean builds to fail.


 On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Yeah, we try to have a regular 3 month release cycle; see
 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the
 current window.

 Matei

 On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote:

 You should expect master to compile and run: patches aren't merged unless
 they build and pass tests on Jenkins.

 You shouldn't expect new features to be added to stable code in
 maintenance releases (e.g. 1.0.1).

 AFAIK, we're still on track with Spark 1.1.0 development, which means that
 it should be released sometime in the second half of next month (or shortly
 thereafter).


 On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,

 The version of pyspark on master has a lot of nice new features, e.g.
 SequenceFile reading, pickle i/o, etc:
 https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353

 I downloaded the recent 1.0.1 release and was surprised to see the
 distribution did not include these changes in master.  (I've tried pulling
 master [ 9c249743ea ] and compiling from source, but I get a build failure
 in TestSQLContext.scala FWIW).

 Is an updated pyspark scheduled for the next release?  (Also, am I wrong
 in expecting HEAD on master should probably compile and run?)

 Best Regards,
 -Paul Wais






Release date for new pyspark

2014-07-16 Thread Paul Wais
Dear List,

The version of pyspark on master has a lot of nice new features, e.g.
SequenceFile reading, pickle i/o, etc:
https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353

I downloaded the recent 1.0.1 release and was surprised to see the
distribution did not include these changes in master.  (I've tried pulling
master [ 9c249743ea ] and compiling from source, but I get a build failure
in TestSQLContext.scala FWIW).

Is an updated pyspark scheduled for the next release?  (Also, am I wrong in
expecting HEAD on master should probably compile and run?)

Best Regards,
-Paul Wais