[ https://issues.apache.org/jira/browse/SPARK-33150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220984#comment-17220984 ]
Aoyuan Liao commented on SPARK-33150:
-------------------------------------

[~DieterDP] After I looked deeper into the code, the issue is not within Spark. Spark creates the pandas DataFrame via pd.DataFrame.from_records. However, that ignores the fold attribute of the datetime objects, which collapses both keys into the same window:

{code:java}
>>> import pandas as pd
>>> from datetime import datetime
>>> test = pd.DataFrame.from_records([(datetime(2019, 10, 27, 2, 54), 1),
...                                   (datetime(2019, 10, 27, 2, 54, fold=1), 3)])
>>> test
                     0  1
0  2019-10-27 02:54:00  1
1  2019-10-27 02:54:00  3
{code}

IMHO, there is not much Spark can do here. If you enable Arrow in Spark (set the config spark.sql.execution.arrow.pyspark.enabled to true), the two UTC timestamps of the DataFrame will be distinguished.
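For illustration, a minimal sketch of the Arrow path (untested here; it assumes a Spark 3.0+ session, where the config goes by this name, and that pyarrow is installed), applied to the output dataframe from the report below:

{code:java}
# Sketch only: assumes pyarrow is installed and a Spark 3.0+ session.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Re-running the toPandas() conversion from the report below now goes
# through Arrow, which works from the underlying UTC instants rather than
# naive local datetime objects, so the two window keys should come back
# distinct.
output_pandas_arrow = output.toPandas()
print(output_pandas_arrow)
{code}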
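For background on why the collected rows collide at all: two distinct UTC instants can map to the same naive local wall-clock time during the DST transition, distinguished only by fold, and PEP 495 specifies that fold is ignored in comparisons and hashing. A plain-Python illustration (assuming Python 3.9+ for the stdlib zoneinfo module):

{code:java}
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # stdlib since Python 3.9

brussels = ZoneInfo("Europe/Brussels")

# The two UTC instants from the report, one hour apart.
u1 = datetime(2019, 10, 27, 0, 54, tzinfo=timezone.utc)
u2 = datetime(2019, 10, 27, 1, 54, tzinfo=timezone.utc)

# Converted to Brussels time and stripped of tzinfo, they become the same
# naive wall-clock time; only the fold attribute still tells them apart.
l1 = u1.astimezone(brussels).replace(tzinfo=None)
l2 = u2.astimezone(brussels).replace(tzinfo=None)
print(l1, l1.fold)  # 2019-10-27 02:54:00 0
print(l2, l2.fold)  # 2019-10-27 02:54:00 1

# Per PEP 495, fold is ignored in equality and hashing, so the two values
# are indistinguishable as groupby/dict keys.
print(l1 == l2)              # True
print(hash(l1) == hash(l2))  # True
{code}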
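As a possible user-side workaround (an untested sketch, not an official fix): grouping on the window start cast to epoch seconds keeps the key an unambiguous instant rather than a wall-clock datetime:

{code:java}
# Hypothetical workaround: cast the window start (a timestamp) to long,
# i.e. seconds since the epoch, so the groupby key cannot fold.
output_epoch = debug_df \
    .withColumn('time', f.from_unixtime('epochtime')) \
    .groupby(f.window('time', '1 minute').start.cast('long').alias('window_epoch')) \
    .agg(f.min('value').alias('min_value'))
output_epoch.show()
{code}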

> Groupby key may not be unique when using window
> -----------------------------------------------
>
>                 Key: SPARK-33150
>                 URL: https://issues.apache.org/jira/browse/SPARK-33150
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.3, 3.0.0
>            Reporter: Dieter De Paepe
>            Priority: Major
>
> Due to the way Spark converts dates to local times, it may lose the details
> that differentiate instants whose local times fall in the daylight saving
> time transition. Setting the Spark timezone to UTC does not resolve the issue.
> This issue is somewhat related to SPARK-32123, but seems independent enough
> to consider it a separate issue.
> A minimal example is below. I tested it on Spark 3.0.0 and 2.3.3 (I could not
> get 2.4.x to work on my system). My machine is located in the timezone
> "Europe/Brussels".
>
> {code:java}
> import pyspark
> import pyspark.sql.functions as f
>
> spark = (pyspark
>     .sql
>     .SparkSession
>     .builder
>     .master('local[1]')
>     .config('spark.sql.session.timeZone', 'UTC')
>     .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
>     .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
>     .getOrCreate()
> )
>
> debug_df = spark.createDataFrame([
>     (1572137640, 1),
>     (1572137640, 2),
>     (1572141240, 3),
>     (1572141240, 4)
> ], ['epochtime', 'value'])
>
> debug_df \
>     .withColumn('time', f.from_unixtime('epochtime')) \
>     .withColumn('window', f.window('time', '1 minute').start) \
>     .collect()
> {code}
>
> Output: here we see that the window function internally transforms the times
> to local time, and as such has to disambiguate between the Belgian winter and
> summer hour in the transition by setting the "fold" attribute:
>
> {code:java}
> [Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
>  Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]
> {code}
>
> Now, this has severe implications when we use the window function for a
> groupby operation:
>
> {code:java}
> output = debug_df \
>     .withColumn('time', f.from_unixtime('epochtime')) \
>     .groupby(f.window('time', '1 minute').start.alias('window')) \
>     .agg(f.min('value').alias('min_value'))
>
> output_collect = output.collect()
> output_pandas = output.toPandas()
> print(output_collect)
> print(output_pandas)
> {code}
>
> Output:
>
> {code:java}
> [Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1),
>  Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>
>                 window  min_value
> 0  2019-10-27 00:54:00          1
> 1  2019-10-27 00:54:00          3
> {code}
>
> While the collect() output shows Belgian local time, it lets us tell the two
> keys apart visually through the fold attribute. However, due to the way the
> fold attribute is defined, [it is ignored for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute]
> equality comparison.
> The pandas output, on the other hand, shows UTC times (due to the
> spark.sql.session.timeZone setting), but it has lost the disambiguating fold
> attribute in the conversion to pandas datatypes.
> In both cases, the column that was grouped on is not unique:
>
> {code:java}
> print(output_collect[0].window == output_collect[1].window)            # True
> print(output_collect[0].window.fold == output_collect[1].window.fold)  # False
> print(output_pandas.window[0] == output_pandas.window[1])              # True
> print(output_pandas.window[0].fold == output_pandas.window[1].fold)    # True
> {code}