[ https://issues.apache.org/jira/browse/SPARK-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738832#comment-14738832 ]
Sean Owen commented on SPARK-10536:
-----------------------------------

My snap guess is that the problem is {{f._1.datum()}}: you aren't cloning / copying the value you get directly from Hadoop's InputFormat API. It reuses the object internally, so I think you're operating repeatedly on the last value it read. Possible? (A hedged sketch of such a copy is appended after the quoted issue below.)

> filtered POJOs replaced by other instances after collect()
> ----------------------------------------------------------
>
>                 Key: SPARK-10536
>                 URL: https://issues.apache.org/jira/browse/SPARK-10536
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Erik Schmiegelow
>
> I've encountered a very strange phenomenon with collect() in a small program written for debugging purposes.
> The objective of the program is to filter objects which match an id and print their contents to stderr so that we can have a look at them. Our initial plan was to have the driver do that, because we run our applications on a YARN cluster and we didn't want to have to look for the executor instance before looking at its log files.
> We then discovered that the results after collect() didn't match the ids we had filtered for, so we added a few debugging statements to find out what happened. Interestingly enough, we get the correct instances when we look at them in filter() or map() - on the executor. Once the instances are sent back to the driver, they are swapped. More intriguingly, we always get the same set of incorrect instances.
> Here's the code:
> {code}
> val rdd = sparkContext.newAPIHadoopFile(
>   input, classOf[AvroKeyInputFormat[Visitor]], classOf[AvroKey[Visitor]], classOf[NullWritable]).map(
>   f => f._1.datum()
> ).filter(visitor => {
>   val result = visitor.eid == eid
>   if (result) {
>     println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
>     val mapper = new ObjectMapper()
>     System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
>   }
>   result
> }).map(f => {
>   val mapper = new ObjectMapper()
>   println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>   System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>   f
> }).collect().foreach(f => {
>   val mapper = new ObjectMapper()
>   println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>   System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>   f
> })
> {code}
> The output we get in the executor (filter + map) is as follows:
> {code}
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> {code}
> The output on the driver is this:
> {code}
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> {code}
> Some notes on the input:
> - we use reflective Avro to serialize to HDFS
> - we've got about 5 GB of data
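For reference, a minimal sketch of the copy suggested above, assuming the records really are read via AvroKeyInputFormat with reflect-based Avro as described in the report. The Visitor class and the input / eid values are taken from the reporter's snippet; the ReflectData-based deepCopy is just one way to clone the datum, not necessarily the one the reporter would choose:

{code}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.reflect.ReflectData
import org.apache.hadoop.io.NullWritable

val raw = sparkContext.newAPIHadoopFile(
  input,
  classOf[AvroKeyInputFormat[Visitor]],
  classOf[AvroKey[Visitor]],
  classOf[NullWritable])

// Deep-copy every datum as soon as it leaves the InputFormat: the record
// reader reuses a single mutable instance for each row it returns, so no
// reference to that reader-owned object should escape the iterator.
val visitors = raw.mapPartitions { iter =>
  val reflectData = ReflectData.get()
  val schema = reflectData.getSchema(classOf[Visitor])
  iter.map { case (key, _) =>
    reflectData.deepCopy(schema, key.datum())
  }
}

visitors.filter(_.eid == eid).collect().foreach { v =>
  println(s"Collected ${v.eid} with hash ${v.hashCode()}")
}
{code}

Any other copying strategy (a copy constructor on Visitor, a serialization round trip, etc.) should behave the same; the essential point is that the copy happens before the filter/collect ever sees the object.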