Erik Schmiegelow created SPARK-10536:
----------------------------------------

             Summary: filtered POJOs replaced by other instances after collect()
                 Key: SPARK-10536
                 URL: https://issues.apache.org/jira/browse/SPARK-10536
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.4.1
            Reporter: Erik Schmiegelow


I've encountered a very strange phenomenon with collect() in a simple 
program written for debugging purposes.

The program filters objects matching a given id and prints their contents 
to stderr for inspection. Our initial plan was to do the printing on the 
driver, because we run our applications on a YARN cluster and didn't want 
to have to track down the right executor instance before looking at the 
log files.

We then discovered that the results after collect() didn't match the ids we 
had filtered for, so we added a few debugging statements to find out what 
was happening. Interestingly, we see the correct instances when we inspect 
them in filter() or map() on the executor. Once the instances are sent back 
to the driver, however, they have been swapped. More intriguingly, we 
always get the same set of incorrect instances.

Here's the code (imports shown for completeness):

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import com.fasterxml.jackson.databind.ObjectMapper

sparkContext.newAPIHadoopFile(
    input,
    classOf[AvroKeyInputFormat[Visitor]],
    classOf[AvroKey[Visitor]],
    classOf[NullWritable])
  .map(f => f._1.datum())
  .filter { visitor =>
    // Runs on the executors: log every record whose eid matches.
    val result = visitor.eid == eid
    if (result) {
      println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
      val mapper = new ObjectMapper()
      System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
    }
    result
  }
  .map { f =>
    // Still on the executors: log the same records again right before collect().
    val mapper = new ObjectMapper()
    println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
    System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
    f
  }
  .collect()
  .foreach { f =>
    // Runs on the driver: log what actually came back from collect().
    val mapper = new ObjectMapper()
    println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
    System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
  }
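
For comparison, here is a sketch (same identifiers assumed in scope) of a 
variant that collects the pretty-printed JSON strings instead of the Visitor 
POJOs, so only plain Strings are serialized back to the driver. If the ids 
are correct here, the swap happens when the POJOs themselves travel through 
collect():

val collectedJson: Array[String] = sparkContext.newAPIHadoopFile(
    input,
    classOf[AvroKeyInputFormat[Visitor]],
    classOf[AvroKey[Visitor]],
    classOf[NullWritable])
  .map(_._1.datum())
  .filter(_.eid == eid)
  .map { visitor =>
    // Serialize on the executor, before anything is shipped to the driver.
    val mapper = new ObjectMapper()
    mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor)
  }
  .collect()

collectedJson.foreach(System.err.println)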

The output we get on the executor (filter + map) is as follows:

Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550

(this pair of lines repeats identically 13 times in total)

The output on the driver is this:

Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607

(this line repeats identically 13 times in total)

Some notes on the input: 
- we use reflective Avro to serialize to HDFS (a rough sketch of the writer follows this list)
- we have about 5 GB of data
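
For context, the input files are produced roughly like this. This is a 
sketch, not our actual writer: the Visitor class here is a placeholder for 
our real POJO, and the local output path stands in for the HDFS path the 
real job writes to.

import java.io.File

import org.apache.avro.file.DataFileWriter
import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}

// Placeholder standing in for our actual Visitor POJO.
class Visitor(var eid: Long) {
  def this() = this(0L) // no-arg constructor needed for reflective Avro
}

object WriteVisitors {
  def main(args: Array[String]): Unit = {
    // Derive the Avro schema from the class via reflection.
    val schema = ReflectData.get().getSchema(classOf[Visitor])

    val writer = new DataFileWriter[Visitor](new ReflectDatumWriter[Visitor](schema))
    writer.create(schema, new File("visitors.avro")) // the real job writes to HDFS
    writer.append(new Visitor(4143922947700659073L))
    writer.close()
  }
}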


