[ https://issues.apache.org/jira/browse/SPARK-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738832#comment-14738832 ]

Sean Owen commented on SPARK-10536:
-----------------------------------

My snap guess is that this is the problem: {{f._1.datum()}}

You aren't cloning / copying the value you get directly from Hadoop's 
InputFormat API. It reuses the object internally. I think you're operating 
repeatedly on the last value it read. Possible?
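
If so, one fix is to deep-copy each datum before anything else touches it. A minimal sketch, assuming {{Visitor}} is a reflect-mapped POJO (untested, illustrative only):

{code}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.reflect.ReflectData
import org.apache.hadoop.io.NullWritable

val rdd = sparkContext.newAPIHadoopFile(
    input,
    classOf[AvroKeyInputFormat[Visitor]],
    classOf[AvroKey[Visitor]],
    classOf[NullWritable])
  .map { case (key, _) =>
    // The input format reuses one AvroKey/datum instance across records,
    // so copy the datum before it escapes this iteration.
    val schema = ReflectData.get().getSchema(classOf[Visitor])
    ReflectData.get().deepCopy(schema, key.datum())
  }
{code}

Building the schema inside the closure avoids shipping a non-serializable {{Schema}} to the executors; {{ReflectData.getSchema}} caches per class, so the repeated lookup is cheap.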

> filtered POJOs replaced by other instances after collect()
> ----------------------------------------------------------
>
>                 Key: SPARK-10536
>                 URL: https://issues.apache.org/jira/browse/SPARK-10536
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Erik Schmiegelow
>
> I've encountered a very strange phenomenon with collect() in a simple 
> program written for debugging purposes.
> The objective of the program is to filter objects that match a given id and 
> print their contents to stderr. Our initial plan was to have the driver do 
> that, because we run our applications on a YARN cluster and we didn't want 
> to have to hunt down the right executor instance before looking at the log 
> files.
> We then discovered that the results after collect() didn't match the ids we 
> had filtered for, so we added a few debugging statements to find out what 
> happened. Interestingly enough, we get the correct instances in filter() and 
> map() on the executor. Once they are sent back to the driver, the instances 
> are swapped. More intriguingly, we always get the same set of incorrect 
> instances.
> Here's the code:
> {code}
>     val rdd = sparkContext.newAPIHadoopFile(
>       input, classOf[AvroKeyInputFormat[Visitor]], classOf[AvroKey[Visitor]],
>       classOf[NullWritable]
>     ).map(
>       f => f._1.datum()
>     ).filter(visitor => {
>       val result = visitor.eid == eid
>       if (result) {
>         println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
>         val mapper = new ObjectMapper()
>         System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
>       }
>       result
>     }).map(f => {
>       val mapper = new ObjectMapper()
>       println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>       System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>       f
>     }).collect().foreach(f => {
>       val mapper = new ObjectMapper()
>       println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>       System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>       f
>     })
> {code}
> The output we get on the executor (filter + map) is as follows:
> {code}
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> {code}
> The output on the driver is this:
> {code}
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> {code}
> Some notes on the input: 
> - we use reflective Avro to serialize to HDFS (a rough sketch of the write path follows below)
> - we've got about 5 GB of data
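>
> For reference, the write path looks roughly like the sketch below. This is 
> illustrative only: the {{visitors}} RDD and {{output}} path are placeholders, 
> not our actual job.
> {code}
> import org.apache.avro.mapred.AvroKey
> import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
> import org.apache.avro.reflect.ReflectData
> import org.apache.hadoop.io.NullWritable
> import org.apache.hadoop.mapreduce.Job
>
> val job = Job.getInstance()
> // Use reflection-based (de)serialization for the POJO schema.
> AvroJob.setDataModelClass(job, classOf[ReflectData])
> AvroJob.setOutputKeySchema(job, ReflectData.get().getSchema(classOf[Visitor]))
>
> visitors  // placeholder: an RDD[Visitor]
>   .map(v => (new AvroKey(v), NullWritable.get()))
>   .saveAsNewAPIHadoopFile(
>     output,  // placeholder: an HDFS path
>     classOf[AvroKey[Visitor]],
>     classOf[NullWritable],
>     classOf[AvroKeyOutputFormat[Visitor]],
>     job.getConfiguration)
> {code}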


