Hi everyone, I have been working with Spark for a little while now and have encountered a very strange behaviour that has caused me a lot of headaches:
I have written my own POJOs to encapsulate my data, and this data is held in some JavaRDDs. Part of these POJOs is a member variable of a custom enum type. Whenever I perform operations on these RDDs such as subtract, groupByKey, reduce or similar, the results are inconsistent and nonsensical. However, this happens only when the application runs in standalone cluster mode (10 nodes); when running locally on my developer machine, the code executes just fine.

If you want to reproduce this behaviour, here <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip> is the complete Maven project that you can run out of the box. I am running Spark 1.4.0 and submitting the application using

/usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar

Consider the following code for my custom object:

package de.spark.test;

import java.io.Serializable;
import java.util.Objects;

public class MyObject implements Serializable {

    private MyEnum myEnum;

    public MyObject(MyEnum myEnum) {
        this.myEnum = myEnum;
    }

    public MyEnum getMyEnum() {
        return myEnum;
    }

    public void setMyEnum(MyEnum myEnum) {
        this.myEnum = myEnum;
    }

    @Override
    public int hashCode() {
        int hash = 5;
        hash = 41 * hash + Objects.hashCode(this.myEnum);
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final MyObject other = (MyObject) obj;
        if (this.myEnum != other.myEnum) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "MyObject{" + "myEnum=" + myEnum + '}';
    }

}

As you can see, I have overridden equals() and hashCode() (both are auto-generated). The enum is given as follows:

package de.spark.test;

import java.io.Serializable;

public enum MyEnum implements Serializable {
    VALUE1, VALUE2
}

The main() method is defined by:

package de.spark.test;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Spark Test")
                                        .setMaster("myMaster");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        System.out.println("/////////////////////////////////////////////////// Object generation");

        List<MyObject> l1 = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            l1.add(new MyObject(MyEnum.VALUE1));
        }

        JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
        JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);

        System.out.println("myObjectRDD1 count = " + myObjectRDD1.count());
        System.out.println("myObjectRDD2 count = " + myObjectRDD2.count());

        System.out.println("/////////////////////////////////////////////////// Distinct");

        JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
        JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();

        System.out.println("myObjectRDD1Distinct count = " + myObjectRDD1Distinct.count());
        System.out.println("myObjectRDD2Distinct count = " + myObjectRDD2Distinct.count());

        System.out.println("/////////////////////////////////////////////////// Subtract");

        JavaRDD<MyObject> myObjectRDD1Minus1 = myObjectRDD1.subtract(myObjectRDD1);
        JavaRDD<MyObject> myObjectRDD1Minus2 = myObjectRDD1.subtract(myObjectRDD2);
        JavaRDD<MyObject> myObjectRDD2Minus1 = myObjectRDD2.subtract(myObjectRDD1);

        System.out.println("myObjectRDD1Minus1 count = " + myObjectRDD1Minus1.count());
        System.out.println("myObjectRDD1Minus2 count = " + myObjectRDD1Minus2.count());
        System.out.println("myObjectRDD2Minus1 count = " + myObjectRDD2Minus1.count());

        System.out.println("/////////////////////////////////////////////////// End");
    }

}

Both RDDs contain 1000 exactly equal objects, so one would expect each call to distinct() to yield a count of 1 and each call to subtract(JavaRDD<MyObject>) to yield an empty RDD. However, here is some sample output:

/////////////////////////////////////////////////// Object generation
myObjectRDD1 count = 1000
myObjectRDD2 count = 1000
/////////////////////////////////////////////////// Distinct
myObjectRDD1Distinct count = 1
myObjectRDD2Distinct count = 2
/////////////////////////////////////////////////// Subtract
myObjectRDD1Minus1 count = 500
myObjectRDD1Minus2 count = 0
myObjectRDD2Minus1 count = 0
/////////////////////////////////////////////////// End

And this is a new run, directly following the previous one:

/////////////////////////////////////////////////// Object generation
myObjectRDD1 count = 1000
myObjectRDD2 count = 1000
/////////////////////////////////////////////////// Distinct
myObjectRDD1Distinct count = 2
myObjectRDD2Distinct count = 1
/////////////////////////////////////////////////// Subtract
myObjectRDD1Minus1 count = 500
myObjectRDD1Minus2 count = 500
myObjectRDD2Minus1 count = 0
/////////////////////////////////////////////////// End

Some thoughts/observations:

As soon as I take the enum value out of the hashCode() function of MyObject, the code works just fine, i.e. the new hashCode() function becomes

@Override
public int hashCode() {
    int hash = 5;
    // hash = 41 * hash + Objects.hashCode(this.myEnum);
    return hash;
}

Additionally, the code executes fine on a local machine and only behaves strangely on a cluster. These two observations make me believe that Spark uses the hashCode of each object to distribute the objects between worker nodes, and that the enum value somehow results in inconsistent hash codes.
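To narrow this down, I put together the following quick check (EnumHashCheck is just a throwaway diagnostic class I wrote for this post, not part of the linked project). As far as I can tell, java.lang.Enum.hashCode() is final and simply returns the identity hash code of the enum constant, and identity hash codes are not guaranteed to be stable across JVM processes; in standalone cluster mode, every executor runs in its own JVM:

package de.spark.test;

public class EnumHashCheck {

    public static void main(String[] args) {
        // Run this class several times: since these are identity hash
        // codes, the printed values typically differ from one JVM
        // invocation to the next, and hence also between executors.
        System.out.println("MyEnum.VALUE1.hashCode() = " + MyEnum.VALUE1.hashCode());
        System.out.println("MyEnum.VALUE2.hashCode() = " + MyEnum.VALUE2.hashCode());
    }

}

If my understanding is right, that would explain why a single local JVM works while the cluster misbehaves: Objects.hashCode(this.myEnum) gives every node a different answer for the same logical value, so hash-partitioned operations like distinct() and subtract() no longer bring equal objects together in the same partition.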
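It would also suggest a workaround: hash a stable property of the enum instead of the enum object itself. Here is a minimal sketch of what I have in mind, assuming the identity hash really is the culprit (I have not verified this on the cluster yet):

@Override
public int hashCode() {
    int hash = 5;
    // myEnum.name() is a constant String ("VALUE1" or "VALUE2"), and
    // String.hashCode() is specified as a pure function of the
    // characters, so this value is identical in every JVM. myEnum.ordinal()
    // would work just as well.
    hash = 41 * hash + (this.myEnum == null ? 0 : this.myEnum.name().hashCode());
    return hash;
}

With a hashCode() like this, equal MyObject instances should land in the same partition on every node, so distinct() should return 1 and the subtract() calls should return empty RDDs. Can someone confirm that this is really what is going on, and help me out here?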