Josh Rosen created SPARK-2951:
---------------------------------

             Summary: SerDeUtils.pythonToPairRDD fails on RDDs of pickled array.arrays in Python 2.6
                 Key: SPARK-2951
                 URL: https://issues.apache.org/jira/browse/SPARK-2951
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.1.0
            Reporter: Josh Rosen
With Python 2.6, calling SerDeUtils.pythonToPairRDD() on an RDD of pickled Python array.arrays fails with this exception:

{code}
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.ArrayList
        net.razorvine.pickle.objects.ArrayConstructor.construct(ArrayConstructor.java:33)
        net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
        net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
        net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
        net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$5.apply(SerDeUtil.scala:106)
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$5.apply(SerDeUtil.scala:106)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:898)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:880)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
{code}

I think this is due to a difference in how array.array is pickled in Python 2.6 vs. Python 2.7. To see the difference, run the following script:

{code}
from pickletools import dis, optimize
from pickle import dumps, loads, HIGHEST_PROTOCOL
from array import array

arr = array('d', [1, 2, 3])

#protocol = HIGHEST_PROTOCOL
protocol = 0
pickled = dumps(arr, protocol=protocol)
pickled = optimize(pickled)
unpickled = loads(pickled)

print arr
print unpickled
print dis(pickled)
{code}

In Python 2.7, this outputs:

{code}
array('d', [1.0, 2.0, 3.0])
array('d', [1.0, 2.0, 3.0])
    0: c    GLOBAL     'array array'
   13: (    MARK
   14: S        STRING     'd'
   19: (        MARK
   20: l            LIST       (MARK at 19)
   21: F        FLOAT      1.0
   26: a        APPEND
   27: F        FLOAT      2.0
   32: a        APPEND
   33: F        FLOAT      3.0
   38: a        APPEND
   39: t        TUPLE      (MARK at 13)
   40: R    REDUCE
   41: .    STOP
highest protocol among opcodes = 0
None
{code}

whereas Python 2.6 outputs:

{code}
array('d', [1.0, 2.0, 3.0])
array('d', [1.0, 2.0, 3.0])
    0: c    GLOBAL     'array array'
   13: (    MARK
   14: S        STRING     'd'
   19: S        STRING     '\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x08@'
  110: t        TUPLE      (MARK at 13)
  111: R    REDUCE
  112: .    STOP
highest protocol among opcodes = 0
None
{code}

In other words, 2.7 reduces array.array to (array, (typecode, [values])), while 2.6 reduces it to (array, (typecode, machine_byte_string)). I think the Java-side depickling library (net.razorvine.pickle, per the stack trace above) doesn't expect the byte-string form, which causes this failure.

I noticed this when running PySpark's unit tests on 2.6 because the TestOutputFormat.test_newhadoop test failed. I think this issue affects all of the methods that might need to depickle arrays in Java, including all of the Hadoop output format methods.

How should we try to fix this? Require that users upgrade to 2.7 if they want to use code that requires this? Open a bug with the depickling library maintainers? Try to hack in our own pickling routines for arrays if we detect that we're running on 2.6?
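A minimal sketch of that last option, assuming we're willing to register a copy_reg reducer somewhere in PySpark's serializer setup before any arrays get pickled (the helper name _pickle_array_like_27 is hypothetical, not existing code):

{code}
import sys

# Sketch only: make Python 2.6 pickle array.array the way 2.7 does,
# i.e. reduce to (array, (typecode, [values])) instead of
# (array, (typecode, machine_byte_string)), so the Java-side unpickler
# sees the format it already handles.
if sys.version_info[:2] <= (2, 6):
    import copy_reg
    from array import array

    def _pickle_array_like_27(a):
        # Hypothetical helper: mimic the 2.7 reduce value for array.array.
        return array, (a.typecode, list(a))

    # copy_reg's dispatch table is consulted by pickle and cPickle before
    # the type's own __reduce__, so this overrides the 2.6 behavior.
    copy_reg.pickle(array, _pickle_array_like_27)
{code}

This trades the compact byte-string form for a value-by-value list, so pickles of large arrays would grow, which could matter for the Hadoop output paths; on the other hand, it keeps the wire format identical across 2.6 and 2.7 without touching the Java side.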