[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19607


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386064
  
--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
 record_type_list.append((str(col_names[i]), curr_type))
 return np.dtype(record_type_list) if has_rec_fix else None
 
-def _convert_from_pandas(self, pdf):
+def _convert_from_pandas(self, pdf, schema, timezone):
--- End diff --

Thanks, I agree with it, but maybe I'll leave those as they are in this PR.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386019
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3683,6 +3808,47 @@ def check_records_per_batch(x):
             else:
                 self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value)
 
+    def test_vectorized_udf_timestamps_respect_session_timezone(self):
+        from pyspark.sql.functions import pandas_udf, col
+        from datetime import datetime
+        import pandas as pd
+        schema = StructType([
+            StructField("idx", LongType(), True),
+            StructField("timestamp", TimestampType(), True)])
+        data = [(1, datetime(1969, 1, 1, 1, 1, 1)),
+                (2, datetime(2012, 2, 2, 2, 2, 2)),
+                (3, None),
+                (4, datetime(2100, 3, 3, 3, 3, 3))]
+        df = self.spark.createDataFrame(data, schema=schema)
+
+        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
+        internal_value = pandas_udf(
+            lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType())
+
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
--- End diff --

Yes, I'll add some comments.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386047
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
                          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
+    """
+    Convert timestamp to timezone-naive in the specified timezone or local timezone
+
+    :param s: a pandas.Series
+    :param fromTimezone: the timezone to convert from. if None then use local timezone
+    :param toTimezone: the timezone to convert to. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    try:
+        import pandas as pd
+        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    fromTz = fromTimezone or 'tzlocal()'
--- End diff --

I'll update it.
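(As background for the conversion pattern these helpers rely on, here is a minimal standalone pandas sketch; the series and timezone names are illustrative, not taken from the PR.)

```
import pandas as pd

# A tz-naive series interpreted as America/New_York wall-clock time and normalized to UTC,
# the same tz_localize/tz_convert pattern _check_series_convert_timestamps_internal uses.
s = pd.Series(pd.to_datetime(["1970-01-01 00:00:01"]))
utc = s.dt.tz_localize("America/New_York").dt.tz_convert("UTC")

# And the reverse: a tz-aware series made tz-naive in a target timezone,
# mirroring what _check_dataframe_localize_timestamps does per column.
naive_ny = utc.dt.tz_convert("America/New_York").dt.tz_localize(None)
print(utc[0], naive_ny[0])  # 1970-01-01 05:00:01+00:00  1970-01-01 00:00:01
```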


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386033
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
                          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
--- End diff --

Thanks, I'll update it. Maybe `toTimezone` -> `to_timezone` as well.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386007
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3192,16 +3255,49 @@ def test_filtered_frame(self):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
-    def test_createDataFrame_toggle(self):
-        pdf = self.create_pandas_data_frame()
+    def _createDataFrame_toggle(self, pdf, schema=None):
         self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
         try:
-            df_no_arrow = self.spark.createDataFrame(pdf)
+            df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
         finally:
             self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-        df_arrow = self.spark.createDataFrame(pdf)
+        df_arrow = self.spark.createDataFrame(pdf, schema=schema)
+        return df_no_arrow, df_arrow
+
+    def test_createDataFrame_toggle(self):
+        pdf = self.create_pandas_data_frame()
+        df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf, schema=self.schema)
         self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
 
+    def test_createDataFrame_respect_session_timezone(self):
+        from datetime import timedelta
+        pdf = self.create_pandas_data_frame()
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_no_arrow_la, df_arrow_la = self._createDataFrame_toggle(pdf, schema=self.schema)
+                result_la = df_no_arrow_la.collect()
+                result_arrow_la = df_arrow_la.collect()
+                self.assertEqual(result_la, result_arrow_la)
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+            df_no_arrow_ny, df_arrow_ny = self._createDataFrame_toggle(pdf, schema=self.schema)
+            result_ny = df_no_arrow_ny.collect()
+            result_arrow_ny = df_arrow_ny.collect()
+            self.assertEqual(result_ny, result_arrow_ny)
+
+            self.assertNotEqual(result_ny, result_la)
+
+            result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
--- End diff --

Yes, the 3-hour timedelta is the time difference between America/Los_Angeles and America/New_York.
I'll add some comments.
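(For reference, a small sketch of where the 3-hour gap comes from; it assumes pytz, which is available wherever pandas is installed, and is not part of the PR.)

```
from datetime import datetime
import pytz

la = pytz.timezone("America/Los_Angeles")
ny = pytz.timezone("America/New_York")
dt = datetime(2012, 2, 2, 2, 2, 2)
# The two zones switch DST on the same dates, so the wall-clock gap stays a constant
# 3 hours for every timestamp in the test data.
print(ny.utcoffset(dt) - la.utcoffset(dt))  # 3:00:00
```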


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153385979
  
--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
          Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
          :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            pdf = pdf.copy()
+                            copied = True
--- End diff --

Yes, it's to prevent the original pandas DataFrame from being updated.
I'll add some comments.
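(A minimal standalone sketch of the copy-on-first-change idea described here; the helper and data are illustrative, not the PR's code.)

```
import pandas as pd

def convert_without_mutating(pdf, columns):
    """Shift the given columns by one day, copying lazily so the caller's frame is untouched."""
    copied = False
    for name in columns:
        converted = pdf[name] + pd.Timedelta(days=1)  # stand-in for a timezone conversion
        if not copied and converted is not pdf[name]:
            pdf = pdf.copy()  # copy only once, and only if something actually changed
            copied = True
        pdf[name] = converted
    return pdf

original = pd.DataFrame({"ts": pd.to_datetime(["1970-01-01"])})
shifted = convert_without_mutating(original, ["ts"])
assert original["ts"].iloc[0] == pd.Timestamp("1970-01-01")  # caller's DataFrame unchanged
```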


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153176050
  
--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
 record_type_list.append((str(col_names[i]), curr_type))
 return np.dtype(record_type_list) if has_rec_fix else None
 
-def _convert_from_pandas(self, pdf):
+def _convert_from_pandas(self, pdf, schema, timezone):
--- End diff --

Just an idea, not blocking this PR. We probably have enough code now to move the Pandas / Arrow stuff into a separate Python file / class, all in one place.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153149080
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3192,16 +3255,49 @@ def test_filtered_frame(self):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
-    def test_createDataFrame_toggle(self):
-        pdf = self.create_pandas_data_frame()
+    def _createDataFrame_toggle(self, pdf, schema=None):
         self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
         try:
-            df_no_arrow = self.spark.createDataFrame(pdf)
+            df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
         finally:
             self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-        df_arrow = self.spark.createDataFrame(pdf)
+        df_arrow = self.spark.createDataFrame(pdf, schema=schema)
+        return df_no_arrow, df_arrow
+
+    def test_createDataFrame_toggle(self):
+        pdf = self.create_pandas_data_frame()
+        df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf, schema=self.schema)
         self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
 
+    def test_createDataFrame_respect_session_timezone(self):
+        from datetime import timedelta
+        pdf = self.create_pandas_data_frame()
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_no_arrow_la, df_arrow_la = self._createDataFrame_toggle(pdf, schema=self.schema)
+                result_la = df_no_arrow_la.collect()
+                result_arrow_la = df_arrow_la.collect()
+                self.assertEqual(result_la, result_arrow_la)
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+            df_no_arrow_ny, df_arrow_ny = self._createDataFrame_toggle(pdf, schema=self.schema)
+            result_ny = df_no_arrow_ny.collect()
+            result_arrow_ny = df_arrow_ny.collect()
+            self.assertEqual(result_ny, result_arrow_ny)
+
+            self.assertNotEqual(result_ny, result_la)
+
+            result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
--- End diff --

Small comments here would be helpful. BTW, to be clear, is this 3-hour timedelta the time difference between America/Los_Angeles and America/New_York?


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153142283
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
                          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
--- End diff --

Nit: maybe `from_timezone`.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153142413
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
                          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
+    """
+    Convert timestamp to timezone-naive in the specified timezone or local timezone
+
+    :param s: a pandas.Series
+    :param fromTimezone: the timezone to convert from. if None then use local timezone
+    :param toTimezone: the timezone to convert to. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    try:
+        import pandas as pd
+        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    fromTz = fromTimezone or 'tzlocal()'
--- End diff --

Ditto.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153168918
  
--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
          Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
          :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            pdf = pdf.copy()
+                            copied = True
--- End diff --

Would you mind if I ask why we should copy here? Some comments explaining it would be helpful. To be clear, is it to prevent the original Pandas DataFrame from being updated?


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153167040
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3683,6 +3808,47 @@ def check_records_per_batch(x):
             else:
                 self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value)
 
+    def test_vectorized_udf_timestamps_respect_session_timezone(self):
+        from pyspark.sql.functions import pandas_udf, col
+        from datetime import datetime
+        import pandas as pd
+        schema = StructType([
+            StructField("idx", LongType(), True),
+            StructField("timestamp", TimestampType(), True)])
+        data = [(1, datetime(1969, 1, 1, 1, 1, 1)),
+                (2, datetime(2012, 2, 2, 2, 2, 2)),
+                (3, None),
+                (4, datetime(2100, 3, 3, 3, 3, 3))]
+        df = self.spark.createDataFrame(data, schema=schema)
+
+        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
+        internal_value = pandas_udf(
+            lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType())
+
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
--- End diff --

Here too. It took me a while to figure out where this 3 came from.
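(For reference, a quick sketch of what that constant encodes; illustrative only, not part of the PR.)

```
import pandas as pd

# 3 hours expressed in nanoseconds, the unit of pandas.Timestamp.value,
# which is what the internal_value UDF in the test returns.
diff = 3 * 60 * 60 * 1000 * 1000 * 1000
assert pd.Timedelta(hours=3).value == diff  # 10800000000000 ns
```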


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153107748
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -997,6 +997,14 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+    buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
+        "timezone when converting to/from Pandas DataFrame.")
--- End diff --

Sure, I'll update it.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153107765
  
--- Diff: python/setup.py ---
@@ -201,7 +201,7 @@ def _supports_symlinks():
 extras_require={
 'ml': ['numpy>=1.7'],
 'mllib': ['numpy>=1.7'],
-'sql': ['pandas>=0.13.0']
+'sql': ['pandas>=0.19.2']
--- End diff --

Sure, I'll add it.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r152646370
  
--- Diff: python/setup.py ---
@@ -201,7 +201,7 @@ def _supports_symlinks():
 extras_require={
 'ml': ['numpy>=1.7'],
 'mllib': ['numpy>=1.7'],
-'sql': ['pandas>=0.13.0']
+'sql': ['pandas>=0.19.2']
--- End diff --

Document this requirement and the behavior changes in the `Migration Guide`?


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r152645369
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -997,6 +997,14 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+    buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
+        "timezone when converting to/from Pandas DataFrame.")
--- End diff --

Emphasize that the conf will be deprecated?

> When true, make Pandas DataFrame with timestamp type respecting session local timezone when converting to/from Pandas DataFrame. This configuration will be deprecated in future releases.
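(For context, a minimal sketch of how a user could opt back into the old behavior while the conf exists; it assumes a running `spark` session with some DataFrame `df` containing a timestamp column, and is illustrative only.)

```
# Temporary escape hatch: ignore the session timezone for Pandas conversion,
# restoring the pre-fix behavior. The conf is internal and slated for deprecation.
spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
pdf = df.toPandas()  # timestamps rendered in the Python system timezone again
```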


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-10-29 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/19607

[SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas 
to respect session timezone

## What changes were proposed in this pull request?

When converting a Pandas DataFrame/Series from/to a Spark DataFrame using `toPandas()` or pandas UDFs, timestamp values currently respect the Python system timezone instead of the session timezone.

For example, let's say we use `"America/Los_Angeles"` as the session timezone and have a timestamp value `"1970-01-01 00:00:01"` in that timezone. (By the way, I'm in Japan, so my Python timezone would be `"Asia/Tokyo"`.)

The timestamp value from current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects the Python timezone.
As we discussed in #18664, we consider this behavior a bug, and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added new tests and ran the existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-22395

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19607.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19607


commit 4735e5981ecf3a4bce50ce86f706e25830f4a801
Author: Takuya UESHIN 
Date:   2017-10-23T06:27:22Z

Add a conf to make Pandas DataFrame respect session local timezone.

commit 1f85150dc5b26df21dca6bad2ef4eaec342c4400
Author: Takuya UESHIN 
Date:   2017-10-23T08:09:16Z

Fix toPandas() behavior.

commit 5c08ecf247bfe7e14afcdef8eba1c25cb3b68634
Author: Takuya UESHIN 
Date:   2017-10-23T09:15:47Z

Modify pandas UDFs to respect session timezone.




---
