Don't use a UDF, as `minute` and `unix_timestamp` are native functions in
spark.sql.functions:
scala> df.withColumn("minute", minute(unix_timestamp($"str", "HH'h'mm'm'").cast("timestamp"))).show
On Tue, Apr 25, 2017 at 7:55 AM, Zeming Yu <[email protected]> wrote:
> I tried this, but it doesn't seem to work. Do you know how to fix it?
>
> def getMinutes(aString):
>     return minute(unix_timestamp(aString, "HH'h'mm'm'").cast("timestamp"))
>
> udfGetMinutes = udf(getMinutes, IntegerType())
>
> flight2 = (flight2.withColumn('stop_duration1',
> udfGetMinutes(flight2.stop_duration1))
> )
>
>
>
> On Sat, Apr 22, 2017 at 8:51 PM, 颜发才(Yan Facai) <[email protected]>
> wrote:
>
>> Hi, Zeming.
>>
>> I prefer to convert String to DateTime, like this:
>>
>> scala> val df = Seq("15h10m", "17h0m", "21h25m").toDF("str")
>>
>> scala> val ts = unix_timestamp($"str", "HH'h'mm'm'").cast("timestamp")
>>
>> scala> df.withColumn("minute", minute(ts)).show
>> +------+------+
>> |   str|minute|
>> +------+------+
>> |15h10m|    10|
>> | 17h0m|     0|
>> |21h25m|    25|
>> +------+------+
>>
>>
>> By the way, check the Date-time functions section of the API:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
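>>
>> If you'd rather stay with plain string functions, `length`, `substring`
>> and `regexp_extract` live in the same functions module. A rough, untested
>> PySpark sketch for pulling the minutes out of `duration` as an int:
>>
>> from pyspark.sql.functions import regexp_extract
>>
>> # grab the digits right before the trailing 'm' and cast to int
>> flight = flight.withColumn(
>>     'duration_m',
>>     regexp_extract(flight.duration, r'(\d+)m', 1).cast('int'))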
>>
>>
>>
>>
>> On Sat, Apr 22, 2017 at 6:27 PM, Zeming Yu <[email protected]> wrote:
>>
>>> Thanks a lot!
>>>
>>> Just another question, how can I extract the minutes as a number?
>>>
>>> I can use:
>>> .withColumn('duration_m', split(flight.duration, 'h').getItem(1))
>>>
>>> to get strings like '10m'
>>>
>>> but how do I drop the character "m" at the end? I can use substr(), but
>>> what's the function to get the length of the string so that I can do
>>> something like substr(1, len(...)-1)?
>>>
>>> On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar <[email protected]> wrote:
>>>
>>>> Can be as simple as -
>>>>
>>>> from pyspark.sql.functions import split
>>>>
>>>> flight.withColumn('hour',split(flight.duration,'h').getItem(0))
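>>>>
>>>> If you need the hour as a number rather than a string, casting the
>>>> result should work too (untested):
>>>>
>>>> flight.withColumn('hour', split(flight.duration, 'h').getItem(0).cast('int'))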
>>>>
>>>>
>>>> Thank you,
>>>> *Pushkar Gujar*
>>>>
>>>>
>>>> On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu <[email protected]> wrote:
>>>>
>>>>> Any examples?
>>>>>
>>>>> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)" <[email protected]> wrote:
>>>>>
>>>>>> How about using `withColumn` and UDF?
>>>>>>
>>>>>> example:
>>>>>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>>>>>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
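>>>>>>
>>>>>> A minimal sketch of that approach (untested; assumes the column is
>>>>>> called `duration` as in your example, and the names `get_hours` and
>>>>>> `hour` are just placeholders):
>>>>>>
>>>>>> from pyspark.sql.functions import udf
>>>>>> from pyspark.sql.types import IntegerType
>>>>>>
>>>>>> # plain-Python UDF: take everything before the 'h' and convert to int
>>>>>> get_hours = udf(lambda s: int(s.split('h')[0]), IntegerType())
>>>>>> flight = flight.withColumn('hour', get_hours(flight.duration))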
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I've got a dataframe with a column looking like this:
>>>>>>>
>>>>>>> display(flight.select("duration").show())
>>>>>>>
>>>>>>> +--------+
>>>>>>> |duration|
>>>>>>> +--------+
>>>>>>> | 15h10m|
>>>>>>> | 17h0m|
>>>>>>> | 21h25m|
>>>>>>> | 14h30m|
>>>>>>> | 24h50m|
>>>>>>> | 26h10m|
>>>>>>> | 14h30m|
>>>>>>> | 23h5m|
>>>>>>> | 21h30m|
>>>>>>> | 11h50m|
>>>>>>> | 16h10m|
>>>>>>> | 15h15m|
>>>>>>> | 21h25m|
>>>>>>> | 14h25m|
>>>>>>> | 14h40m|
>>>>>>> | 16h0m|
>>>>>>> | 24h20m|
>>>>>>> | 14h30m|
>>>>>>> | 14h25m|
>>>>>>> | 14h30m|
>>>>>>> +--------+
>>>>>>> only showing top 20 rows
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I need to extract the hour as a number and store it as an additional
>>>>>>> column within the same dataframe. What's the best way to do that?
>>>>>>>
>>>>>>>
>>>>>>> I tried the following, but it failed:
>>>>>>>
>>>>>>> import re
>>>>>>> def getHours(x):
>>>>>>>     return re.match('([0-9]+(?=h))', x)
>>>>>>> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>>>>>> temp.select("duration").show()
>>>>>>>
>>>>>>>
>>>>>>> error message:
>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------------
>>>>>>> Py4JJavaError                             Traceback (most recent call last)
>>>>>>> <ipython-input-89-1d5bec255302> in <module>()
>>>>>>>       2 def getHours(x):
>>>>>>>       3     return re.match('([0-9]+(?=h))', x)
>>>>>>> ----> 4 temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>>>>>>       5 temp.select("duration").show()
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in toDF(self, schema, sampleRatio)
>>>>>>>      55         [Row(name=u'Alice', age=1)]
>>>>>>>      56         """
>>>>>>> ---> 57         return sparkSession.createDataFrame(self, schema, sampleRatio)
>>>>>>>      58
>>>>>>>      59     RDD.toDF = toDF
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
>>>>>>>     518
>>>>>>>     519         if isinstance(data, RDD):
>>>>>>> --> 520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
>>>>>>>     521         else:
>>>>>>>     522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in _createFromRDD(self, rdd, schema, samplingRatio)
>>>>>>>     358         """
>>>>>>>     359         if schema is None or isinstance(schema, (list, tuple)):
>>>>>>> --> 360             struct = self._inferSchema(rdd, samplingRatio)
>>>>>>>     361             converter = _create_converter(struct)
>>>>>>>     362             rdd = rdd.map(converter)
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in _inferSchema(self, rdd, samplingRatio)
>>>>>>>     329         :return: :class:`pyspark.sql.types.StructType`
>>>>>>>     330         """
>>>>>>> --> 331         first = rdd.first()
>>>>>>>     332         if not first:
>>>>>>>     333             raise ValueError("The first row in RDD is empty, "
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py in first(self)
>>>>>>>    1359             ValueError: RDD is empty
>>>>>>>    1360         """
>>>>>>> -> 1361         rs = self.take(1)
>>>>>>>    1362         if rs:
>>>>>>>    1363             return rs[0]
>>>>>>>
>>>>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py in take(self, num)
>>>>>>>    1341
>>>>>>>    1342         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
>>>>>>> -> 1343
>>>>>>> ...
>
> [Message clipped]