Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r149005432 --- Diff: python/pyspark/sql/types.py --- @@ -1629,35 +1629,112 @@ def to_arrow_type(dt): return arrow_type -def _check_dataframe_localize_timestamps(pdf): +def from_arrow_type(at): + """ Convert pyarrow type to Spark data type. + """ + # TODO: newer pyarrow has is_boolean(at) functions that would be better to check type + import pyarrow as pa + if at == pa.bool_(): + spark_type = BooleanType() + elif at == pa.int8(): + spark_type = ByteType() + elif at == pa.int16(): + spark_type = ShortType() + elif at == pa.int32(): + spark_type = IntegerType() + elif at == pa.int64(): + spark_type = LongType() + elif at == pa.float32(): + spark_type = FloatType() + elif at == pa.float64(): + spark_type = DoubleType() + elif type(at) == pa.DecimalType: + spark_type = DecimalType(precision=at.precision, scale=at.scale) + elif at == pa.string(): + spark_type = StringType() + elif at == pa.date32(): + spark_type = DateType() + elif type(at) == pa.TimestampType: + spark_type = TimestampType() + else: + raise TypeError("Unsupported type in conversion from Arrow: " + str(at)) + return spark_type + + +def from_arrow_schema(arrow_schema): + """ Convert schema from Arrow to Spark. + """ + return StructType( + [StructField(field.name, from_arrow_type(field.type), nullable=field.nullable) + for field in arrow_schema]) + + +def _check_dataframe_localize_timestamps(pdf, schema, timezone): """ Convert timezone aware timestamps to timezone-naive in local time :param pdf: pandas.DataFrame :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive """ - from pandas.api.types import is_datetime64tz_dtype - for column, series in pdf.iteritems(): - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? 
- if is_datetime64tz_dtype(series.dtype): - pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None) + import pandas as pd + try: + from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype + tz = timezone or 'tzlocal()' + for column, series in pdf.iteritems(): + if type(schema[str(column)].dataType) == TimestampType: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(series.dtype): + pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) + elif is_datetime64_dtype(series.dtype) and timezone is not None: + # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. + pdf[column] = series.apply( + lambda ts: ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT) + except ImportError: + from pandas.core.common import is_datetime64_dtype + from pandas.tslib import _dateutil_tzlocal + tzlocal = _dateutil_tzlocal() + tz = timezone or tzlocal + for column, series in pdf.iteritems(): + if type(schema[str(column)].dataType) == TimestampType: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if not is_datetime64_dtype(series.dtype): --- End diff -- Unfortunately, Pandas <0.17(?) seems to not have `is_datetime64tz_dtype`.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org