[ https://issues.apache.org/jira/browse/SPARK-6411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiangrui Meng reassigned SPARK-6411:
------------------------------------

Assignee: Xiangrui Meng (was: Davies Liu)

PySpark DataFrames can't be created if any datetimes have timezones
-------------------------------------------------------------------

Key: SPARK-6411
URL: https://issues.apache.org/jira/browse/SPARK-6411
Project: Spark
Issue Type: Bug
Components: PySpark, SQL
Affects Versions: 1.3.0
Reporter: Harry Brundage
Assignee: Xiangrui Meng

I am unable to create a DataFrame with PySpark if any of the {{datetime}} objects that pass through the conversion process have a {{tzinfo}} property set.

This works fine:

{code}
In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
{code}

As expected, the tuple's schema is inferred as having one anonymous column with a datetime field, and the datetime round-trips through the Java-side deserialization and back into Python on {{collect}}. This, however:

{code}
In [5]: from dateutil.tz import tzutc

In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
{code}

explodes with:

{code}
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
    at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
    at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

Judging by the error, the Java-side unpickler isn't expecting the pickle stream to supply that extra timezone constructor argument.

Here's the disassembled pickle stream for a timezone-less datetime:

{code}
>>> object = datetime.datetime(2014, 7, 8, 11, 10)
>>> stream = pickle.dumps(object)
>>> pickletools.dis(stream)
  0: c  GLOBAL  'datetime datetime'
 19: p  PUT     0
 22: (  MARK
 23: S  STRING  '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
 65: p  PUT     1
 68: t  TUPLE   (MARK at 22)
 69: p  PUT     2
 72: R  REDUCE
 73: p  PUT     3
 76: .  STOP
highest protocol among opcodes = 0
{code}

and then for one with a timezone:

{code}
>>> object = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
>>> stream = pickle.dumps(object)
>>> pickletools.dis(stream)
  0: c  GLOBAL  'datetime datetime'
 19: p  PUT     0
 22: (  MARK
 23: S  STRING  '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
 65: p  PUT     1
 68: c  GLOBAL  'copy_reg _reconstructor'
 93: p  PUT     2
 96: (  MARK
 97: c  GLOBAL  'dateutil.tz tzutc'
116: p  PUT     3
119: c  GLOBAL  'datetime tzinfo'
136: p  PUT     4
139: g  GET     4
142: (  MARK
143: t  TUPLE   (MARK at 142)
144: R  REDUCE
145: p  PUT     5
148: t  TUPLE   (MARK at 96)
149: p  PUT     6
152: R  REDUCE
153: p  PUT     7
156: t  TUPLE   (MARK at 22)
157: p  PUT     8
160: R  REDUCE
161: p  PUT     9
164: .  STOP
highest protocol among opcodes = 0
{code}

I would bet that the Pyrolite library is missing support for that nested tzinfo object as a second tuple member when reconstructing the datetime object. Has anyone hit this before?
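For what it's worth, the "expected 1 or 7 args, got 2" message lines up with how CPython reduces datetimes for pickling: a naive datetime reduces to a single packed byte-string argument, while an aware one appends the tzinfo object as a second argument. That can be checked in pure Python, no Spark required (this sketch uses the stdlib {{datetime.timezone.utc}} in place of dateutil's {{tzutc}}, which pickles the same way for this purpose):

```python
import datetime

naive = datetime.datetime(2014, 7, 8, 11, 10)
aware = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=datetime.timezone.utc)

# __reduce__ returns (constructor, args); args is the tuple the
# unpickler hands to the datetime constructor on the Java side.
print(len(naive.__reduce__()[1]))  # 1 arg: just the packed byte string
print(len(aware.__reduce__()[1]))  # 2 args: byte string plus the tzinfo object
```

That second argument is exactly the nested object Pyrolite's DateTimeConstructor trips over.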
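Until this is fixed, one driver-side workaround is to normalize aware datetimes to naive UTC before they reach the pickler, since naive datetimes round-trip fine. A minimal sketch (the helper name {{to_naive_utc}} is mine, not part of any Spark API):

```python
import datetime

def to_naive_utc(dt):
    """Convert an aware datetime to naive UTC; pass naive ones through.

    Naive datetimes pickle with the single packed-bytes argument the
    Java-side unpickler expects, so they survive the round trip.
    """
    if dt.tzinfo is None:
        return dt
    return dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)

# Applied before handing rows to Spark, e.g.:
# sc.parallelize([(to_naive_utc(d),) for d in dates]).toDF()
```

This drops the original zone, but the value itself is preserved as its UTC instant.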
Any more information I can provide?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)