Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146436616
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,52 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            if not isinstance(schema, StructType) and isinstance(schema, DataType):
    +                schema = StructType().add("value", schema)
    --- End diff --
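
    As a quick aside on the slicing above: `-(-len(pdf) // self.sparkContext.defaultParallelism)` is just integer ceiling division, so the rows are split into at most `defaultParallelism` slices. A minimal standalone sketch of the trick (names here are illustrative, not from the patch):

    ```python
    def slice_size(num_rows, parallelism):
        # For positive ints, -(-a // b) == ceil(a / b), with no float round-trip.
        return -(-num_rows // parallelism)

    assert slice_size(10, 3) == 4  # ceil(10 / 3)
    assert slice_size(9, 3) == 3   # exact division is unchanged
    ```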
    
    BTW, I think we should not support this case:
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show()
    ```
    
    ```
    +-----+
    |value|
    +-----+
    |    1|
    +-----+
    ```
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show()
    ```
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 595, in 
createDataFrame
        rdd, schema = self._createFromLocal(map(prepare, data), schema)
      File "/.../spark/python/pyspark/sql/session.py", line 399, in 
_createFromLocal
        data = list(data)
      File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare
        verify_func(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
        verify_value(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1337, in 
verify_integer
        verify_acceptable_types(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1300, in 
verify_acceptable_types
        % (dataType, obj, type(obj))))
    TypeError: field value: IntegerType can not accept object (1,) in type 
<type 'tuple'>
    ```
    
    I thought disallowing it is actually more consistent with normal Python lists:

    ```python
    >>> spark.createDataFrame([1], "int").show()
    ```
    
    ```
    +-----+
    |value|
    +-----+
    |    1|
    +-----+
    ```
    
    ```python
    >>> spark.createDataFrame([[1]], "int").show()
    ```
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/U.../spark/python/pyspark/sql/session.py", line 595, in 
createDataFrame
        rdd, schema = self._createFromLocal(map(prepare, data), schema)
      File "/.../spark/python/pyspark/sql/session.py", line 399, in 
_createFromLocal
        data = list(data)
      File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare
        verify_func(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
        verify_value(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1337, in 
verify_integer
        verify_acceptable_types(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1300, in 
verify_acceptable_types
        % (dataType, obj, type(obj))))
    TypeError: field value: IntegerType can not accept object [1] in type <type 
'list'>
    ```
    
    If we need to support this, I think it should print as below:
    
    ```python
    >>> spark.createDataFrame([[1]], "string").show()
    ```
    
    ```
    +-----+
    |value|
    +-----+
    |  [1]|
    +-----+
    ```
    
    although I am not sure if we intended to support this:
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "string").show()
    ```
    
    ```
    +--------------------+
    |               value|
    +--------------------+
    |[Ljava.lang.Objec...|
    +--------------------+
    ```
    
    So, my suggestion is to simply fall back to the non-Arrow path in this case, assuming we didn't intend to support it. This use case looks rare anyway, since the existing path without Arrow already prints a weird result or throws an error.

