[ https://issues.apache.org/jira/browse/SPARK-33150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223971#comment-17223971 ]
Aoyuan Liao commented on SPARK-33150:
-------------------------------------

[~DieterDP] The time difference is passed via the fold attribute. As you can see, self.collect() there is equal to output_collect in your code. [https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py#L133]

From this step on, the two tz-naive datetimes are treated as the same value:
{code:java}
>>> print(output_collect)
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>>> pdf = pd.DataFrame.from_records(output_collect, columns=output.columns)
>>> pdf
               window  min_value
0 2019-10-27 02:54:00          1
1 2019-10-27 02:54:00          3
>>> pdf.dtypes
window       datetime64[ns]
min_value             int64
dtype: object
{code}
If this function does not respect the fold attribute, we would have to save an array of fold values ourselves and use it to convert to the machine timezone. That means iterating over every datetime entry one by one again, and I am not sure how that would affect performance. As I said, there is a workaround: enable Arrow in PySpark.
{code:java}
>>> spark = (pyspark
...     .sql
...     .SparkSession
...     .builder
...     .master('local[1]')
...     .config("spark.sql.session.timeZone", "UTC")
...     .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
...     .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
...     .config('spark.sql.execution.arrow.pyspark.enabled', True)
...     .getOrCreate()
... )
>>> output_collect = output.collect()
>>> output_pandas = output.toPandas()
>>> print(output_collect)
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>>> print(output_pandas)
               window  min_value
0 2019-10-27 00:54:00          1
1 2019-10-27 01:54:00          3
{code}
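For reference, a minimal plain-Python sketch of the PEP 495 semantics at play (assuming Python 3.9+ for zoneinfo and a Europe/Brussels machine timezone, as in the report): the two groupby keys differ only in their fold attribute, compare equal while they are naive, and only become distinct instants once the local timezone is attached.
{code:java}
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+; backports.zoneinfo offers the same API on older versions

brussels = ZoneInfo("Europe/Brussels")  # assumption: the machine timezone from the report

# The two groupby keys returned by collect(): same wall-clock time, different fold.
first = datetime(2019, 10, 27, 2, 54)            # fold=0, i.e. before the DST fallback
second = datetime(2019, 10, 27, 2, 54, fold=1)   # fold=1, i.e. after the DST fallback

# PEP 495: fold is ignored when comparing naive datetimes, so the keys collide.
print(first == second)  # True

# Attaching the local timezone (which honours fold) recovers the two distinct instants.
print(first.replace(tzinfo=brussels).astimezone(timezone.utc))   # 2019-10-27 00:54:00+00:00
print(second.replace(tzinfo=brussels).astimezone(timezone.utc))  # 2019-10-27 01:54:00+00:00
{code}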
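And a rough, purely illustrative sketch of the "save an array of fold values and convert to the machine timezone" idea mentioned above; it is not how pyspark/sql/pandas/conversion.py is actually written, it only shows why such a fix would need an extra pass over every datetime entry:
{code:java}
import pandas as pd
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

local_tz = ZoneInfo("Europe/Brussels")  # assumption: the machine/session timezone

# Stand-ins for the collected groupby keys (what self.collect() returns).
keys = [datetime(2019, 10, 27, 2, 54), datetime(2019, 10, 27, 2, 54, fold=1)]
values = [1, 3]

# Step 1: save the fold of every entry before a fold-unaware conversion discards it.
folds = [dt.fold for dt in keys]

# Step 2: reapply the saved folds entry by entry to rebuild unambiguous UTC timestamps;
# this per-row loop is the performance concern mentioned above.
utc_keys = [
    dt.replace(tzinfo=local_tz, fold=f).astimezone(timezone.utc).replace(tzinfo=None)
    for dt, f in zip(keys, folds)
]

pdf = pd.DataFrame({"window": utc_keys, "min_value": values})
print(pdf)
#                window  min_value
# 0 2019-10-27 00:54:00          1
# 1 2019-10-27 01:54:00          3
{code}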
> Groupby key may not be unique when using window
> -----------------------------------------------
>
>                 Key: SPARK-33150
>                 URL: https://issues.apache.org/jira/browse/SPARK-33150
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.3, 3.0.0
>            Reporter: Dieter De Paepe
>            Priority: Major
>
> Due to the way Spark converts dates to local times, it may end up losing the details that allow it to differentiate instants when those times fall in the daylight saving time transition. Setting the Spark timezone to UTC does not resolve the issue.
> This issue is somewhat related to SPARK-32123, but seems independent enough to be considered a separate issue.
> A minimal example is below. I tested it on Spark 3.0.0 and 2.3.3 (I could not get 2.4.x to work on my system). My machine is located in the timezone "Europe/Brussels".
>
> {code:java}
> import pyspark
> import pyspark.sql.functions as f
>
> spark = (pyspark
>     .sql
>     .SparkSession
>     .builder
>     .master('local[1]')
>     .config("spark.sql.session.timeZone", "UTC")
>     .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
>     .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
>     .getOrCreate()
> )
>
> debug_df = spark.createDataFrame([
>     (1572137640, 1),
>     (1572137640, 2),
>     (1572141240, 3),
>     (1572141240, 4)
> ], ['epochtime', 'value'])
>
> debug_df \
>     .withColumn('time', f.from_unixtime('epochtime')) \
>     .withColumn('window', f.window('time', '1 minute').start) \
>     .collect()
> {code}
> Output: here we see that the window function internally transforms the times to local time, and therefore has to disambiguate between the Belgian winter- and summer-hour sides of the transition by setting the "fold" attribute:
> {code:java}
> [Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
>  Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
>  Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]
> {code}
> Now, this has severe implications when we use the window function for a groupby operation:
> {code:java}
> output = debug_df \
>     .withColumn('time', f.from_unixtime('epochtime')) \
>     .groupby(f.window('time', '1 minute').start.alias('window')).agg(
>         f.min('value').alias('min_value')
>     )
>
> output_collect = output.collect()
> output_pandas = output.toPandas()
> print(output_collect)
> print(output_pandas)
> {code}
> Output:
> {code:java}
> [Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1),
>  Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
>                window  min_value
> 0 2019-10-27 00:54:00          1
> 1 2019-10-27 00:54:00          3
> {code}
> While the collect() output shows Belgian local time, it still lets us differentiate the two keys visually through the fold attribute. However, due to the way the fold attribute is defined, [it is ignored for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute] equality comparison.
> On the other hand, the pandas output uses UTC (due to the setting of spark.sql.session.timeZone), but it has lost the disambiguating fold attribute in the pandas datatype conversion.
> In both cases, the column that was grouped on is not unique.
> {code:java}
> print(output_collect[0].window == output_collect[1].window)  # True
> print(output_collect[0].window.fold == output_collect[1].window.fold)  # False
> print(output_pandas.window[0] == output_pandas.window[1])  # True
> print(output_pandas.window[0].fold == output_pandas.window[1].fold)  # True
> {code}