[GitHub] spark pull request #20295: [WIP][SPARK-23011] Support alternative function f...
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r164347701

    --- Diff: python/pyspark/sql/udf.py ---
    @@ -54,7 +54,7 @@ def _create_udf(f, returnType, evalType):
                     "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
                 )
    -        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) != 1:
    +        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) not in (1, 2):
             raise ValueError(
                 "Invalid function: pandas_udfs with function type GROUP_MAP "
                 "must take a single arg that is a pandas DataFrame."
    --- End diff --

    We should update the error message here; it still says the function must take a single arg.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
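To illustrate the relaxed arity check under discussion, here is a minimal, hypothetical sketch. `check_group_map_arity` is an illustrative helper, not part of PySpark; it mirrors the idea that a GROUP_MAP function may take either one argument (the pandas DataFrame) or two (the grouping key and the DataFrame):

```python
import inspect

def check_group_map_arity(f):
    """Hypothetical helper mirroring the check in _create_udf: a GROUP_MAP
    pandas_udf may take one arg (a pandas DataFrame) or, after this change,
    two args (the grouping key and the DataFrame)."""
    argspec = inspect.getfullargspec(f)
    if len(argspec.args) not in (1, 2):
        raise ValueError(
            "Invalid function: pandas_udfs with function type GROUP_MAP "
            "must take either one argument (data) or two arguments (key, data).")

check_group_map_arity(lambda pdf: pdf)       # ok: one arg
check_group_map_arity(lambda key, pdf: pdf)  # ok: two args
```

Functions with zero or three-plus positional args fail the check, which is why the error message quoted in the diff needs updating alongside the condition.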
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r163904221

    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --

    @BryanCutler Thanks for the clarification. I removed the note.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r163081467

    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --

    Yeah, the note was there because previously we iterated over Arrow columns and converted each one to a Series; that was later changed to convert an Arrow batch to a DataFrame and then iterate over the DataFrame's columns to get each Series. I wasn't sure if there might be a performance decrease, so I left the note. I'm not sure why it wasn't done like the above in the first place; it seems like it would be just as good as the original. Anyway, yeah, the note can be removed now.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r162981806

    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --

    I actually don't know what the comment above means. @BryanCutler do you remember?
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r162912985

    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --

    Maybe we can remove the comment above (`# NOTE: ...`)?