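Don't use a UDF here: `minute` and `unix_timestamp` are native functions of
spark.sql. They operate on columns, so call them directly in `withColumn`
instead of wrapping them in a UDF.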


scala> df.withColumn("minute", minute(unix_timestamp($"str",
"HH'h'mm'm'").cast("timestamp"))).show

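To cross-check the expected values outside Spark, here is a minimal
plain-Python sketch that pulls the hour and minute out of the same
`15h10m`-style strings with a regular expression. The helper name
`parse_duration` is hypothetical (it is not part of Spark or of this thread);
it is only meant as a reference for what the new columns should contain.

```python
import re

# Hypothetical helper, not a Spark API: matches strings such as "15h10m".
_DURATION = re.compile(r"^(\d+)h(\d+)m$")


def parse_duration(text):
    """Return (hours, minutes) as ints, or None if the text does not match."""
    match = _DURATION.match(text.strip())
    if match is None:
        return None
    # group(1) is the digits before "h", group(2) the digits before "m".
    return int(match.group(1)), int(match.group(2))


# Sample values taken from the column shown in the original question.
for value in ["15h10m", "17h0m", "21h25m", "24h50m"]:
    print(value, parse_duration(value))
```

Note that the regular expression puts no upper bound on the hour, so durations
such as `24h50m` or `26h10m` keep their hour value, which a clock-time pattern
like `HH` is not designed for. Inside Spark, the same pattern could probably be
applied with `regexp_extract` from `pyspark.sql.functions` followed by
`.cast("int")`; that variant is untested here.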




On Tue, Apr 25, 2017 at 7:55 AM, Zeming Yu <zemin...@gmail.com> wrote:

> I tried this, but it doesn't seem to work. Do you know how to fix it?
>
> def getMinutes(aString):
>     return minute(unix_timestamp(aString, "HH'h'mm'm'").cast("timestamp"))
>
> udfGetMinutes = udf(getMinutes, IntegerType())
>
> flight2 = (flight2.withColumn('stop_duration1',
> udfGetMinutes(flight2.stop_duration1))
>           )
>
>
>
> On Sat, Apr 22, 2017 at 8:51 PM, 颜发才(Yan Facai) <facai....@gmail.com>
> wrote:
>
>> Hi, Zeming.
>>
>> I prefer to convert String to DateTime, like this:
>>
>> scala> val df = Seq("15h10m", "17h0m", "21h25m").toDF("str")
>>
>> scala> val ts = unix_timestamp($"str", "HH'h'mm'm'").cast("timestamp")
>>
>> scala> df.withColumn("minute", minute(ts)).show
>> +------+------+
>> |   str|minute|
>> +------+------+
>> |15h10m|    10|
>> | 17h0m|     0|
>> |21h25m|    25|
>> +------+------+
>>
>>
>> By the way, check the date-time functions section of the API docs:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
>>
>>
>>
>>
>> On Sat, Apr 22, 2017 at 6:27 PM, Zeming Yu <zemin...@gmail.com> wrote:
>>
>>> Thanks a lot!
>>>
>>> Just another question, how can I extract the minutes as a number?
>>>
>>> I can use:
>>> .withColumn('duration_m',split(flight.duration,'h').getItem(1))
>>>
>>> to get strings like '10m'
>>>
>>> but how do I drop the character "m" at the end? I can use substr(), but
>>> what's the function to get the length of the string so that I can do
>>> something like substr(1, len(...)-1)?
>>>
>>> On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar <pushkarvgu...@gmail.com> wrote:
>>>
>>>> It can be as simple as this:
>>>>
>>>> from pyspark.sql.functions import split
>>>>
>>>> flight.withColumn('hour',split(flight.duration,'h').getItem(0))
>>>>
>>>>
>>>> Thank you,
>>>> *Pushkar Gujar*
>>>>
>>>>
>>>> On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu <zemin...@gmail.com> wrote:
>>>>
>>>>> Any examples?
>>>>>
>>>>> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)" <facai....@gmail.com> wrote:
>>>>>
>>>>>> How about using `withColumn` and a UDF?
>>>>>>
>>>>>> example:
>>>>>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>>>>>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu <zemin...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've got a dataframe with a column looking like this:
>>>>>>>
>>>>>>> display(flight.select("duration").show())
>>>>>>>
>>>>>>> +--------+
>>>>>>> |duration|
>>>>>>> +--------+
>>>>>>> |  15h10m|
>>>>>>> |   17h0m|
>>>>>>> |  21h25m|
>>>>>>> |  14h30m|
>>>>>>> |  24h50m|
>>>>>>> |  26h10m|
>>>>>>> |  14h30m|
>>>>>>> |   23h5m|
>>>>>>> |  21h30m|
>>>>>>> |  11h50m|
>>>>>>> |  16h10m|
>>>>>>> |  15h15m|
>>>>>>> |  21h25m|
>>>>>>> |  14h25m|
>>>>>>> |  14h40m|
>>>>>>> |   16h0m|
>>>>>>> |  24h20m|
>>>>>>> |  14h30m|
>>>>>>> |  14h25m|
>>>>>>> |  14h30m|
>>>>>>> +--------+
>>>>>>> only showing top 20 rows
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I need to extract the hour as a number and store it as an additional
>>>>>>> column within the same dataframe. What's the best way to do that?
>>>>>>>
>>>>>>>
>>>>>>> I tried the following, but it failed:
>>>>>>>
>>>>>>> import re
>>>>>>> def getHours(x):
>>>>>>>   return re.match('([0-9]+(?=h))', x)
>>>>>>> temp = flight.select("duration").rdd.map(lambda x: getHours(x[0])).toDF()
>>>>>>> temp.select("duration").show()
>>>>>>>
>>>>>>>
>>>>>>> error message:
>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------------
>>>>>>> Py4JJavaError                             Traceback (most recent call last)
>>>>>>> <ipython-input-89-1d5bec255302> in <module>()
>>>>>>>       2 def getHours(x):
>>>>>>>       3   return re.match('([0-9]+(?=h))', x)
>>>>>>> ----> 4 temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>>>>>>       5 temp.select("duration").show()
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in toDF(self, schema, sampleRatio)
>>>>>>>      55         [Row(name=u'Alice', age=1)]
>>>>>>>      56         """
>>>>>>> ---> 57         return sparkSession.createDataFrame(self, schema, sampleRatio)
>>>>>>>      58
>>>>>>>      59     RDD.toDF = toDF
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
>>>>>>>     518
>>>>>>>     519         if isinstance(data, RDD):
>>>>>>> --> 520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
>>>>>>>     521         else:
>>>>>>>     522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in _createFromRDD(self, rdd, schema, samplingRatio)
>>>>>>>     358         """
>>>>>>>     359         if schema is None or isinstance(schema, (list, tuple)):
>>>>>>> --> 360             struct = self._inferSchema(rdd, samplingRatio)
>>>>>>>     361             converter = _create_converter(struct)
>>>>>>>     362             rdd = rdd.map(converter)
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in _inferSchema(self, rdd, samplingRatio)
>>>>>>>     329         :return: :class:`pyspark.sql.types.StructType`
>>>>>>>     330         """
>>>>>>> --> 331         first = rdd.first()
>>>>>>>     332         if not first:
>>>>>>>     333             raise ValueError("The first row in RDD is empty, "
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py in first(self)
>>>>>>>    1359         ValueError: RDD is empty
>>>>>>>    1360         """
>>>>>>> -> 1361         rs = self.take(1)
>>>>>>>    1362         if rs:
>>>>>>>    1363             return rs[0]
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py in take(self, num)
>>>>>>>    1341
>>>>>>>    1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
>>>>>>> -> 1343
>>>>>>>
>>>>>>> ...
