HyukjinKwon commented on a change in pull request #27109: [SPARK-30434][PYTHON][SQL] Move pandas related functionalities into 'pandas' sub-package
URL: https://github.com/apache/spark/pull/27109#discussion_r364036123
 
 

 ##########
 File path: python/pyspark/sql/dataframe.py
 ##########
 @@ -2135,193 +2135,6 @@ def transform(self, func):
                                               "should have been DataFrame." % 
type(result)
         return result
 
-    @since(1.3)
-    def toPandas(self):
-        """
 -        Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
-
-        This is only available if Pandas is installed and available.
-
 -        .. note:: This method should only be used if the resulting Pandas's :class:`DataFrame` is
 -            expected to be small, as all the data is loaded into the driver's memory.
-
 -        .. note:: Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental.
-
-        >>> df.toPandas()  # doctest: +SKIP
-           age   name
-        0    2  Alice
-        1    5    Bob
-        """
-        from pyspark.sql.utils import require_minimum_pandas_version
-        require_minimum_pandas_version()
-
-        import numpy as np
-        import pandas as pd
-
-        if self.sql_ctx._conf.pandasRespectSessionTimeZone():
-            timezone = self.sql_ctx._conf.sessionLocalTimeZone()
-        else:
-            timezone = None
-
-        if self.sql_ctx._conf.arrowPySparkEnabled():
-            use_arrow = True
-            try:
-                from pyspark.sql.types import to_arrow_schema
-                from pyspark.sql.utils import require_minimum_pyarrow_version
-
-                require_minimum_pyarrow_version()
-                to_arrow_schema(self.schema)
-            except Exception as e:
-
-                if self.sql_ctx._conf.arrowPySparkFallbackEnabled():
-                    msg = (
-                        "toPandas attempted Arrow optimization because "
 -                        "'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, "
-                        "failed by the reason below:\n  %s\n"
-                        "Attempting non-optimization as "
 -                        "'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to "
-                        "true." % _exception_message(e))
-                    warnings.warn(msg)
-                    use_arrow = False
-                else:
-                    msg = (
-                        "toPandas attempted Arrow optimization because "
 -                        "'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has "
 -                        "reached the error below and will not continue because automatic fallback "
 -                        "with 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to "
-                        "false.\n  %s" % _exception_message(e))
-                    warnings.warn(msg)
-                    raise
-
 -            # Try to use Arrow optimization when the schema is supported and the required version
 -            # of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
-            if use_arrow:
-                try:
 -                    from pyspark.sql.types import _check_dataframe_localize_timestamps
-                    import pyarrow
-                    batches = self._collectAsArrow()
-                    if len(batches) > 0:
-                        table = pyarrow.Table.from_batches(batches)
 -                        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
 -                        # values, but we should use datetime.date to match the behavior with when
-                        # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
 -                        return _check_dataframe_localize_timestamps(pdf, timezone)
-                    else:
 -                        return pd.DataFrame.from_records([], columns=self.columns)
-                except Exception as e:
 -                    # We might have to allow fallback here as well but multiple Spark jobs can
-                    # be executed. So, simply fail in this case for now.
-                    msg = (
-                        "toPandas attempted Arrow optimization because "
 -                        "'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has "
 -                        "reached the error below and can not continue. Note that "
 -                        "'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an "
-                        "effect on failures in the middle of "
-                        "computation.\n  %s" % _exception_message(e))
-                    warnings.warn(msg)
-                    raise
-
-        # Below is toPandas without Arrow optimization.
-        pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
-
-        dtype = {}
-        for field in self.schema:
-            pandas_type = _to_corrected_pandas_type(field.dataType)
 -            # SPARK-21766: if an integer field is nullable and has null values, it can be
 -            # inferred by pandas as float column. Once we convert the column with NaN back
 -            # to integer type e.g., np.int16, we will hit exception. So we use the inferred
-            # float type, not the corrected type from the schema in this case.
-            if pandas_type is not None and \
 -                not(isinstance(field.dataType, IntegralType) and field.nullable and
-                    pdf[field.name].isnull().any()):
-                dtype[field.name] = pandas_type
 -            # Ensure we fall back to nullable numpy types, even when whole column is null:
 -            if isinstance(field.dataType, IntegralType) and pdf[field.name].isnull().any():
-                dtype[field.name] = np.float64
 -            if isinstance(field.dataType, BooleanType) and pdf[field.name].isnull().any():
-                dtype[field.name] = np.object
-
-        for f, t in dtype.items():
-            pdf[f] = pdf[f].astype(t, copy=False)
-
-        if timezone is None:
-            return pdf
-        else:
 -            from pyspark.sql.types import _check_series_convert_timestamps_local_tz
-            for field in self.schema:
 -                # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
-                if isinstance(field.dataType, TimestampType):
-                    pdf[field.name] = \
 -                        _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
-            return pdf
-
-    def mapInPandas(self, udf):
-        """
 -        Maps an iterator of batches in the current :class:`DataFrame` using a Pandas user-defined
-        function and returns the result as a :class:`DataFrame`.
-
 -        The user-defined function should take an iterator of `pandas.DataFrame`\\s and return
-        another iterator of `pandas.DataFrame`\\s. All columns are passed
 -        together as an iterator of `pandas.DataFrame`\\s to the user-defined function and the
 -        returned iterator of `pandas.DataFrame`\\s are combined as a :class:`DataFrame`.
-        Each `pandas.DataFrame` size can be controlled by
-        `spark.sql.execution.arrow.maxRecordsPerBatch`.
 -        Its schema must match the returnType of the Pandas user-defined function.
-
 -        :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
-
-        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-        >>> df = spark.createDataFrame([(1, 21), (2, 30)],
-        ...                            ("id", "age"))  # doctest: +SKIP
-        >>> @pandas_udf(df.schema, PandasUDFType.MAP_ITER)  # doctest: +SKIP
-        ... def filter_func(batch_iter):
-        ...     for pdf in batch_iter:
-        ...         yield pdf[pdf.id == 1]
-        >>> df.mapInPandas(filter_func).show()  # doctest: +SKIP
-        +---+---+
-        | id|age|
-        +---+---+
-        |  1| 21|
-        +---+---+
-
-        .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
-
-        """
-        # Columns are special because hasattr always return True
-        if isinstance(udf, Column) or not hasattr(udf, 'func') \
-                or udf.evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
 -            raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
-                             "MAP_ITER.")
-
-        udf_column = udf(*[self[col] for col in self.columns])
-        jdf = self._jdf.mapInPandas(udf_column._jc.expr())
-        return DataFrame(jdf, self.sql_ctx)
-
-    def _collectAsArrow(self):
-        """
 -        Returns all records as a list of ArrowRecordBatches, pyarrow must be installed
-        and available on driver and worker Python environments.
-
-        .. note:: Experimental.
-        """
-        with SCCallSiteSync(self._sc) as css:
 -            port, auth_secret, jsocket_auth_server = self._jdf.collectAsArrowToPython()
-
 -        # Collect list of un-ordered batches where last element is a list of correct order indices
-        try:
 -            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
-        finally:
 -            # Join serving thread and raise any exceptions from collectAsArrowToPython
-            jsocket_auth_server.getResult()
-
-        # Separate RecordBatches from batch order indices in results
-        batches = results[:-1]
-        batch_order = results[-1]
-
-        # Re-order the batch list using the correct order
-        return [batches[i] for i in batch_order]
 -
     ##########################################################################################
     # Pandas compatibility
 
 Review comment:
   Actually, I would like to deprecate and remove those. I don't think we will add such compatibilities in Spark itself anymore because there are so many differences.
   
   Let me take action on the three APIs separately after this PR.
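   
   For reference, here is a minimal, hypothetical sketch of how such a pandas-compatibility alias could be deprecated with a runtime warning before removal. The `_deprecated_alias` helper and the `groupby`/`groupBy` pairing are illustrative assumptions, not code from this PR:
   
   import functools
   import warnings
   
   
   def _deprecated_alias(func, alias_name, preferred_name):
       """Wrap ``func`` so calls through the alias emit a FutureWarning."""
       @functools.wraps(func)
       def wrapper(*args, **kwargs):
           warnings.warn(
               "'%s' is deprecated and will be removed in a future release; "
               "use '%s' instead." % (alias_name, preferred_name),
               FutureWarning,
               stacklevel=2,
           )
           return func(*args, **kwargs)
       return wrapper
   
   
   class Demo(object):
       def groupBy(self, *cols):
           return "grouped by %s" % (cols,)
   
   
   # Hypothetical pandas-style alias kept only for backward compatibility.
   Demo.groupby = _deprecated_alias(Demo.groupBy, "groupby", "groupBy")
   
   if __name__ == "__main__":
       # Emits a FutureWarning pointing users at the preferred camelCase name.
       print(Demo().groupby("age"))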

