Re: Java 8 vs Scala
We have had excellent results operating on RDDs using Java 8 with lambdas. It's slightly more verbose than Scala, but I haven't found this an issue, and I haven't missed any functionality. The new DataFrame API makes the Spark platform even more language-agnostic.

Tristan

On 15 July 2015 at 06:40, Vineel Yalamarthy vineelyalamar...@gmail.com wrote:
Good question. Like you, many are in the same boat (coming from a Java background). Looking forward to responses from the community.
Regards
Vineel

On Tue, Jul 14, 2015 at 2:30 PM, spark user spark_u...@yahoo.com.invalid wrote:
Hi All
To start a new project in Spark, which technology is good: Java 8 or Scala? I am a Java developer. Can I start with Java 8, or do I need to learn Scala? Which one is the better technology for a quick-start POC project?
Thanks
- su

--
Thanks and Regards,
Venkata Vineel, Student, School of Computing
Mobile: +1-385-2109-788
"Innovation is the ability to convert ideas into invoices."
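For a concrete sense of the verbosity difference, here is a minimal word-count sketch using Java 8 lambdas against the Spark 1.x Java API (a sketch only: it assumes an existing JavaSparkContext named sc, and the input path is a placeholder):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    JavaRDD<String> lines = sc.textFile("input.txt");    // placeholder path
    JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")))  // Spark 1.x flatMap expects an Iterable
        .mapToPair(word -> new Tuple2<>(word, 1))         // pair each word with a count of 1
        .reduceByKey((a, b) -> a + b);                    // sum the counts per word

The equivalent Scala is a few characters shorter per lambda, but the shape of the program is identical.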
Re: JavaPairRDD
You could use a map() operation, but the easiest way is probably to just call the values() method on the JavaPairRDD<A, B> to get a JavaRDD<B>. See this link: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

Tristan

On 13 May 2015 at 23:12, Yasemin Kaya godo...@gmail.com wrote:
Hi,
I want to get a JavaPairRDD<String, String> from the tuple part of a JavaPairRDD<String, Tuple2<String, String>>. As an example, my JavaPairRDD<String, Tuple2<String, String>> contains
( http://www.koctas.com.tr/reyon/el-aletleri/7, (0,1,0,0,0,0,0,0,46551) )
and I want to get
( (46551), (0,1,0,0,0,0,0,0) )
I tried to split tuple._2() and create a new JavaPairRDD, but I can't. How can I get that?
Have a nice day
yasemin
--
hiç ender hiç
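For completeness, a sketch of the map() route (assuming the inner Tuple2 holds the vector string in _1 and the id in _2, which the original message leaves ambiguous; the input RDD name is hypothetical):

    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    // From JavaPairRDD<String, Tuple2<String, String>> (url -> (vector, id)),
    // build JavaPairRDD<String, String> (id -> vector), dropping the url key.
    JavaPairRDD<String, String> result =
        input.mapToPair(kv -> new Tuple2<>(kv._2()._2(), kv._2()._1()));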
Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:
Hi Imran,

I had tried setting a really huge Kryo buffer size (GBs), but it didn't make any difference. In my data sets, objects are no more than 1KB each, and don't form a graph, so I don't think the buffer size should need to be larger than a few MB, except perhaps for reasons of efficiency. The exception usually occurs in com.esotericsoftware.kryo.util.IdentityObjectIntMap when it is resizing (or a similar operation), implying there are too many object references, though it's hard to see how I could get to 2B references from a few million objects...

T

On 6 May 2015 at 00:58, Imran Rashid iras...@cloudera.com wrote:
Are you setting a really large max buffer size for Kryo? Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ? If not, we should open up another issue to get a better warning in these cases.

On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote:
Thanks Tristan for sharing this. Actually this happens when I am reading a CSV file of 3.5 GB.
best,
/Shahab

On Tue, May 5, 2015 at 9:15 AM, Tristan Blakers tris...@blackfrog.org wrote:
Hi Shahab,
I've seen exceptions very similar to this (it also manifests as a negative array size exception), and I believe it's really a bug in Kryo. See this thread: http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E
It manifests in all of the following situations when working with an object graph in excess of a few GB: joins, broadcasts, and when using the Hadoop save APIs.
Tristan

On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote:
Can you post your code? Otherwise there's not much we can do.
Regards,
Olivier.

On Sat, 2 May 2015 at 21:15, shahab shahab.mok...@gmail.com wrote:
Hi,
I am using Spark 1.2.0 with Kryo serialization, but I get the following exception:
java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1
I would appreciate it if anyone could tell me how I can resolve this.
best,
/Shahab
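If the reference table really is what's overflowing, one hedged workaround (safe only when the data contains no shared or cyclic references, as Tristan describes) is to disable Kryo reference tracking so the identity map is never consulted:

    import org.apache.spark.SparkConf;

    // Sketch: turn off Kryo's reference tracking. With referenceTracking=false,
    // Kryo skips the per-object identity map (IdentityObjectIntMap) that is
    // resizing here. Only safe if no object appears twice in one graph.
    SparkConf conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.referenceTracking", "false");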
Re: NegativeArraySizeException when doing joins on skewed data
Hi Imran,

I can confirm this still happens when calling Kryo serialisation directly (note: I'm using Java). The output file is at about 440MB at the time of the crash. Kryo is version 2.21. When I get a chance I'll see if I can make a shareable test case and try it on Kryo 3.0; I doubt they'd be interested in a bug report on 2.21.

Cheers
Tristan

On 27 February 2015 at 07:20, Imran Rashid iras...@cloudera.com wrote:
Hi Tristan,
At first I thought you were just hitting another instance of https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's entirely related to Kryo. Would it be possible for you to try serializing your object using Kryo, without involving Spark at all? If you are unfamiliar with Kryo, it would be OK to try out the utils in Spark to do it, something like:

    val outputStream = new FileOutputStream("/some/local/path/doesn't/really/matter/just/delete/me/afterwards")
    val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
    val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)
    kryoStreamSer.writeObject(yourBigObject).close()

My guess is that this will fail. There is a little of Spark's wrapping code involved here too, but I suspect the error is out of our control. From the error, it seems like whatever object you are trying to serialize has more than 2B references:

    Caused by: java.lang.NegativeArraySizeException
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me with an object that is only 6GB. There are a handful of other size restrictions and tuning parameters that come with Kryo as well. It would be good for us to write up some docs on those limitations, as well as work with the Kryo devs to see which ones can be removed. (E.g., another one that I just noticed from browsing the code is that even when writing to a stream, Kryo has an internal buffer of limited size, which it periodically flushes. Perhaps we can get Kryo to turn off that buffer, or we can at least get it to flush more often.)

thanks,
Imran

On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers tris...@blackfrog.org wrote:
I get the same exception simply by doing a large broadcast of about 6GB. Note that I'm broadcasting a small number (~3m) of fat objects. There's plenty of free RAM. This and related Kryo exceptions seem to crop up whenever an object graph of more than a couple of GB gets passed around.
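The Java equivalent of Imran's test, calling Kryo 2.21 directly with no Spark in the loop, would look something like this (a sketch: bigObject stands in for the real data set):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Output;

    // Serialize the object graph straight through Kryo; if this also dies in
    // IdentityObjectIntMap.resize(), the bug can be reported against Kryo itself.
    Kryo kryo = new Kryo();
    try (Output output = new Output(new FileOutputStream("/tmp/kryo-test.bin"))) {
        kryo.writeClassAndObject(output, bigObject);  // bigObject is a placeholder
    } catch (IOException e) {
        e.printStackTrace();
    }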
Re: NegativeArraySizeException when doing joins on skewed data
I get the same exception simply by doing a large broadcast of about 6GB. Note that I'm broadcasting a small number (~3m) of fat objects. There's plenty of free RAM. This and related Kryo exceptions seem to crop up whenever an object graph of more than a couple of GB gets passed around.

    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
    at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
    at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)
    Caused by: java.lang.NegativeArraySizeException
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    ... 23 more

On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote:
I have been running into NegativeArraySizeExceptions when doing joins on data with very skewed key distributions in Spark 1.2.0. I found a previous post that mentioned that this exception arises when the size of the blocks spilled during the shuffle exceeds 2GB. The post recommended increasing the number of partitions.

I tried increasing the number of partitions, and using the RangePartitioner instead of the HashPartitioner, but still encountered the problem. Does Spark support skewed joins similar to Pig?

    com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
    Serialization trace:
    otherElements (org.apache.spark.util.collection.CompactBuffer)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at
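Spark 1.2 has no built-in skewed-join support along the lines of Pig's skewed join; the usual manual workaround is to salt the keys. A rough sketch, assuming Java 8 and two hypothetical pair RDDs, skewed (large, with hot keys) and small (the side cheap enough to replicate); it also assumes keys contain no '#':

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    final int SALTS = 32;
    // Prefix each key on the skewed side with a random salt, spreading one
    // hot key across up to SALTS join partitions.
    JavaPairRDD<String, Integer> salted = skewed.mapToPair(kv ->
        new Tuple2<>(new Random().nextInt(SALTS) + "#" + kv._1(), kv._2()));
    // Replicate every key on the small side once per salt so the join still matches.
    JavaPairRDD<String, Integer> replicated = small.flatMapToPair(kv -> {
        List<Tuple2<String, Integer>> copies = new ArrayList<>();
        for (int i = 0; i < SALTS; i++)
            copies.add(new Tuple2<>(i + "#" + kv._1(), kv._2()));
        return copies;  // Spark 1.x PairFlatMapFunction returns an Iterable
    });
    // Join on the salted keys, then strip the salt prefix back off.
    JavaPairRDD<String, Tuple2<Integer, Integer>> joined = salted.join(replicated)
        .mapToPair(kv -> new Tuple2<>(kv._1().substring(kv._1().indexOf('#') + 1), kv._2()));

The trade-off is replicating the small side SALTS times in exchange for splitting each hot key across SALTS partitions.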
Kryo buffer overflows
A search shows several historical threads for similar Kryo issues, but none seem to have a definitive solution. Currently using Spark 1.2.0.

While collecting/broadcasting/grouping moderately sized data sets (~500MB - 1GB), I regularly see exceptions such as the one below. I've tried increasing the spark.kryoserializer.buffer.max.mb value to 1GB, which I'm certain far exceeds the size of the data, but it doesn't seem to make any difference. Reducing it to 1MB also doesn't seem to have an effect...

I've been able to work around these exceptions in a few cases by reducing the memory consumption of some classes, but it does seem like Spark should be able to handle stuff like this. Any ideas, or suggestions for how to go about debugging this?

Cheers
Tristan

    2015-01-28 20:39:42 INFO DAGScheduler:59 - Stopping DAGScheduler
    2015-01-28 20:39:43 INFO MapOutputTrackerMasterActor:59 - MapOutputTrackerActor stopped!
    2015-01-28 20:39:43 ERROR Executor:96 - Exception in task 82.0 in stage 8.0 (TID 462)
    com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 3
    Serialization trace:
    a_field (org.some.class)
    another_field (org.some.class)
    otherElements (org.apache.spark.util.collection.CompactBuffer)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
    at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.twitter.chill.JavaIterableWrapperSerializer.write(JavaIterableWrapperSerializer.scala:21)
    at com.twitter.chill.JavaIterableWrapperSerializer.write(JavaIterableWrapperSerializer.scala:13)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:165)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    2015-01-28 20:39:44 ERROR TaskSchedulerImpl:96 - Exception in statusUpdate
    java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@58ec279b rejected from java.util.concurrent.ThreadPoolExecutor@70747cc1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 542]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at
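For reference, the Kryo settings that usually matter for this error on Spark 1.2 look like the following (a sketch: these property names were renamed to spark.kryoserializer.buffer / spark.kryoserializer.buffer.max in later releases, and MyClass is a stand-in for your own types):

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.mb", "8")        // initial per-task buffer
        .set("spark.kryoserializer.buffer.max.mb", "512")  // ceiling the buffer can grow to
        .registerKryoClasses(new Class<?>[]{ MyClass.class });  // hypothetical user class

Registering classes does not change the buffer limits, but it shrinks the serialized form, which can keep a record under the buffer ceiling.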
Incorrect results when calling collect() ?
Hi,

I'm getting some seemingly invalid results when I collect an RDD. This is happening in both Spark 1.1.0 and 1.2.0, using Java 8 on Mac. See the following code snippet:

    JavaRDD<Thing> rdd = pairRDD.values();
    rdd.foreach( e -> System.out.println( "RDD Foreach: " + e ) );
    rdd.collect().forEach( e -> System.out.println( "Collected Foreach: " + e ) );

I would expect the results from the two outputters to be identical, but instead I see:

    RDD Foreach: Thing1
    RDD Foreach: Thing2
    RDD Foreach: Thing3
    RDD Foreach: Thing4
    (…snip…)
    Collected Foreach: Thing1
    Collected Foreach: Thing1
    Collected Foreach: Thing1
    Collected Foreach: Thing2

So essentially all but one of the valid entries are replaced by an equivalent number of duplicate objects. I've tried various map and filter operations, but the results in the RDD always appear correct until I try to collect() them. I've also found that calling cache() on the RDD materialises the duplication, such that the RDD foreach displays the duplicates too...

Any suggestions for how I can go about debugging this would be massively appreciated.

Cheers
Tristan
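One cheap way to make the duplication measurable rather than eyeballing the output (a sketch, assuming Thing has a meaningful toString()):

    // Compare distinct counts computed inside the RDD vs. after collect().
    // If the first is large and the second collapses to a handful, the
    // collected list is full of references to reused/overwritten objects.
    long distinctInRdd = rdd.map(Object::toString).distinct().count();
    long distinctCollected = rdd.collect().stream()
        .map(Object::toString).distinct().count();
    System.out.println(distinctInRdd + " vs " + distinctCollected);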
Re: Incorrect results when calling collect() ?
Suspected the same thing, but because the underlying data classes are deserialised by Avro, I think they have to be mutable: you need to provide the no-args constructor with settable fields. Nothing is being cached in my code anywhere, and this can be reproduced using data directly out of the newAPIHadoopRDD() call.

Debugs added to the constructors of the various classes show that the right number are being constructed, though the watches set on some of the fields aren't always triggering, so I suspect the serialisation is doing something a bit too clever?

Tristan

On 18 December 2014 at 21:25, Sean Owen so...@cloudera.com wrote:
It sounds a lot like your values are mutable classes and you are mutating or reusing them somewhere? It might work until you actually try to materialize them all and find many point to the same object.
Re: Incorrect results when calling collect() ?
Recording the outcome here for the record. Based on Sean's advice, I've confirmed that making defensive copies of records that will be collected avoids this problem. It does seem like Avro is being a bit too aggressive when deciding it's safe to reuse an object for a new record.

On 18 December 2014 at 21:50, Sean Owen so...@cloudera.com wrote:
Being mutable is fine; reusing and mutating the objects is the issue. And yes, the objects you get back from Hadoop InputFormats are reused by Hadoop. You should just map the objects to a clone before using them where you need them all to exist independently at once, like before a collect().

(That said... generally speaking, collect() involves copying from workers to the driver, which necessarily means a copy anyway. I suspect this isn't working that way for you since you're running it all locally?)
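For anyone landing here later, the defensive-copy fix looks something like this (a sketch: it assumes Thing has a copy constructor, which is not in the original thread; an Avro-generated class could be copied via its builder instead):

    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;

    // Clone each record while it is still "live", before Avro/Hadoop reuses
    // the underlying object, so the collected list holds independent copies.
    JavaRDD<Thing> safe = pairRDD.values().map(t -> new Thing(t));
    List<Thing> results = safe.collect();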