Thank you for your investigation into this!
Just for completeness, I've confirmed it's a problem only in the REPL, not in
compiled Spark programs.
But within the REPL, a direct consequence of the classes no longer being the
same after serialization/deserialization is that lookup() doesn't work either:
scala> class C(val s:String) extends Serializable {
     | override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s else false
     | override def toString = s
     | }
defined class C
scala> val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
r: org.apache.spark.rdd.RDD[(C, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:14
scala> r.lookup(new C("a"))
<console>:17: error: type mismatch;
found : C
required: C
r.lookup(new C("a"))
^
On Tuesday, May 13, 2014 3:05 PM, Anand Avati <[email protected]> wrote:
On Tue, May 13, 2014 at 8:26 AM, Michael Malak <[email protected]> wrote:
>Reposting here on dev since I didn't see a response on user:
>
>I'm seeing different Serializable behavior in Spark Shell vs. Scala Shell. In
>the Spark Shell, equals() fails when I use the canonical equals() pattern of
>match{}, but works when I substitute isInstanceOf[]. I am using Spark
>0.9.0/Scala 2.10.3.
>
>Is this a bug?
>
>Spark Shell (equals uses match{})
>=================================
>
>class C(val s:String) extends Serializable {
> override def equals(o: Any) = o match {
> case that: C => that.s == s
> case _ => false
> }
>}
>
>val x = new C("a")
>val bos = new java.io.ByteArrayOutputStream()
>val out = new java.io.ObjectOutputStream(bos)
>out.writeObject(x);
>val b = bos.toByteArray();
>out.close
>bos.close
>val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
>x.equals(y)
>
>res: Boolean = false
>
>Spark Shell (equals uses isInstanceOf[])
>========================================
>
>class C(val s:String) extends Serializable {
> override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == s) else false
>}
>
>val x = new C("a")
>val bos = new java.io.ByteArrayOutputStream()
>val out = new java.io.ObjectOutputStream(bos)
>out.writeObject(x);
>val b = bos.toByteArray();
>out.close
>bos.close
>val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
>x.equals(y)
>
>res: Boolean = true
>
>Scala Shell (equals uses match{})
>=================================
>
>class C(val s:String) extends Serializable {
> override def equals(o: Any) = o match {
> case that: C => that.s == s
> case _ => false
> }
>}
>
>val x = new C("a")
>val bos = new java.io.ByteArrayOutputStream()
>val out = new java.io.ObjectOutputStream(bos)
>out.writeObject(x);
>val b = bos.toByteArray();
>out.close
>bos.close
>val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
>x.equals(y)
>
>res: Boolean = true
>
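The serialize/deserialize round trip repeated in the three transcripts above can be factored into one helper. This is a minimal sketch assuming plain Java serialization, as in the thread. `SerDe` and `roundTrip` are hypothetical names that do not come from the thread. Unlike the transcripts, it closes the ObjectOutputStream (which flushes it) before the bytes are read, so it does not rely on writeObject having already drained its internal buffer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper (not from the thread): write a value with Java
// serialization into a byte array, then read it back as a new object.
object SerDe {
  def roundTrip[T](x: T): T = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    // close() flushes the object stream before we take the bytes
    try out.writeObject(x) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))
    // Unchecked cast: T is erased at run time, so the caller picks T
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

// Usage: the round-tripped value is a distinct instance with equal contents
val original = List("a", "b")
val copy = SerDe.roundTrip(original)
println(copy == original) // true
println(copy eq original) // false
```

In the Spark Shell transcript, `y` plays the role of `copy` here. The question in the thread is why `x.equals(y)` then depends on how equals() is written.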
Hmm. I see that this can be reproduced without Spark in Scala 2.11 by running
the REPL with and without the -Yrepl-class-based command-line flag. Spark's
REPL effectively behaves like 2.11's -Yrepl-class-based flag. Inspecting the
generated bytecode, it appears that -Yrepl-class-based results in the creation
of an "$outer" field in the generated classes (including class C). The first
case of the match in equals() compiles to code along the lines of (simplified):
    if (o instanceof C && this.$outer == that.$outer) {
      // do string compare
    }
$outer is the synthetic field pointing to the outer object in which the object
was created (in this case, the REPL environment). Now obviously, when x is
written to the byte stream and deserialized, it gets a newly created $outer
(it may have been deserialized in a different JVM or on a different machine,
for all we know). So the mismatching $outer fields are expected. However, I'm
still trying to understand why the outers need to be the same for the case match.
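The $outer comparison described above can be observed without a REPL and without serialization. This is a minimal sketch assuming Scala 2.10+ pattern-matcher behavior for typed patterns on inner classes. The hypothetical class `Env` stands in for the REPL's class-based line wrapper. A second `Env` instance stands in for the fresh $outer that a deserialized copy receives. `CMatch` uses the match{} form of equals() from the thread; `CInst` uses the isInstanceOf[] form.

```scala
// Hypothetical stand-in for a class-based REPL wrapper: classes declared
// inside a class (not an object) carry a synthetic $outer reference.
class Env {
  // Typed pattern: `case that: CMatch` means `Env.this.CMatch`, so scalac
  // also checks that `that` belongs to this same Env instance.
  class CMatch(val s: String) {
    override def equals(o: Any): Boolean = o match {
      case that: CMatch => that.s == s
      case _            => false
    }
    override def hashCode: Int = s.hashCode
  }

  // isInstanceOf: a plain run-time class check; the outer instance is ignored.
  class CInst(val s: String) {
    override def equals(o: Any): Boolean =
      if (o.isInstanceOf[CInst]) o.asInstanceOf[CInst].s == s else false
    override def hashCode: Int = s.hashCode
  }
}

val env1 = new Env
val env2 = new Env // plays the role of the new $outer after deserialization

val m1 = new env1.CMatch("a")
val m1b = new env1.CMatch("a")
val m2 = new env2.CMatch("a")
val i1 = new env1.CInst("a")
val i2 = new env2.CInst("a")

println(m1.equals(m1b)) // true: same outer Env
println(m1.equals(m2))  // false: outer Envs differ
println(i1.equals(i2))  // true: outer not checked
```

In this sketch, the difference comes from `case that: CMatch` being a test against the path-dependent type `Env.this.CMatch`, for which scalac adds the outer check. By contrast, `isInstanceOf` is only a class test.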