[ https://issues.apache.org/jira/browse/SPARK-33150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220984#comment-17220984 ]

Aoyuan Liao commented on SPARK-33150:
-------------------------------------

[~DieterDP] After looking deeper into the code, I found the issue is not within Spark.
Spark creates the pandas DataFrame via pd.DataFrame.from_records. However, that
conversion ignores the fold attribute of the datetime objects, which leads to the
same window value for both rows, as shown below:
{code:java}
>>> import pandas as pd
>>> from datetime import datetime
>>> test = pd.DataFrame.from_records([(datetime(2019, 10, 27, 2, 54), 1),
...                                    (datetime(2019, 10, 27, 2, 54, fold=1), 3)])
>>> test
                    0  1
0 2019-10-27 02:54:00  1
1 2019-10-27 02:54:00  3
{code}
IMHO, there is not much Spark can do here.

If you enable Arrow in Spark (set spark.sql.execution.arrow.pyspark.enabled to
true), the two UTC timestamps in the DataFrame will be distinguished.
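
A minimal sketch of that workaround (assuming an existing SparkSession named spark 
and the grouped DataFrame output from the reproduction quoted below):

{code:java}
# Sketch only: `spark` and `output` are assumed to come from the
# reproduction in the issue description below.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

output_pandas = output.toPandas()
# With Arrow enabled, the two window starts arrive in pandas as distinct
# UTC instants, so the groupby key should be unique again.
print(output_pandas.window[0] == output_pandas.window[1])  # expected: False
{code}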

> Groupby key may not be unique when using window
> -----------------------------------------------
>
>                 Key: SPARK-33150
>                 URL: https://issues.apache.org/jira/browse/SPARK-33150
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.3, 3.0.0
>            Reporter: Dieter De Paepe
>            Priority: Major
>
>  
> Due to the way Spark converts dates to local times, it may end up losing 
> details that allow it to differentiate instants when those times fall in the 
> transition for daylight savings time. Setting the spark timezone to UTC does 
> not resolve the issue.
> This issue is somewhat related to SPARK-32123, but seems independent enough 
> to consider this a separate issue.
> A minimal example is below. I tested these on Spark 3.0.0 and 2.3.3 (I could 
> not get 2.4.x to work on my system). My machine is located in timezone 
> "Europe/Brussels".
>  
> {code:java}
> import pyspark
> import pyspark.sql.functions as f
> spark = (pyspark
>  .sql
>  .SparkSession
>  .builder
>  .master('local[1]')
>  .config("spark.sql.session.timeZone", "UTC")
>  .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
>  .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
>  .getOrCreate()
> )
> debug_df = spark.createDataFrame([
>  (1572137640, 1),
>  (1572137640, 2),
>  (1572141240, 3),
>  (1572141240, 4)
> ],['epochtime', 'value'])
> debug_df \
>  .withColumn('time', f.from_unixtime('epochtime')) \
>  .withColumn('window', f.window('time', '1 minute').start) \
>  .collect()
> {code}
>  
> Output: here we see that the window function internally transforms the times to 
> local time and, as such, has to disambiguate between the Belgian winter and 
> summer hour transition by setting the "fold" attribute:
>  
> {code:java}
> [Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
>  Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00', 
> window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]{code}
>  
> Now, this has severe implications when we use the window function for a 
> groupby operation:
>  
> {code:java}
> output = debug_df \
>  .withColumn('time', f.from_unixtime('epochtime')) \
>  .groupby(f.window('time', '1 minute').start.alias('window')).agg(
>    f.min('value').alias('min_value')
>  )
> output_collect = output.collect()
> output_pandas = output.toPandas()
> print(output_collect)
> print(output_pandas)
> {code}
> Output:
>  
> {code:java}
> [Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), 
> Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>   window              min_value
> 0 2019-10-27 00:54:00 1
> 1 2019-10-27 00:54:00 3
> {code}
>  
> While the output using collect() outputs Belgian local time, it allows us to 
> differentiate between the two different keys visually using the fold 
> attribute. However, due to the way the fold attribute is defined, [it is 
> ignored for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute] 
> equality comparison.
> On the other hand, the pandas output shows the UTC time (due to the setting 
> of spark.sql.session.timeZone), but it has lost the disambiguating fold 
> attribute in the pandas datatype conversion.
> In both cases, the column on which was grouped is not unique.
>  
> {code:java}
> print(output_collect[0].window == output_collect[1].window)  # True
> print(output_collect[0].window.fold == output_collect[1].window.fold)  # False
> print(output_pandas.window[0] == output_pandas.window[1])  # True
> print(output_pandas.window[0].fold == output_pandas.window[1].fold)  # True
> {code}
>  
>  


