Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145200454

    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
                 data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerly reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --

Oh, pyarrow 0.4.1 is what is installed on Jenkins, so that is what I've been testing against. Maybe try that version?
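For reference, below is a minimal standalone sketch of the slicing step from the diff above: a pandas.DataFrame is cut into roughly equal chunks and each chunk is converted to an Arrow RecordBatch with pyarrow. This is not the Spark code itself; the sample DataFrame and the chunk count (num_slices, a stand-in for sparkContext.defaultParallelism) are arbitrary example values, and it assumes pandas and pyarrow are installed.

    import pandas as pd
    import pyarrow as pa

    # Example input; any pandas DataFrame works here.
    pdf = pd.DataFrame({"id": range(10), "value": [float(i) for i in range(10)]})

    # num_slices is a hypothetical stand-in for sparkContext.defaultParallelism.
    num_slices = 4
    step = -(-len(pdf) // num_slices)  # ceiling division, same rounding trick as the diff

    # Each slice becomes one Arrow RecordBatch; the pandas index is dropped as in the diff.
    batches = [
        pa.RecordBatch.from_pandas(pdf[start:start + step], preserve_index=False)
        for start in range(0, len(pdf), step)
    ]

    print(len(batches), [b.num_rows for b in batches])  # 4 batches of 3, 3, 3, 1 rows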