[ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
------------------------------
    Description: 
*Problem*:

I have a DataFrame with a ts (timestamp) column whose values are in UTC. When I 
create a new column with a UDF, the PySpark UDF wrongly converts the timestamps 
to UTC. The ts column is already in UTC, so the UDF should not convert it 
again.

I have used the following configs to tell Spark that the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
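These three settings address the JVM (driver and executors) and the Spark SQL session. Whether they also determine the zone used inside the Python worker processes is not something this report establishes. As a Spark-free check, the sketch below prints the offset that a Python process applies when it builds naive datetimes. The helper name `local_utc_offset` is hypothetical, and the sample instant is taken from the first row of the reproduction data.

```python
import datetime
import time


def local_utc_offset(instant: datetime.datetime) -> datetime.timedelta:
    """Return the UTC offset this Python process applies to `instant`."""
    # astimezone() with no argument converts to the process's local zone.
    return instant.astimezone().utcoffset()


if __name__ == "__main__":
    sample = datetime.datetime(2018, 2, 10, 15, 27, 18,
                               tzinfo=datetime.timezone.utc)
    print("local zone names:", time.tzname)
    print("offset applied to sample:", local_utc_offset(sample))
    # A naive datetime built from the same epoch value uses that local zone.
    print("naive local rendering:",
          datetime.datetime.fromtimestamp(sample.timestamp()))
```

If the printed offset is not zero, naive datetimes created in that process will differ from the UTC wall-clock time by that amount.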
Below is a code snippet to reproduce the error:

 
{code:java}
import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.config("spark.sql.session.timeZone",
                                    "UTC").getOrCreate()

df = spark.createDataFrame([("usr1", 17.00, "2018-02-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-02-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-02-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-02-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-02-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-02-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-02-22T11:27:18+00:00")
                            ],
                           ["user", "id", "ts"])

# Cast the ISO-8601 strings to Spark timestamps.
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    # i arrives in the UDF as a Python datetime.datetime.
    if datetime.time(5, 0) <= i.time() < datetime.time(12, 0):
        tmp = "Morning: " + str(i)
    elif datetime.time(12, 0) <= i.time() < datetime.time(17, 0):
        tmp = "Afternoon: " + str(i)
    elif datetime.time(17, 0) <= i.time() < datetime.time(21, 0):
        tmp = "Evening"
    else:
        # 21:00 up to 05:00, wrapping past midnight.
        tmp = "Night"
    return tmp

udf = F.udf(some_time_udf, StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)
{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+----------------------------+
|user|id   |ts                 |day_part                    |
+----+-----+-------------------+----------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
+----+-----+-------------------+----------------------------+
{code}
The above output is incorrect: the ts and day_part columns do not show the same 
timestamps (day_part is six hours behind ts in every row). Below is the output 
I would expect:

 
{code:java}
+----+-----+-------------------+------------------------------+
|user|id   |ts                 |day_part                      |
+----+-----+-------------------+------------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
|usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
+----+-----+-------------------+------------------------------+
{code}
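The actual and expected tables differ by exactly six hours in every row. One possible reading, which this report does not confirm, is that the UDF receives naive datetimes rendered in the Python worker's local zone rather than in UTC. The Spark-free sketch below shows that a fixed UTC-6 local zone (an assumption inferred only from the six-hour gap) reproduces the observed strings. `day_part` and `as_naive` are hypothetical helper names, not part of the reported code.

```python
import datetime

UTC = datetime.timezone.utc
# Assumed local zone: a fixed UTC-6 offset, inferred from the six-hour gap only.
ASSUMED_LOCAL = datetime.timezone(datetime.timedelta(hours=-6))


def day_part(i: datetime.datetime) -> str:
    """Label a datetime by time of day, mirroring the UDF in the report."""
    t = i.time()
    if datetime.time(5, 0) <= t < datetime.time(12, 0):
        return "Morning: " + str(i)
    if datetime.time(12, 0) <= t < datetime.time(17, 0):
        return "Afternoon: " + str(i)
    if datetime.time(17, 0) <= t < datetime.time(21, 0):
        return "Evening"
    return "Night"  # 21:00 up to 05:00, wrapping past midnight


def as_naive(instant: datetime.datetime,
             zone: datetime.tzinfo) -> datetime.datetime:
    """Render an aware instant as a naive wall-clock datetime in `zone`."""
    return instant.astimezone(zone).replace(tzinfo=None)


instant = datetime.datetime(2018, 2, 10, 15, 27, 18, tzinfo=UTC)
print(day_part(as_naive(instant, ASSUMED_LOCAL)))  # Morning: 2018-02-10 09:27:18
print(day_part(as_naive(instant, UTC)))            # Afternoon: 2018-02-10 15:27:18
```

The first printed line matches the first row of the actual output and the second matches the first row of the expected output, which is consistent with, but does not prove, a local-zone conversion on the Python side.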
 

  was:
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using udf, pyspark udf wrongly changes timestamps into UTC time. ts 
(timestamp) column is already in UTC time. Therefore, pyspark udf should not 
convert ts (timestamp) column into UTC timestamp. 

I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
                            ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
                            ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
                            ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
                            ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
                            ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
                            ("usr2",156.00, "2018-02-22T11:27:18+00:00")
                            ],
                           ["user","id", "ts"])

df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
        tmp= "Morning: " + str(i)
    elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
        tmp= str(i)
    elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
        tmp= "Evening"
    elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
        tmp= "Night"
    elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
        tmp= "Night"
    return tmp

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)


{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+----------------------------+
|user|id   |ts                 |day_part                    |
+----+-----+-------------------+----------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
+----+-----+-------------------+----------------------------+
{code}
Above output is incorrect. You can see ts and day_part columns don't have same 
timestamps. Below is the output I would expect:

 
{code:java}
+----+-----+-------------------+----------------------------+
|user|id   |ts                 |day_part                    |
+----+-----+-------------------+----------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18|
|usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18|
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18|
+----+-----+-------------------+----------------------------+{code}
 


> Pyspark UDF wrongly changes timestamps to UTC
> ---------------------------------------------
>
>                 Key: SPARK-33863
>                 URL: https://issues.apache.org/jira/browse/SPARK-33863
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.1
>         Environment: MAC/Linux
> Standalone cluster / local machine
>            Reporter: Nasir Ali
>            Priority: Major
>


