Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139585787 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): + """ + Serializes Pandas.Series as Arrow data. + """ + + def __init__(self): + super(ArrowPandasSerializer, self).__init__() + + def dumps(self, series): + """ + Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or + a list of series accompanied by an optional pyarrow type to coerce the data to. + """ + import pyarrow as pa + # Make input conform to [(series1, type1), (series2, type2), ...] + if not isinstance(series, (list, tuple)) or \ + (len(series) == 2 and isinstance(series[1], pa.DataType)): + series = [series] + series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] + arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series] + batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))]) + return super(ArrowPandasSerializer, self).dumps(batch) + + def loads(self, obj): + """ + Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series + followed by a dictionary containing length of the loaded batches. + """ + import pyarrow as pa + reader = pa.RecordBatchFileReader(pa.BufferReader(obj)) + batches = [reader.get_batch(i) for i in range(reader.num_record_batches)] --- End diff -- And .. `xrange` here too
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org