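As the error says clearly, the column FL_DATE has a different format than you
are expecting. Modify your date format mask appropriately. Note that the value
being parsed is the literal header string "FL_DATE" (including its quotes), so
the header row also needs to be skipped, and the quotes stripped, before
parsing.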

On Wed, 11 Apr 2018 at 5:12 pm, @Nandan@ <nandanpriyadarshi...@gmail.com>
wrote:

> Hi ,
> I am not able to use the .map function in Spark.
>
> My code is as below:
>
> *1) Create Parse function:-*
>
> from datetime import datetime
> from collections import namedtuple
> fields =
> ('date','airline','flightnum','origin','dest','dep','dep_delay','arv','arv_delay','airtime','distance')
> Flight = namedtuple('Flight',fields,verbose=True)
> DATE_FMT = "%y-%m-%d"
> TIME_FMT = "%H%M"
> def parse(row) :
>     row[0] = datetime.strptime(row[0], DATE_FMT).date()
>     row[5] = datetime.strptime(row[5], TIME_FMT).time()
>     row[6] = float(row[6])
>     row[7] = datetime.strptime(row[7], TIME_FMT).time()
>     row[8] = float(row[8])
>     row[9] = float(row[9])
>     row[10] = float(row[10])
>     return Flight(*row[:11])
>
> *2) Using Parse to parse my RDD*
>
> flightsParsedMap = flights.map(lambda x: x.split(',')).map(parse)
>
> *3) Checking Parsed RDD *
> flightsParsedMap
> *Output is :-  *
>
> *PythonRDD[8] at RDD at PythonRDD.scala:48*
> *4) Checking first row :-*
>
> flightsParsedMap.first()
> Here I am getting an issue:
>
>
>
> ---------------------------------------------------------------------------Py4JJavaError
>                              Traceback (most recent call 
> last)<ipython-input-30-2f844be53361> in <module>()----> 1 
> flightsParsedMap.first()
> C:\spark\spark\python\pyspark\rdd.py in first(self)   1374         
> ValueError: RDD is empty   1375         """-> 1376         rs = self.take(1)  
>  1377         if rs:   1378             return rs[0]
> C:\spark\spark\python\pyspark\rdd.py in take(self, num)   1356    1357        
>      p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))-> 
> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)   1359   
>  1360             items += res
> C:\spark\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, 
> partitions, allowLocal)    999         # SparkContext#runJob.   1000         
> mappedRDD = rdd.mapPartitions(partitionFunc)-> 1001         port = 
> self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)   
> 1002         return list(_load_from_socket(port, 
> mappedRDD._jrdd_deserializer))   1003
> C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in 
> __call__(self, *args)   1158         answer = 
> self.gateway_client.send_command(command)   1159         return_value = 
> get_return_value(-> 1160             answer, self.gateway_client, 
> self.target_id, self.name)   1161    1162         for temp_arg in temp_args:
> C:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)     61     def 
> deco(*a, **kw):     62         try:---> 63             return f(*a, **kw)     
> 64         except py4j.protocol.Py4JJavaError as e:     65             s = 
> e.java_exception.toString()
> C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)    318              
>    raise Py4JJavaError(    319                     "An error occurred while 
> calling {0}{1}{2}.\n".--> 320                     format(target_id, ".", 
> name), value)    321             else:    322                 raise Py4JError(
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 
> (TID 9, localhost, executor driver): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, 
> in main
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, 
> in process
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 
> 372, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
>     yield next(iterator)
>   File "<ipython-input-28-35fca1a45bf3>", line 8, in parse
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in 
> _strptime_datetime
>     tt, fraction = _strptime(data_string, format)
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime
>     (data_string, format))
> ValueError: time data '"FL_DATE"' does not match format '%y-%m-%d'
>
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>       at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>       at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>       at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
>       at 
> org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:109)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
>       at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
>       at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at py4j.Gateway.invoke(Gateway.java:282)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:214)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, 
> in main
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, 
> in process
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 
> 372, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
>     yield next(iterator)
>   File "<ipython-input-28-35fca1a45bf3>", line 8, in parse
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in 
> _strptime_datetime
>     tt, fraction = _strptime(data_string, format)
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime
>     (data_string, format))
> ValueError: time data '"FL_DATE"' does not match format '%y-%m-%d'
>
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>       at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>       at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>       at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
>       at 
> org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:109)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       ... 1 more
>
>
>
> Please help me with this. Thanks. Nandan Priyadarshi
>
> --
Best Regards,
Ayan Guha

Reply via email to