How about using `withColumn` with a UDF? Your attempt fails because `re.match` returns a match object, which Spark cannot pickle; have the function return the matched text instead (e.g. `int(m.group(1))`). Examples: + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78 + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu <zemin...@gmail.com> wrote: > I've got a dataframe with a column looking like this: > > display(flight.select("duration").show()) > > +--------+ > |duration| > +--------+ > | 15h10m| > | 17h0m| > | 21h25m| > | 14h30m| > | 24h50m| > | 26h10m| > | 14h30m| > | 23h5m| > | 21h30m| > | 11h50m| > | 16h10m| > | 15h15m| > | 21h25m| > | 14h25m| > | 14h40m| > | 16h0m| > | 24h20m| > | 14h30m| > | 14h25m| > | 14h30m| > +--------+ > only showing top 20 rows > > > > I need to extract the hour as a number and store it as an additional > column within the same dataframe. What's the best way to do that? > > > I tried the following, but it failed: > > import re > def getHours(x): > return re.match('([0-9]+(?=h))', x) > temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF() > temp.select("duration").show() > > > error message: > > > ---------------------------------------------------------------------------Py4JJavaError > Traceback (most recent call > last)<ipython-input-89-1d5bec255302> in <module>() 2 def getHours(x): > 3 return re.match('([0-9]+(?=h))', x)----> 4 temp = > flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF() 5 > temp.select("duration").show() > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py > in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice', > age=1)] 56 """---> 57 return > sparkSession.createDataFrame(self, schema, sampleRatio) 58 59 > RDD.toDF = toDF > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py > in createDataFrame(self, data, schema, samplingRatio, verifySchema) 518 > 519 if isinstance(data, RDD):--> 520 rdd, schema = > self._createFromRDD(data.map(prepare), schema, samplingRatio) 521 > else: 522 rdd, schema = self._createFromLocal(map(prepare, > data), schema) > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py > in _createFromRDD(self, rdd, schema, samplingRatio) 358 
""" > 359 if schema is None or isinstance(schema, (list, tuple)):--> 360 > struct = self._inferSchema(rdd, samplingRatio) 361 > converter = _create_converter(struct) 362 rdd = > rdd.map(converter) > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py > in _inferSchema(self, rdd, samplingRatio) 329 :return: > :class:`pyspark.sql.types.StructType` 330 """--> 331 first > = rdd.first() 332 if not first: 333 raise > ValueError("The first row in RDD is empty, " > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py > in first(self) 1359 ValueError: RDD is empty 1360 """-> > 1361 rs = self.take(1) 1362 if rs: 1363 > return rs[0] > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py > in take(self, num) 1341 1342 p = range(partsScanned, > min(partsScanned + numPartsToTry, totalParts))-> 1343 res = > self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 > items += res > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py > in runJob(self, rdd, partitionFunc, partitions, allowLocal) 963 # > SparkContext#runJob. 
964 mappedRDD = > rdd.mapPartitions(partitionFunc)--> 965 port = > self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) > 966 return list(_load_from_socket(port, > mappedRDD._jrdd_deserializer)) 967 > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py > in __call__(self, *args) 1131 answer = > self.gateway_client.send_command(command) 1132 return_value = > get_return_value(-> 1133 answer, self.gateway_client, > self.target_id, self.name) 1134 1135 for temp_arg in temp_args: > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py > in deco(*a, **kw) 61 def deco(*a, **kw): 62 try:---> 63 > return f(*a, **kw) 64 except > py4j.protocol.Py4JJavaError as e: 65 s = > e.java_exception.toString() > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py > in get_return_value(answer, gateway_client, target_id, name) 317 > raise Py4JJavaError( 318 "An error occurred > while calling {0}{1}{2}.\n".--> 319 format(target_id, > ".", name), value) 320 else: 321 raise > Py4JError( > Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 75.0 failed 1 times, most recent failure: Lost task 0.0 in stage > 75.0 (TID 1035, localhost, executor driver): > org.apache.spark.api.python.PythonException: Traceback (most recent call > last): > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", > line 174, in main > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", > line 169, in process > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", > line 272, in dump_stream > bytes = self.serializer.dumps(vs) > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", > line 427, in dumps > return pickle.dumps(obj, protocol) > _pickle.PicklingError: Can't pickle <class '_sre.SRE_Match'>: attribute > lookup SRE_Match on _sre failed > > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) > at > org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) > at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > at java.lang.Thread.run(Unknown Source) > > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) > at > 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) > at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441) > at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) > at java.lang.reflect.Method.invoke(Unknown Source) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at 
py4j.Gateway.invoke(Gateway.java:280) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:214) > at java.lang.Thread.run(Unknown Source) > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", > line 174, in main > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", > line 169, in process > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", > line 272, in dump_stream > bytes = self.serializer.dumps(vs) > File > "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", > line 427, in dumps > return pickle.dumps(obj, protocol) > _pickle.PicklingError: Can't pickle <class '_sre.SRE_Match'>: attribute > lookup SRE_Match on _sre failed > > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) > at > org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) > at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ... 1 more > > > > > >