Re: how to add new column using regular expression within pyspark dataframe

2017-04-24 Thread Yan Facai
Don't use a udf, as `minute` and `unix_timestamp` are native functions of
spark.sql.functions, so you can apply them to the column directly.


scala> df.withColumn("minute", minute(unix_timestamp($"str",
"HH'h'mm'm'").cast("timestamp"))).show





On Tue, Apr 25, 2017 at 7:55 AM, Zeming Yu  wrote:

> I tried this, but it doesn't seem to work. Do you know how to fix it?
>
> def getMinutes(aString):
>     return minute(unix_timestamp(aString, "HH'h'mm'm'").cast("timestamp"))
>
> udfGetMinutes = udf(getMinutes, IntegerType())
>
> flight2 = (flight2.withColumn('stop_duration1',
> udfGetMinutes(flight2.stop_duration1))
>   )
>
>
>
> On Sat, Apr 22, 2017 at 8:51 PM, 颜发才(Yan Facai) 
> wrote:
>
>> Hi, Zeming.
>>
>> I prefer to convert String to DateTime, like this:
>>
>> scala> val df = Seq("15h10m", "17h0m", "21h25m").toDF("str")
>>
>> scala> val ts = unix_timestamp($"str", "HH'h'mm'm'").cast("timestamp")
>>
>> scala> df.withColumn("minute", minute(ts)).show
>> +------+------+
>> |   str|minute|
>> +------+------+
>> |15h10m|    10|
>> | 17h0m|     0|
>> |21h25m|    25|
>> +------+------+
>>
>>
>> By the way, check the Date-time functions section of the API:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
>>
>>
>>
>>
>> On Sat, Apr 22, 2017 at 6:27 PM, Zeming Yu  wrote:
>>
>>> Thanks a lot!
>>>
>>> Just another question, how can I extract the minutes as a number?
>>>
>>> I can use:
>>> .withColumn('duration_m',split(flight.duration,'h').getItem(1))
>>>
>>> to get strings like '10m'
>>>
>>> but how do I drop the character "m" at the end? I can use substr(), but
>>> what's the function to get the length of the string so that I can do
>>> something like substr(1, len(...)-1)?
>>>
>>> On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar >> > wrote:
>>>
 Can be as  simple as -

 from pyspark.sql.functions import split

 flight.withColumn('hour',split(flight.duration,'h').getItem(0))


 Thank you,
 *Pushkar Gujar*


 On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu  wrote:

> Any examples?
>
> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:
>
>> How about using `withColumn` and UDF?
>>
>> example:
>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>> 
>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>
>>
>>
>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu 
>> wrote:
>>
>>> I've got a dataframe with a column looking like this:
>>>
>>> display(flight.select("duration").show())
>>>
>>> ++
>>> |duration|
>>> ++
>>> |  15h10m|
>>> |   17h0m|
>>> |  21h25m|
>>> |  14h30m|
>>> |  24h50m|
>>> |  26h10m|
>>> |  14h30m|
>>> |   23h5m|
>>> |  21h30m|
>>> |  11h50m|
>>> |  16h10m|
>>> |  15h15m|
>>> |  21h25m|
>>> |  14h25m|
>>> |  14h40m|
>>> |   16h0m|
>>> |  24h20m|
>>> |  14h30m|
>>> |  14h25m|
>>> |  14h30m|
>>> ++
>>> only showing top 20 rows
>>>
>>>
>>>
>>> I need to extract the hour as a number and store it as an additional
>>> column within the same dataframe. What's the best way to do that?
>>>
>>>
>>> I tried the following, but it failed:
>>>
>>> import re
>>> def getHours(x):
>>>   return re.match('([0-9]+(?=h))', x)
>>> temp = flight.select("duration").rdd.map(lambda
>>> x:getHours(x[0])).toDF()
>>> temp.select("duration").show()
>>>
>>>
>>> error message:
>>>
>>>
>>> [Py4JJavaError stack trace snipped; the full traceback is in the original post at the end of this digest]

Re: how to add new column using regular expression within pyspark dataframe

2017-04-22 Thread Zeming Yu
Thanks a lot!

Just another question, how can I extract the minutes as a number?

I can use:
.withColumn('duration_m',split(flight.duration,'h').getItem(1))

to get strings like '10m'

but how do I drop the character "m" at the end? I can use substr(), but
what's the function to get the length of the string so that I can do
something like substr(1, len(...)-1)?
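A sketch of two built-in options (no UDF), assuming the `flight`/`duration` names from
earlier in the thread: drop the last character with length()/substr(), or pull the
digits straight out with regexp_extract():

from pyspark.sql.functions import split, length, lit, regexp_extract

minutes_str = split(flight.duration, 'h').getItem(1)   # e.g. "10m"

flight = (flight
    # Option 1: strip the trailing "m" with substr + length
    .withColumn('duration_m', minutes_str.substr(lit(1), length(minutes_str) - lit(1)))
    # Option 2: extract the digits directly and cast to int
    .withColumn('duration_m2', regexp_extract('duration', r'(\d+)m', 1).cast('int')))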

On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar 
wrote:

> Can be as  simple as -
>
> from pyspark.sql.functions import split
>
> flight.withColumn('hour',split(flight.duration,'h').getItem(0))
>
>
> Thank you,
> *Pushkar Gujar*
>
>
> On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu  wrote:
>
>> Any examples?
>>
>> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:
>>
>>> How about using `withColumn` and UDF?
>>>
>>> example:
>>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>>> 
>>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>>
>>>
>>>
>>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:
>>>
 I've got a dataframe with a column looking like this:

 display(flight.select("duration").show())

 ++
 |duration|
 ++
 |  15h10m|
 |   17h0m|
 |  21h25m|
 |  14h30m|
 |  24h50m|
 |  26h10m|
 |  14h30m|
 |   23h5m|
 |  21h30m|
 |  11h50m|
 |  16h10m|
 |  15h15m|
 |  21h25m|
 |  14h25m|
 |  14h40m|
 |   16h0m|
 |  24h20m|
 |  14h30m|
 |  14h25m|
 |  14h30m|
 ++
 only showing top 20 rows



 I need to extract the hour as a number and store it as an additional
 column within the same dataframe. What's the best way to do that?


 I tried the following, but it failed:

 import re
 def getHours(x):
   return re.match('([0-9]+(?=h))', x)
 temp = flight.select("duration").rdd.map(lambda
 x:getHours(x[0])).toDF()
 temp.select("duration").show()


 error message:


 [Py4JJavaError stack trace snipped; the full traceback is in the original post at the end of this digest]
>

Re: how to add new column using regular expression within pyspark dataframe

2017-04-20 Thread Pushkar.Gujar
Can be as  simple as -

from pyspark.sql.functions import split

flight.withColumn('hour',split(flight.duration,'h').getItem(0))
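To get the hour as a number rather than a string, a small follow-on sketch (same
column names) just adds a cast:

from pyspark.sql.functions import split

flight = flight.withColumn(
    'hour', split(flight.duration, 'h').getItem(0).cast('int'))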


Thank you,
*Pushkar Gujar*


On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu  wrote:

> Any examples?
>
> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:
>
>> How about using `withColumn` and UDF?
>>
>> example:
>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>> 
>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>
>>
>>
>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:
>>
>>> I've got a dataframe with a column looking like this:
>>>
>>> display(flight.select("duration").show())
>>>
>>> ++
>>> |duration|
>>> ++
>>> |  15h10m|
>>> |   17h0m|
>>> |  21h25m|
>>> |  14h30m|
>>> |  24h50m|
>>> |  26h10m|
>>> |  14h30m|
>>> |   23h5m|
>>> |  21h30m|
>>> |  11h50m|
>>> |  16h10m|
>>> |  15h15m|
>>> |  21h25m|
>>> |  14h25m|
>>> |  14h40m|
>>> |   16h0m|
>>> |  24h20m|
>>> |  14h30m|
>>> |  14h25m|
>>> |  14h30m|
>>> ++
>>> only showing top 20 rows
>>>
>>>
>>>
>>> I need to extract the hour as a number and store it as an additional
>>> column within the same dataframe. What's the best way to do that?
>>>
>>>
>>> I tried the following, but it failed:
>>>
>>> import re
>>> def getHours(x):
>>>   return re.match('([0-9]+(?=h))', x)
>>> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>> temp.select("duration").show()
>>>
>>>
>>> error message:
>>>
>>>
>>> [Py4JJavaError stack trace snipped; the full traceback is in the original post at the end of this digest]

Re: how to add new column using regular expression within pyspark dataframe

2017-04-20 Thread Zeming Yu
Any examples?

On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:

> How about using `withColumn` and UDF?
>
> example:
> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
> 
> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>
>
>
> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:
>
>> I've got a dataframe with a column looking like this:
>>
>> display(flight.select("duration").show())
>>
>> ++
>> |duration|
>> ++
>> |  15h10m|
>> |   17h0m|
>> |  21h25m|
>> |  14h30m|
>> |  24h50m|
>> |  26h10m|
>> |  14h30m|
>> |   23h5m|
>> |  21h30m|
>> |  11h50m|
>> |  16h10m|
>> |  15h15m|
>> |  21h25m|
>> |  14h25m|
>> |  14h40m|
>> |   16h0m|
>> |  24h20m|
>> |  14h30m|
>> |  14h25m|
>> |  14h30m|
>> ++
>> only showing top 20 rows
>>
>>
>>
>> I need to extract the hour as a number and store it as an additional
>> column within the same dataframe. What's the best way to do that?
>>
>>
>> I tried the following, but it failed:
>>
>> import re
>> def getHours(x):
>>   return re.match('([0-9]+(?=h))', x)
>> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>> temp.select("duration").show()
>>
>>
>> error message:
>>
>>
>> [Py4JJavaError stack trace snipped; the full traceback is in the original post at the end of this digest]

Re: how to add new column using regular expression within pyspark dataframe

2017-04-19 Thread Yan Facai
How about using `withColumn` and UDF?

example:
+ https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78

+ https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
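For reference, a minimal sketch in that style — a Python UDF wrapping a regex, using
the `flight`/`duration` names from the original post:

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def get_hours(s):
    # "15h10m" -> 15; None if the pattern does not match
    m = re.match(r'(\d+)h', s)
    return int(m.group(1)) if m else None

get_hours_udf = udf(get_hours, IntegerType())
flight = flight.withColumn('hour', get_hours_udf(flight.duration))

(The built-in split/regexp_extract answers elsewhere in the thread avoid the UDF
serialization overhead.)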



On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:

> I've got a dataframe with a column looking like this:
>
> display(flight.select("duration").show())
>
> ++
> |duration|
> ++
> |  15h10m|
> |   17h0m|
> |  21h25m|
> |  14h30m|
> |  24h50m|
> |  26h10m|
> |  14h30m|
> |   23h5m|
> |  21h30m|
> |  11h50m|
> |  16h10m|
> |  15h15m|
> |  21h25m|
> |  14h25m|
> |  14h40m|
> |   16h0m|
> |  24h20m|
> |  14h30m|
> |  14h25m|
> |  14h30m|
> ++
> only showing top 20 rows
>
>
>
> I need to extract the hour as a number and store it as an additional
> column within the same dataframe. What's the best way to do that?
>
>
> I tried the following, but it failed:
>
> import re
> def getHours(x):
>   return re.match('([0-9]+(?=h))', x)
> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
> temp.select("duration").show()
>
>
> error message:
>
>
> [Py4JJavaError stack trace snipped; the full traceback is in the original post at the end of this digest]

Re: how to add new column using regular expression within pyspark dataframe

2017-04-17 Thread Павел
On Mon, Apr 17, 2017 at 3:25 PM, Zeming Yu  wrote:
> I've got a dataframe with a column looking like this:
>
> display(flight.select("duration").show())
>
> ++
> |duration|
> ++
> |  15h10m|
> |   17h0m|
> |  21h25m|
> |  14h25m|
> |  14h30m|
> ++
> only showing top 20 rows
>
>
>
> I need to extract the hour as a number and store it as an additional column
> within the same dataframe. What's the best way to do that?

You don't actually need to either switch to rdd context or use python
regexps here, which are slow. I'd suggest to try the "split" dataframe
sql function and the "getItem" column method. Bear in mind the
boundary case when duration is less than 1 hour, i.e. it might be
either 30m or 0h30m.
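A sketch of that boundary case (hypothetical: default a missing hour part to 0, so
both "0h30m" and bare "30m" work):

from pyspark.sql.functions import regexp_extract, coalesce, lit

# regexp_extract returns "" when there is no match; casting "" to int gives null,
# so coalesce(..., lit(0)) covers durations like "30m" with no hour part
flight = flight.withColumn(
    'hour',
    coalesce(regexp_extract('duration', r'(\d+)h', 1).cast('int'), lit(0)))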

--
Pavel Knoblokh

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how to add new column using regular expression within pyspark dataframe

2017-04-17 Thread Zeming Yu
I've got a dataframe with a column looking like this:

display(flight.select("duration").show())

++
|duration|
++
|  15h10m|
|   17h0m|
|  21h25m|
|  14h30m|
|  24h50m|
|  26h10m|
|  14h30m|
|   23h5m|
|  21h30m|
|  11h50m|
|  16h10m|
|  15h15m|
|  21h25m|
|  14h25m|
|  14h40m|
|   16h0m|
|  24h20m|
|  14h30m|
|  14h25m|
|  14h30m|
++
only showing top 20 rows



I need to extract the hour as a number and store it as an additional column
within the same dataframe. What's the best way to do that?


I tried the following, but it failed:

import re
def getHours(x):
  return re.match('([0-9]+(?=h))', x)
temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
temp.select("duration").show()


error message:


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-...> in <module>()
      2 def getHours(x):
      3   return re.match('([0-9]+(?=h))', x)
----> 4 temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
      5 temp.select("duration").show()

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in toDF(self, schema, sampleRatio)
     55         [Row(name=u'Alice', age=1)]
     56         """
---> 57         return sparkSession.createDataFrame(self, schema, sampleRatio)
     58
     59     RDD.toDF = toDF

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    518
    519         if isinstance(data, RDD):
--> 520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    521         else:
    522             rdd, schema = self._createFromLocal(map(prepare, data), schema)

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in _createFromRDD(self, rdd, schema, samplingRatio)
    358         """
    359         if schema is None or isinstance(schema, (list, tuple)):
--> 360             struct = self._inferSchema(rdd, samplingRatio)
    361             converter = _create_converter(struct)
    362             rdd = rdd.map(converter)

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in _inferSchema(self, rdd, samplingRatio)
    329         :return: :class:`pyspark.sql.types.StructType`
    330         """
--> 331         first = rdd.first()
    332         if not first:
    333             raise ValueError("The first row in RDD is empty, "

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py in first(self)
   1359         ValueError: RDD is empty
   1360         """
-> 1361         rs = self.take(1)
   1362         if rs:
   1363             return rs[0]

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py in take(self, num)
   1341
   1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345             items += res

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    963         # SparkContext#runJob.
    964         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    966         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    967

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 75.0 failed 1 times, most recent failure: Lost task 0.0 in stage 75.0 (TID 1035, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 174, in main
  File "C:\