[ https://issues.apache.org/jira/browse/SPARK-33150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223971#comment-17223971 ]

Aoyuan Liao commented on SPARK-33150:
-------------------------------------

[~DieterDP] The time difference is passed via the fold attribute. As you can see, 
self.collect() there is equivalent to output_collect in your code.

[https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py#L133]

From this step on, the two tz-naive datetimes are treated as the same:
{code:java}
>>> print(output_collect)
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), 
Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>>> pdf = pd.DataFrame.from_records(output_collect, columns=output.columns)
>>> pdf
               window  min_value
0 2019-10-27 02:54:00          1
1 2019-10-27 02:54:00          3
>>> pdf.dtypes
window       datetime64[ns]
min_value             int64
dtype: object
{code}
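The collapse happens because, per PEP 495, the fold attribute is ignored when comparing 
naive datetimes, so pandas sees two identical keys. A minimal standalone illustration in 
plain Python (no Spark involved):
{code:java}
>>> from datetime import datetime
>>> a = datetime(2019, 10, 27, 2, 54)          # first occurrence of the wall-clock time (fold=0)
>>> b = datetime(2019, 10, 27, 2, 54, fold=1)  # second occurrence after the DST fall-back
>>> a == b   # fold is ignored in equality comparisons of naive datetimes (PEP 495)
True
>>> (a.fold, b.fold)
(0, 1)
{code}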
Since this function doesn't respect the fold attribute, we would have to save an array 
of fold values and use it when converting to the machine timezone. That means iterating 
over every datetime entry one by one again, and I am not sure how it would affect 
performance.
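To make that concrete, here is a rough sketch (plain Python, not the actual code in 
conversion.py) of what saving the fold values and converting entry by entry to the 
machine timezone could look like; the Europe/Brussels timezone is assumed from the 
reporter's environment:
{code:java}
# Sketch only: re-apply each saved fold bit before converting to UTC, so the two
# ambiguous wall-clock times stay distinct. Requires Python 3.9+ for zoneinfo.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

local_tz = ZoneInfo("Europe/Brussels")  # assumed machine timezone

naive_times = [datetime(2019, 10, 27, 2, 54), datetime(2019, 10, 27, 2, 54)]
folds = [0, 1]  # per-entry fold values that would have to be saved alongside

# Element-wise loop: this is the extra per-entry iteration whose cost is unclear.
utc_times = [
    dt.replace(fold=f, tzinfo=local_tz).astimezone(timezone.utc)
    for dt, f in zip(naive_times, folds)
]
# utc_times -> [2019-10-27 00:54:00+00:00, 2019-10-27 01:54:00+00:00]
{code}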

As I said, there is a workaround: enable Arrow in PySpark.
{code:java}
>>> spark = (pyspark
...  .sql
...  .SparkSession
...  .builder
...  .master('local[1]')
...  .config("spark.sql.session.timeZone", "UTC")
...  .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
...  .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
...  .config('spark.sql.execution.arrow.pyspark.enabled', True)
...  .getOrCreate()
... )
>>> output_collect = output.collect()
>>> output_pandas = output.toPandas()
>>> print(output_collect)
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), 
Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>>> print(output_pandas)
               window  min_value
0 2019-10-27 00:54:00          1
1 2019-10-27 01:54:00          3

{code}
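With Arrow enabled, the grouped timestamps come back in UTC (per 
spark.sql.session.timeZone) and the two keys stay distinct. Based on the output above, 
the equality check from the report would now behave as expected:
{code:java}
>>> # The two group keys are no longer equal (values taken from the output above).
>>> print(output_pandas.window[0] == output_pandas.window[1])
False
{code}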

> Groupby key may not be unique when using window
> -----------------------------------------------
>
>                 Key: SPARK-33150
>                 URL: https://issues.apache.org/jira/browse/SPARK-33150
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.3, 3.0.0
>            Reporter: Dieter De Paepe
>            Priority: Major
>
>  
> Due to the way Spark converts dates to local times, it may end up losing 
> details that allow it to differentiate instants when those times fall in the 
> daylight saving time transition. Setting the Spark timezone to UTC does 
> not resolve the issue.
> This issue is somewhat related to SPARK-32123, but seems independent enough 
> to consider this a separate issue.
> A minimal example is below. I tested these on Spark 3.0.0 and 2.3.3 (I could 
> not get 2.4.x to work on my system). My machine is located in timezone 
> "Europe/Brussels".
>  
> {code:java}
> import pyspark
> import pyspark.sql.functions as f
> spark = (pyspark
>  .sql
>  .SparkSession
>  .builder
>  .master('local[1]')
>  .config("spark.sql.session.timeZone", "UTC")
>  .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC') \
>  .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
>  .getOrCreate()
> )
> debug_df = spark.createDataFrame([
>  (1572137640, 1),
>  (1572137640, 2),
>  (1572141240, 3),
>  (1572141240, 4)
> ],['epochtime', 'value'])
> debug_df \
>  .withColumn('time', f.from_unixtime('epochtime')) \
>  .withColumn('window', f.window('time', '1 minute').start) \
>  .collect()
> {code}
>  
> Output: here we see that the window function internally transforms the times to 
> local time, and as such has to disambiguate between the Belgian winter and 
> summer hour transition by setting the "fold" attribute:
>  
> {code:java}
> [Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
>  Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]{code}
>  
> Now, this has severe implications when we use the window function for a 
> groupby operation:
>  
> {code:java}
> output = debug_df \
>  .withColumn('time', f.from_unixtime('epochtime')) \
>  .groupby(f.window('time', '1 minute').start.alias('window')).agg(
>    f.min('value').alias('min_value')
>  )
> output_collect = output.collect()
> output_pandas = output.toPandas()
> print(output_collect)
> print(output_pandas)
> {code}
> Output:
>  
> {code:java}
> [Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), 
> Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>                window  min_value
> 0 2019-10-27 00:54:00          1
> 1 2019-10-27 00:54:00          3
> {code}
>  
> While the collect() output shows Belgian local time, it allows us to 
> differentiate between the two keys visually via the fold attribute. However, 
> due to the way the fold attribute is defined, [it is ignored 
> for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute] equality 
> comparison.
> On the other hand, the pandas output uses UTC (due to the 
> spark.sql.session.timeZone setting), but it has lost the disambiguating fold 
> attribute in the pandas datatype conversion.
> In both cases, the column that was grouped on is not unique.
>  
> {code:java}
> print(output_collect[0].window == output_collect[1].window)  # True
> print(output_collect[0].window.fold == output_collect[1].window.fold)  # False
> print(output_pandas.window[0] == output_pandas.window[1])  # True
> print(output_pandas.window[0].fold == output_pandas.window[1].fold)  # True
> {code}
>  
>  


