Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144194084
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    +                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +                try:
    +                    serializer = ArrowSerializer()
    +                    serializer.dump_stream(batches, tempFile)
    +                    tempFile.close()
    +                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +                finally:
    +                    # readRDDFromFile eagerly reads the file so we can delete right after.
    +                    os.unlink(tempFile.name)
    +
    +                # Create the Spark DataFrame, there will be at least 1 batch
    +                schema = from_arrow_schema(batches[0].schema)
    --- End diff --
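
    As a side note on the batching and temp-file handoff in the diff above, here is a minimal,
    self-contained sketch of the same idea using only pandas and pyarrow's IPC stream API. Names
    like `num_slices` and `pdf` are illustrative; the patch itself goes through pyspark's
    ArrowSerializer and PythonRDD.readRDDFromFile rather than pyarrow's writer directly:

        import os
        from tempfile import NamedTemporaryFile

        import pandas as pd
        import pyarrow as pa

        # Slice a pandas DataFrame into num_slices chunks. -(-a // b) is ceiling
        # division, so the last chunk is simply smaller rather than rows being dropped.
        pdf = pd.DataFrame({"id": range(10), "value": [float(i) for i in range(10)]})
        num_slices = 4
        split = -(-len(pdf) // num_slices)
        slices = (pdf[i:i + split] for i in range(0, len(pdf), split))
        batches = [pa.RecordBatch.from_pandas(s, preserve_index=False) for s in slices]

        # Write the batches to a temp file in the Arrow streaming format and read
        # them back, mimicking the temp-file handoff to the JVM in the diff.
        tmp = NamedTemporaryFile(delete=False)
        try:
            with pa.ipc.new_stream(tmp, batches[0].schema) as writer:
                for batch in batches:
                    writer.write_batch(batch)
            tmp.close()

            with open(tmp.name, "rb") as f:
                table = pa.ipc.open_stream(f).read_all()
            assert table.num_rows == len(pdf)
        finally:
            os.unlink(tmp.name)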
    
    What if a user specifies the schema?
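
    For context on that question: pyarrow can be told which schema to use instead of inferring
    one from the pandas data, which is roughly what honoring a user-supplied schema would
    require. A small sketch (the mapping from a Spark StructType to an Arrow schema that a real
    fix would need is not part of this patch and not shown here):

        import pandas as pd
        import pyarrow as pa

        # Illustrative only: supply an explicit Arrow schema rather than letting
        # pyarrow infer the types from the pandas columns.
        pdf = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

        arrow_schema = pa.schema([
            pa.field("id", pa.int32()),       # narrower than the inferred int64
            pa.field("value", pa.float64()),
        ])

        batch = pa.RecordBatch.from_pandas(pdf, schema=arrow_schema, preserve_index=False)
        assert batch.schema.equals(arrow_schema)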

