[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19607
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153386064

--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
--- End diff --

Thanks, I agree, but I'll leave those as they are in this PR.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153386019

--- Diff: python/pyspark/sql/tests.py ---
@@ -3683,6 +3808,47 @@ def check_records_per_batch(x):
         else:
             self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value)
 
+    def test_vectorized_udf_timestamps_respect_session_timezone(self):
+        from pyspark.sql.functions import pandas_udf, col
+        from datetime import datetime
+        import pandas as pd
+        schema = StructType([
+            StructField("idx", LongType(), True),
+            StructField("timestamp", TimestampType(), True)])
+        data = [(1, datetime(1969, 1, 1, 1, 1, 1)),
+                (2, datetime(2012, 2, 2, 2, 2, 2)),
+                (3, None),
+                (4, datetime(2100, 3, 3, 3, 3, 3))]
+        df = self.spark.createDataFrame(data, schema=schema)
+
+        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
+        internal_value = pandas_udf(
+            lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType())
+
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
--- End diff --

Yes, I'll add some comments.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153386047

--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
                        for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
+    """
+    Convert timestamp to timezone-naive in the specified timezone or local timezone
+
+    :param s: a pandas.Series
+    :param fromTimezone: the timezone to convert from. if None then use local timezone
+    :param toTimezone: the timezone to convert to. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    try:
+        import pandas as pd
+        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    fromTz = fromTimezone or 'tzlocal()'
--- End diff --

I'll update it.
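[For context on the helpers reviewed above: they rest on the pandas distinction between `tz_localize`, which attaches a timezone to naive values without shifting the wall clock, and `tz_convert`, which shifts between timezones. A minimal standalone sketch of that round trip, independent of the PR's code; the timezone "America/New_York" is an arbitrary stand-in for the session timezone.]

```
import pandas as pd

# A tz-naive series, read as wall-clock times in some session timezone.
s = pd.Series(pd.to_datetime(["1970-01-01 00:00:01", "2012-02-02 02:02:02"]))

# tz_localize attaches the timezone without changing the wall clock...
localized = s.dt.tz_localize("America/New_York")
# ...and tz_convert then shifts to UTC, which is how Spark stores timestamps.
utc = localized.dt.tz_convert("UTC")
print(utc[0])  # 1970-01-01 05:00:01+00:00 (EST is UTC-5)

# Going back: convert into the target timezone, then drop the tz info.
naive_again = utc.dt.tz_convert("America/New_York").dt.tz_localize(None)
assert (naive_again == s).all()
```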
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153386033

--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
[... same hunk as quoted in full above ...]
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
--- End diff --

Thanks, I'll update it. Maybe `toTimezone` -> `to_timezone` as well.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153386007

--- Diff: python/pyspark/sql/tests.py ---
@@ -3192,16 +3255,49 @@ def test_filtered_frame(self):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
-    def test_createDataFrame_toggle(self):
-        pdf = self.create_pandas_data_frame()
+    def _createDataFrame_toggle(self, pdf, schema=None):
         self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
         try:
-            df_no_arrow = self.spark.createDataFrame(pdf)
+            df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
         finally:
             self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-        df_arrow = self.spark.createDataFrame(pdf)
+        df_arrow = self.spark.createDataFrame(pdf, schema=schema)
+        return df_no_arrow, df_arrow
+
+    def test_createDataFrame_toggle(self):
+        pdf = self.create_pandas_data_frame()
+        df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf, schema=self.schema)
         self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
 
+    def test_createDataFrame_respect_session_timezone(self):
+        from datetime import timedelta
+        pdf = self.create_pandas_data_frame()
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_no_arrow_la, df_arrow_la = self._createDataFrame_toggle(pdf, schema=self.schema)
+                result_la = df_no_arrow_la.collect()
+                result_arrow_la = df_arrow_la.collect()
+                self.assertEqual(result_la, result_arrow_la)
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+            df_no_arrow_ny, df_arrow_ny = self._createDataFrame_toggle(pdf, schema=self.schema)
+            result_ny = df_no_arrow_ny.collect()
+            result_arrow_ny = df_arrow_ny.collect()
+            self.assertEqual(result_ny, result_arrow_ny)
+
+            self.assertNotEqual(result_ny, result_la)
+
+            result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
--- End diff --

Yes, the 3-hour timedelta is the time difference between America/Los_Angeles and America/New_York. I'll add some comments.
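[An aside for readers puzzling over the `timedelta(hours=3)` above: all of the test's timestamps fall when both zones are on standard time, so the offset is a constant three hours. A quick illustrative check, using pytz only for demonstration.]

```
from datetime import datetime
import pytz

la = pytz.timezone("America/Los_Angeles")  # UTC-8 in winter (PST)
ny = pytz.timezone("America/New_York")     # UTC-5 in winter (EST)

# One of the test's timestamps; both zones are on standard time here.
dt = datetime(2012, 2, 2, 2, 2, 2)
print(ny.utcoffset(dt) - la.utcoffset(dt))  # 3:00:00 -- the test's correction
```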
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153385979

--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
         Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
         :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            pdf = pdf.copy()
+                            copied = True
--- End diff --

Yes, it's to prevent the original Pandas DataFrame from being updated. I'll add some comments.
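[A standalone sketch of the copy-on-first-write pattern discussed above, with hypothetical names (`_convert_columns`, `convert`) standing in for the PR's timestamp helper. The point is that the caller's DataFrame is never mutated and at most one copy is made, however many columns change.]

```
import pandas as pd

def _convert_columns(pdf, convert):
    # Hypothetical illustration: `convert` returns its input series
    # unchanged when there is nothing to do, so an `is` check tells us
    # whether we are about to write into the frame.
    copied = False
    for name in pdf.columns:
        s = convert(pdf[name])
        if s is not pdf[name]:
            if not copied:
                pdf = pdf.copy()  # protect the caller's frame before the first write
                copied = True
            pdf[name] = s
    return pdf

original = pd.DataFrame({"a": [1, 2]})
converted = _convert_columns(original, lambda s: s * 2)
assert original["a"].tolist() == [1, 2]   # caller's frame untouched
assert converted["a"].tolist() == [2, 4]
```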
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153176050

--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
--- End diff --

Just an idea, not blocking this PR: we probably have enough code now to pull the Pandas/Arrow stuff into a separate Python file/class in one place.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153149080

--- Diff: python/pyspark/sql/tests.py ---
@@ -3192,16 +3255,49 @@ def test_filtered_frame(self):
[... same hunk as quoted in full above ...]
+            result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
--- End diff --

Small comments here would be helpful. BTW, to be clear, is this 3-hour timedelta the time difference between America/Los_Angeles and America/New_York?
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153142283

--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
[... same hunk as quoted in full above ...]
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
--- End diff --

Nit: maybe `from_timezone`.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153142413

--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
[... same hunk as quoted in full above ...]
+    fromTz = fromTimezone or 'tzlocal()'
--- End diff --

Ditto.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153168918

--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
[... same hunk as quoted in full above ...]
+                        if not copied and s is not pdf[field.name]:
+                            pdf = pdf.copy()
+                            copied = True
--- End diff --

Would you mind if I ask why we should copy here? Some comments explaining it would be helpful. To be clear, is it to prevent the original Pandas DataFrame from being updated?
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153167040

--- Diff: python/pyspark/sql/tests.py ---
@@ -3683,6 +3808,47 @@ def check_records_per_batch(x):
[... same hunk as quoted in full above ...]
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
--- End diff --

Here too. It took me a while to work out where this 3 came from.
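[For readers hitting the same speed bump: a pandas `Timestamp.value` is nanoseconds since the Unix epoch, so a three-hour offset is exactly the constant above. A one-line sanity check:]

```
import pandas as pd

# Timestamp.value is nanoseconds since 1970-01-01 00:00:00 UTC, so three
# hours past the epoch equals the `diff` constant used in the test.
ts = pd.Timestamp("1970-01-01 03:00:00")
assert ts.value == 3 * 60 * 60 * 1000 * 1000 * 1000
```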
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153107748

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -997,6 +997,14 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+    buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
+        "timezone when converting to/from Pandas DataFrame.")
--- End diff --

Sure, I'll update it.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153107765

--- Diff: python/setup.py ---
@@ -201,7 +201,7 @@ def _supports_symlinks():
     extras_require={
         'ml': ['numpy>=1.7'],
         'mllib': ['numpy>=1.7'],
-        'sql': ['pandas>=0.13.0']
+        'sql': ['pandas>=0.19.2']
--- End diff --

Sure, I'll add it.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r152646370

--- Diff: python/setup.py ---
@@ -201,7 +201,7 @@ def _supports_symlinks():
     extras_require={
         'ml': ['numpy>=1.7'],
         'mllib': ['numpy>=1.7'],
-        'sql': ['pandas>=0.13.0']
+        'sql': ['pandas>=0.19.2']
--- End diff --

Document this requirement and the behavior changes in the `Migration Guide`?
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r152645369

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -997,6 +997,14 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+    buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
+        "timezone when converting to/from Pandas DataFrame.")
--- End diff --

Emphasize that the conf will be deprecated?

> When true, make Pandas DataFrame with timestamp type respecting session local timezone when converting to/from Pandas DataFrame. This configuration will be deprecated in the future releases.
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/19607

    [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone

## What changes were proposed in this pull request?

When converting a Pandas DataFrame/Series from/to a Spark DataFrame using `toPandas()` or pandas udfs, timestamp values respect the Python system timezone instead of the session timezone.

For example, let's say we use "America/Los_Angeles" as the session timezone and have a timestamp value "1970-01-01 00:00:01" in that timezone. Btw, I'm in Japan, so the Python timezone would be "Asia/Tokyo".

The timestamp value from the current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes "1970-01-01 17:00:01" because it respects the Python timezone. As we discussed in #18664, we consider this behavior a bug, and the value should be "1970-01-01 00:00:01".

## How was this patch tested?

Added tests and existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-22395

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19607.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19607

commit 4735e5981ecf3a4bce50ce86f706e25830f4a801
Author: Takuya UESHIN
Date: 2017-10-23T06:27:22Z

    Add a conf to make Pandas DataFrame respect session local timezone.

commit 1f85150dc5b26df21dca6bad2ef4eaec342c4400
Author: Takuya UESHIN
Date: 2017-10-23T08:09:16Z

    Fix toPandas() behavior.

commit 5c08ecf247bfe7e14afcdef8eba1c25cb3b68634
Author: Takuya UESHIN
Date: 2017-10-23T09:15:47Z

    Modify pandas UDFs to respect session timezone.
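[The reported behavior can be reproduced in plain pandas, outside Spark; the sketch below only illustrates the arithmetic in the description, not the PR's actual code path. Spark stores the timestamp internally as UTC (28801 seconds after the epoch), and the bug is which timezone that value gets rendered in.]

```
import pandas as pd

# Spark's internal representation: 28801 seconds after the Unix epoch, in UTC.
internal = pd.Timestamp(28801, unit="s", tz="UTC")

# Before the fix: rendered in the Python system timezone (Asia/Tokyo for the
# author), giving the surprising 17:00:01.
print(internal.tz_convert("Asia/Tokyo"))           # 1970-01-01 17:00:01+09:00

# After the fix: rendered in the session timezone, matching df.show().
print(internal.tz_convert("America/Los_Angeles"))  # 1970-01-01 00:00:01-08:00
```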