Re: Java 8 vs Scala

2015-07-14 Thread Tristan Blakers
We have had excellent results operating on RDDs using Java 8 with Lambdas.
It’s slightly more verbose than Scala, but I haven’t found this an issue,
and haven’t missed any functionality.

The new DataFrame API makes the Spark platform even more language agnostic.
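
As an illustration, here is a minimal sketch (not from the original post; the
class and data names are hypothetical) of the Java 8 lambda style in question:

  import java.util.Arrays;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;

  public class LambdaSketch {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext("local[*]", "lambda-sketch");
      JavaRDD<String> lines = sc.parallelize(Arrays.asList("alpha beta", "", "gamma"));
      // Lambdas and method references stand in for anonymous Function classes.
      long nonEmpty = lines.filter(s -> !s.isEmpty()).count();
      JavaRDD<Integer> lengths = lines.map(String::length);
      System.out.println("non-empty lines: " + nonEmpty + ", lengths: " + lengths.collect());
      sc.stop();
    }
  }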

Tristan

On 15 July 2015 at 06:40, Vineel Yalamarthy vineelyalamar...@gmail.com
wrote:

  Good question. Like you, many are in the same boat (coming from a Java
 background). Looking forward to responses from the community.

 Regards
 Vineel

 On Tue, Jul 14, 2015 at 2:30 PM, spark user spark_u...@yahoo.com.invalid
 wrote:

 Hi All

 To start a new project in Spark, which technology is better: Java 8 or
 Scala?

 I am a Java developer. Can I start with Java 8, or do I need to learn Scala?

 Which one is the better technology for quickly starting a POC project?

 Thanks

 - su




 --

 Thanks and Regards,
 Venkata Vineel, Student, School of Computing
 Mobile: +1-385-2109-788

 - Innovation is the ability to convert ideas into invoices




Re: JavaPairRDD

2015-05-13 Thread Tristan Blakers
You could use a map() operation, but the easiest way is probably to just
call the values() method on the JavaPairRDD<A, B> to get a JavaRDD<B>.

See this link:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
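
For instance, a minimal sketch (the reverse key/value mapping is an assumption
based on the example in the quoted question below) going from a
JavaPairRDD<String, Tuple2<String, String>> to a JavaPairRDD<String, String>:

  import org.apache.spark.api.java.JavaPairRDD;
  import scala.Tuple2;

  public class PairValuesSketch {
    // values() drops the original key and yields a JavaRDD<Tuple2<String, String>>;
    // mapToPair() then re-keys each record on the tuple's second element.
    static JavaPairRDD<String, String> rekey(
        JavaPairRDD<String, Tuple2<String, String>> input) {
      return input.values()
                  .mapToPair(t -> new Tuple2<>(t._2(), t._1()));
    }
  }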

Tristan





On 13 May 2015 at 23:12, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I want to get a JavaPairRDD<String, String> from the tuple part of a
 JavaPairRDD<String, Tuple2<String, String>>.

 As an example: (
 http://www.koctas.com.tr/reyon/el-aletleri/7,(0,1,0,0,0,0,0,0,46551)) in
 my JavaPairRDD<String, Tuple2<String, String>>, and I want to get
 ((46551), (0,1,0,0,0,0,0,0)).

 I tried to split tuple._2() and create a new JavaPairRDD, but I couldn't.
 How can I do that?

 Have a nice day
 yasemin
 --
 hiç ender hiç



Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-06 Thread Tristan Blakers
Hi Imran,

I had tried setting a really huge kryo buffer size (GB), but it didn’t make
any difference.

In my data sets, objects are no more than 1KB each, and don’t form a graph,
so I don’t think the buffer size should need to be larger than a few MB,
except perhaps for reasons of efficiency?

The exception usually occurs in
“com.esotericsoftware.kryo.util.IdentityObjectIntMap”
when it is resizing (or a similar operation), implying there are too many
object references, though it’s hard to see how I could get to 2B references
from a few million objects...

T

On 6 May 2015 at 00:58, Imran Rashid iras...@cloudera.com wrote:

 Are you setting a really large max buffer size for kryo?
 Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ?


 If not, we should open up another issue to get a better warning in these
 cases.

 On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks Tristan for sharing this. Actually this happens when I am reading
 a csv file of 3.5 GB.

 best,
 /Shahab



 On Tue, May 5, 2015 at 9:15 AM, Tristan Blakers tris...@blackfrog.org
 wrote:

 Hi Shahab,

 I’ve seen exceptions very similar to this (it also manifests as a negative
 array size exception), and I believe it’s really a bug in Kryo.

 See this thread:

 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E

 This manifests in all of the following situations when working with an
 object graph in excess of a few GB: joins, broadcasts, and when using the
 hadoop save APIs.

 Tristan


 On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote:

 Can you post your code? Otherwise there's not much we can do.

 Regards,

 Olivier.

 On Sat, 2 May 2015 at 21:15, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am using spark-1.2.0 and I used Kryo serialization, but I get the
 following exception.

 java.io.IOException: com.esotericsoftware.kryo.KryoException:
 java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1

 I would appreciate it if anyone could tell me how I can resolve this.

 best,
 /Shahab







Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-05 Thread Tristan Blakers
Hi Shahab,

I’ve seen exceptions very similar to this (it also manifests as a negative
array size exception), and I believe it’s really a bug in Kryo.

See this thread:
http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E

This manifests in all of the following situations when working with an
object graph in excess of a few GB: joins, broadcasts, and when using the
hadoop save APIs.

Tristan


On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote:

 Can you post your code? Otherwise there's not much we can do.

 Regards,

 Olivier.

 On Sat, 2 May 2015 at 21:15, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am using spark-1.2.0 and I used Kryo serialization, but I get the
 following exception.

 java.io.IOException: com.esotericsoftware.kryo.KryoException:
 java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1

 I would appreciate it if anyone could tell me how I can resolve this.

 best,
 /Shahab




Re: NegativeArraySizeException when doing joins on skewed data

2015-02-26 Thread Tristan Blakers
Hi Imran,

I can confirm this still happens when calling Kryo serialisation directly;
note that I’m using Java. The output file is about 440MB at the time of the
crash.

Kryo is version 2.21. When I get a chance I’ll see if I can make a
shareable test case and try it on Kryo 3.0; I doubt they’d be interested in a
bug report on 2.21?

Cheers
Tristan



On 27 February 2015 at 07:20, Imran Rashid iras...@cloudera.com wrote:

 Hi Tristan,

 at first I thought you were just hitting another instance of
 https://issues.apache.org/jira/browse/SPARK-1391, but I actually think
 it's entirely related to kryo.  Would it be possible for you to try
 serializing your object using kryo, without involving spark at all?  If you
 are unfamiliar with kryo, it would also be fine to use the utils in spark to
 do it, something like:

 val outputStream = new java.io.FileOutputStream(
   "/some/local/path/doesn't/really/matter/just/delete/me/afterwards")

 val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
 val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)

 kryoStreamSer.writeObject(yourBigObject).close()

 My guess is that this will fail.  There is a little of spark's wrapping
 code involved here too, but I suspect the error is out of our control.
 From the error, it seems like whatever object you are trying to serialize
 has more than 2B references:
 Caused by: java.lang.NegativeArraySizeException
 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

 Though that is rather surprising -- it doesn't even seem possible to me
 with an object that is only 6 GB.

 There are a handful of other size restrictions and tuning parameters that
 come with kryo as well.  It would be good for us to write up some docs on
 those limitations, as well as work with the kryo devs to see which ones can
 be removed.  (E.g., another one that I just noticed from browsing the code
 is that even when writing to a stream, kryo has an internal buffer of
 limited size, which it periodically flushes.  Perhaps we can get kryo to
 turn off that buffer, or we can at least get it to flush more often.)

 thanks,
 Imran


 On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers tris...@blackfrog.org
 wrote:

 I get the same exception simply by doing a large broadcast of about 6GB.
 Note that I’m broadcasting a small number (~3m) of fat objects. There’s
 plenty of free RAM. This and related kryo exceptions seem to crop up
 whenever an object graph of more than a couple of GB gets passed around.

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

 at
 com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

 at
 com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)

 at
 com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)

 at
 com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)

 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)

 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)

 at
 org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)

 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)

 at
 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)


 Caused by: java.lang.NegativeArraySizeException

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221

Re: NegativeArraySizeException when doing joins on skewed data

2015-02-25 Thread Tristan Blakers
I get the same exception simply by doing a large broadcast of about 6GB.
Note that I’m broadcasting a small number (~3m) of fat objects. There’s
plenty of free RAM. This and related kryo exceptions seem to crop up
whenever an object graph of more than a couple of GB gets passed around.

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)

at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)

at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)

at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)

at
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)

at
org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)


Caused by: java.lang.NegativeArraySizeException

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)

at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)

at
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)

at
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)

at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)

... 23 more
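
For context, the call pattern behind the trace above is roughly the following
minimal sketch (the types and the map layout are hypothetical stand-ins, not
code from this thread): a single multi-GB collection of fat objects is handed
to JavaSparkContext.broadcast(), so Kryo has to serialise it as one object
graph.

  import java.io.Serializable;
  import java.util.Map;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.broadcast.Broadcast;

  public class BigBroadcastSketch {
    // Hypothetical stand-in for the "fat" domain objects mentioned above.
    public static class FatObject implements Serializable {
      public byte[] payload;
    }

    // ~3m entries, roughly 6GB in total in the reported case.
    static Broadcast<Map<Long, FatObject>> broadcastAll(
        JavaSparkContext sc, Map<Long, FatObject> fatObjects) {
      return sc.broadcast(fatObjects);
    }
  }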



On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote:

 I have been running into NegativeArraySizeExceptions when doing joins on
 data with very skewed key distributions in Spark 1.2.0. I found a previous
 post that mentioned that this exception arises when the size of the blocks
 spilled during the shuffle exceeds 2GB. The post recommended increasing the
 number of partitions. I tried increasing the number of partitions and using
 the RangePartitioner instead of the HashPartitioner, but still encountered
 the problem.

 Does Spark support skewed joins similar to Pig?


 com.esotericsoftware.kryo.KryoException:
 java.lang.NegativeArraySizeException
 Serialization trace:
 otherElements (org.apache.spark.util.collection.CompactBuffer)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at
 com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at

 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at

 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 

Kryo buffer overflows

2015-01-28 Thread Tristan Blakers
A search shows several historical threads for similar Kryo issues, but none
seem to have a definitive solution. Currently using Spark 1.2.0.

While collecting/broadcasting/grouping moderately sized data sets (~500MB -
1GB), I regularly see exceptions such as the one below.

I’ve tried increasing the spark.kryoserializer.buffer.max.mb value to 1GB,
which I’m certain far exceeds the size of the data, but it doesn’t seem to
make any difference. Reducing it to 1MB also doesn’t seem to have an
effect...
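
(For reference, a minimal sketch, assuming the Spark 1.2-era property names, of
the settings being tuned here; the values are illustrative only, not a
recommendation.)

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;

  public class KryoConfSketch {
    public static void main(String[] args) {
      SparkConf conf = new SparkConf()
          .setAppName("kryo-buffer-sketch")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          // Initial per-task serialisation buffer, in MB.
          .set("spark.kryoserializer.buffer.mb", "64")
          // Ceiling the buffer may grow to, in MB (the property discussed above).
          .set("spark.kryoserializer.buffer.max.mb", "1024");
      JavaSparkContext sc = new JavaSparkContext(conf);
      // ... job code that collects/broadcasts/groups the data sets ...
      sc.stop();
    }
  }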

I’ve been able to work around these exceptions in a few cases by reducing
the memory consumption of some classes, but it does seem like Spark should
be able to handle stuff like this.

Any ideas, or suggestions for how to go about debugging this?

Cheers
Tristan








2015-01-28 20:39:42 INFO  DAGScheduler:59 - Stopping DAGScheduler

2015-01-28 20:39:43 INFO  MapOutputTrackerMasterActor:59 -
MapOutputTrackerActor stopped!

2015-01-28 20:39:43 ERROR Executor:96 - Exception in task 82.0 in stage 8.0
(TID 462)

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 3

Serialization trace:

a_field (org.some.class)

another_field (org.some.class)

otherElements (org.apache.spark.util.collection.CompactBuffer)

at com.esotericsoftware.kryo.io.Output.require(Output.java:138)

at
com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)

at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)

at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)

at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)

at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)

at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
com.twitter.chill.JavaIterableWrapperSerializer.write(JavaIterableWrapperSerializer.scala:21)

at
com.twitter.chill.JavaIterableWrapperSerializer.write(JavaIterableWrapperSerializer.scala:13)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)

at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)

at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:165)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

2015-01-28 20:39:44 ERROR TaskSchedulerImpl:96 - Exception in statusUpdate

java.util.concurrent.RejectedExecutionException: Task
org.apache.spark.scheduler.TaskResultGetter$$anon$3@58ec279b rejected from
java.util.concurrent.ThreadPoolExecutor@70747cc1[Terminated, pool size = 0,
active threads = 0, queued tasks = 0, completed tasks = 542]

at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)

at
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)

at

Incorrect results when calling collect() ?

2014-12-18 Thread Tristan Blakers
Hi,

I’m getting some seemingly invalid results when I collect an RDD. This is
happening in both Spark 1.1.0 and 1.2.0, using Java 8 on Mac.

See the following code snippet:

JavaRDD<Thing> rdd = pairRDD.values();
rdd.foreach( e -> System.out.println( "RDD Foreach: " + e ) );
rdd.collect().forEach( e -> System.out.println( "Collected Foreach: " + e ) );

I would expect the results from the two outputters to be identical, but
instead I see:

RDD Foreach: Thing1
RDD Foreach: Thing2
RDD Foreach: Thing3
RDD Foreach: Thing4
(…snip…)
Collected Foreach: Thing1
Collected Foreach: Thing1
Collected Foreach: Thing1
Collected Foreach: Thing2

So essentially the valid entries except for one are replaced by an
equivalent number of duplicate objects. I’ve tried various map and filter
operations, but the results in the RDD always appear correct until I try to
collect() the results. I’ve also found that calling cache() on the RDD
materialises the duplication process such that the RDD Foreach displays the
duplicates too...

Any suggestions for how I can go about debugging this would be massively
appreciated.

Cheers
Tristan


Re: Incorrect results when calling collect() ?

2014-12-18 Thread Tristan Blakers
Suspected the same thing, but because the underlying data classes are
deserialised by Avro I think they have to be mutable as you need to provide
the no-args constructor with settable fields.

Nothing is being cached in my code anywhere, and this can be reproduced
using data directly out of the newAPIHadoopRDD() call. Debugs added to the
constructors of the various classes show that the right number are being
constructed, though the watches set on some of the fields aren’t always
triggering, so I suspect the serialisation is doing something a bit too
clever?

Tristan

On 18 December 2014 at 21:25, Sean Owen so...@cloudera.com wrote:

 It sounds a lot like your values are mutable classes and you are
 mutating or reusing them somewhere? It might work until you actually
 try to materialize them all and find many point to the same object.

 On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers tris...@blackfrog.org
 wrote:
  Hi,
 
  I’m getting some seemingly invalid results when I collect an RDD. This is
  happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.
 
  See the following code snippet:
 
  JavaRDD<Thing> rdd = pairRDD.values();
  rdd.foreach( e -> System.out.println( "RDD Foreach: " + e ) );
  rdd.collect().forEach( e -> System.out.println( "Collected Foreach: " + e ) );
 
  I would expect the results from the two outputters to be identical, but
  instead I see:
 
  RDD Foreach: Thing1
  RDD Foreach: Thing2
  RDD Foreach: Thing3
  RDD Foreach: Thing4
  (…snip…)
  Collected Foreach: Thing1
  Collected Foreach: Thing1
  Collected Foreach: Thing1
  Collected Foreach: Thing2
 
  So essentially the valid entries except for one are replaced by an
  equivalent number of duplicate objects. I’ve tried various map and filter
  operations, but the results in the RDD always appear correct until I try
 to
  collect() the results. I’ve also found that calling cache() on the RDD
  materialises the duplication process such that the RDD Foreach displays
 the
  duplicates too...
 
  Any suggestions for how I can go about debugging this would be massively
  appreciated.
 
  Cheers
  Tristan



Re: Incorrect results when calling collect() ?

2014-12-18 Thread Tristan Blakers
Recording the outcome here for the record. Based on Sean’s advice I’ve
confirmed that making defensive copies of records that will be collected
avoids this problem - it does seem like Avro is being a bit too aggressive
when deciding it’s safe to reuse an object for a new record.
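
A minimal sketch of that workaround (Thing is a hypothetical stand-in for the
Avro-deserialised classes, with the required no-args constructor plus a copy
constructor added for the clone step):

  import java.io.Serializable;
  import java.util.List;
  import org.apache.spark.api.java.JavaRDD;

  public class DefensiveCopySketch {
    public static class Thing implements Serializable {
      public String id;
      public Thing() { }                                  // no-args constructor for deserialisation
      public Thing(Thing other) { this.id = other.id; }   // defensive copy
    }

    // Clone each record before collect() so the driver-side list holds
    // independent objects rather than references to a reused buffer.
    static List<Thing> collectSafely(JavaRDD<Thing> rdd) {
      return rdd.map(Thing::new).collect();
    }
  }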

On 18 December 2014 at 21:50, Sean Owen so...@cloudera.com wrote:

 Being mutable is fine; reusing and mutating the objects is the issue.
 And yes the objects you get back from Hadoop are reused by Hadoop
 InputFormats. You should just map the objects to a clone before using
 them where you need them to exist all independently at once, like
 before a collect().

 (That said... generally speaking collect() involves copying from
 workers to the driver, which necessarily means a copy anyway. I
 suspect this isn't working that way for you since you're running it
 all locally?)

 On Thu, Dec 18, 2014 at 10:42 AM, Tristan Blakers tris...@blackfrog.org
 wrote:
  Suspected the same thing, but because the underlying data classes are
  deserialised by Avro I think they have to be mutable as you need to
 provide
  the no-args constructor with settable fields.
 
  Nothing is being cached in my code anywhere, and this can be reproduced
  using data directly out of the newAPIHadoopRDD() call. Debugs added to
 the
  constructors of the various classes show that the right number are being
  constructed, though the watches set on some of the fields aren’t always
  triggering, so suspect maybe the serialisation is doing something a bit
 too
  clever?
 
  Tristan
 
  On 18 December 2014 at 21:25, Sean Owen so...@cloudera.com wrote:
 
  It sounds a lot like your values are mutable classes and you are
  mutating or reusing them somewhere? It might work until you actually
  try to materialize them all and find many point to the same object.
 
  On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers 
 tris...@blackfrog.org
  wrote:
   Hi,
  
   I’m getting some seemingly invalid results when I collect an RDD. This
   is
   happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.
  
   See the following code snippet:
  
   JavaRDD<Thing> rdd = pairRDD.values();
   rdd.foreach( e -> System.out.println( "RDD Foreach: " + e ) );
   rdd.collect().forEach( e -> System.out.println( "Collected Foreach: " + e ) );
  
   I would expect the results from the two outputters to be identical,
 but
   instead I see:
  
   RDD Foreach: Thing1
   RDD Foreach: Thing2
   RDD Foreach: Thing3
   RDD Foreach: Thing4
   (…snip…)
   Collected Foreach: Thing1
   Collected Foreach: Thing1
   Collected Foreach: Thing1
   Collected Foreach: Thing2
  
   So essentially the valid entries except for one are replaced by an
   equivalent number of duplicate objects. I’ve tried various map and
   filter
   operations, but the results in the RDD always appear correct until I
 try
   to
   collect() the results. I’ve also found that calling cache() on the RDD
   materialises the duplication process such that the RDD Foreach
 displays
   the
   duplicates too...
  
   Any suggestions for how I can go about debugging this would be
 massively
   appreciated.
  
   Cheers
   Tristan