[ 
https://issues.apache.org/jira/browse/SPARK-6411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng reassigned SPARK-6411:
------------------------------------

    Assignee: Xiangrui Meng  (was: Davies Liu)

> PySpark DataFrames can't be created if any datetimes have timezones
> -------------------------------------------------------------------
>
>                 Key: SPARK-6411
>                 URL: https://issues.apache.org/jira/browse/SPARK-6411
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 1.3.0
>            Reporter: Harry Brundage
>            Assignee: Xiangrui Meng
>
> I am unable to create a DataFrame with PySpark if any of the {{datetime}} 
> objects that pass through the conversion process have a {{tzinfo}} property 
> set. 
> This works fine:
> {code}
> In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 
> 10),)]).toDF().collect()
> Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
> {code}
> as expected, the tuple's schema is inferred as having one anonymous column 
> with a datetime field, and the datetime roundtrips through to the Java side 
> python deserialization and then back into python land upon {{collect}}. This 
> however:
> {code}
> In [5]: from dateutil.tz import tzutc
> In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, 
> tzinfo=tzutc()),)]).toDF().collect()
> {code}
> explodes with
> {code}
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 
> 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid 
> pickle data for datetime; expected 1 or 7 args, got 2
>       at 
> net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
>       at 
> net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
>       at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
>       at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
>       at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
>       at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
>       at 
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
>       at 
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>       at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>       at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>       at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
>       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:64)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> By the looks of the error, it would appear as though the java depickler isn't 
> expecting the pickle stream to provide that extra timezone constructor 
> argument.
> Here's the disassembled pickle stream for a timezone-less datetime:
> {code}
> >>> object = datetime.datetime(2014, 7, 8, 11, 10)
> >>> stream = pickle.dumps(object)
> >>> pickletools.dis(stream)
>     0: c    GLOBAL     'datetime datetime'
>    19: p    PUT        0
>    22: (    MARK
>    23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
>    65: p        PUT        1
>    68: t        TUPLE      (MARK at 22)
>    69: p    PUT        2
>    72: R    REDUCE
>    73: p    PUT        3
>    76: .    STOP
> highest protocol among opcodes = 0
> {code}
> and then for one with a timezone:
> {code}
> >>> object = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
> >>> stream = pickle.dumps(object)
> >>> pickletools.dis(stream)
>     0: c    GLOBAL     'datetime datetime'
>    19: p    PUT        0
>    22: (    MARK
>    23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
>    65: p        PUT        1
>    68: c        GLOBAL     'copy_reg _reconstructor'
>    93: p        PUT        2
>    96: (        MARK
>    97: c            GLOBAL     'dateutil.tz tzutc'
>   116: p            PUT        3
>   119: c            GLOBAL     'datetime tzinfo'
>   136: p            PUT        4
>   139: g            GET        4
>   142: (            MARK
>   143: t                TUPLE      (MARK at 142)
>   144: R            REDUCE
>   145: p            PUT        5
>   148: t            TUPLE      (MARK at 96)
>   149: p        PUT        6
>   152: R        REDUCE
>   153: p        PUT        7
>   156: t        TUPLE      (MARK at 22)
>   157: p    PUT        8
>   160: R    REDUCE
>   161: p    PUT        9
>   164: .    STOP
> highest protocol among opcodes = 0
> {code}
> I would bet that the Pyrolite library is missing support for that nested 
> object as a second tuple member in the reconstruction of the datetime object. 
> Has anyone hit this before? Any more information I can provide?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to