[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-04-13 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817081#comment-16817081
 ] 

Robert Joseph Evans commented on SPARK-26413:
-

SPARK-27396 covers this, but with a slightly different approach.

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar structured 
> data.  This is already true in Spark, which uses Arrow in the pandas UDF 
> functions of the DataFrame API.
> However, the current implementation of Arrow in Spark is limited to two use 
> cases:
>  * Pandas UDFs, which allow operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas, which pulls the entire dataset back to the driver as a 
> Pandas DataFrame.
> What is still hard, however, is making use of all of the columns in a DataFrame 
> while staying distributed across the workers.  The only way to do this 
> currently is to drop down into RDDs and collect the rows into a DataFrame. 
> However, pickling is very slow and the collecting is expensive.
> The proposal is to extend Spark in a way that allows users to operate on a 
> full Arrow Table while still making use of Spark's underlying technology.  
> Some examples of what this new API would make possible:
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Target users and personas
> ML, Data Scientists, and future library authors.
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark DataFrame to RDD[ArrowTable]
>  * Conversion back from any RDD[ArrowTable] to Dataset[Row], RDD[Row], or PySpark 
> DataFrame
>  * Open up the possibility of tighter integration between Arrow/Pandas/Spark, 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Iterable of Arrow Batches.
> def arrowRDD: RDD[ArrowTable]
>  
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to a DataFrame by inspecting the Arrow Schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): DataFrame
>   
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]
> {code}
> h3. Serializers.py
> {code:python}
> # Serializer that takes a serialized Arrow Table and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer): ...
> {code}
> h3. RDD.py
> {code:python}
> # New RDD class that has an RDD[ArrowTable] behind it and uses the new 
> # ArrowSerializer instead of the normal Pickle serializer
> class ArrowRDD(RDD): ...
> {code}
>  
> h3. Dataframe.py
> {code:python}
> # New function that converts a PySpark DataFrame into an ArrowRDD
> def arrow(self): ...
> {code}
>  
> h2. Example API Usage
> h3. Pyspark
> {code}
> # Select a Single Column Using Pandas
> def map_table(arrow_table):
>   import pyarrow as pa
>   pdf = arrow_table.to_pandas()
>   pdf = pdf[['email']]
>   return pa.Table.from_pandas(pdf)
> # Convert to Arrow RDD, map over tables, convert back to dataframe
> df.arrow.map(map_table).dataframe 
> {code}
> h3. Scala
>  
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>  
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>  
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark 
> users to interact with Arrow tools and libraries.  This does, however, come 
> with some considerations from a Spark perspective.
>  Arrow is column-based rather than row-based.  In the above API proposal of 
> RDD[ArrowTable], each RDD row will in fact be a block of data.  Another 
> proposal in this regard is to introduce a new parameter to Spark called 
> arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of this parameter is 
> to decide how many records are included in a single Arrow Table.  If set to 
> -1, the entire partition is included in one table; otherwise each table holds 
> at most that many records. 
> Within that num
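
A rough sketch of the batching rule described for the proposed maxRecordsPerTable setting, using plain Python lists as stand-ins for a partition and its tables. The function name `split_partition` is hypothetical and no Spark or Arrow API is involved. It assumes that -1 keeps the whole partition in one table and that a positive value caps the number of records per table.

```python
from typing import Iterator, List, Sequence


def split_partition(records: Sequence, max_records_per_table: int) -> Iterator[List]:
    """Yield chunks of a partition, each standing in for one Arrow table.

    -1 means "put the entire partition in a single table"; any positive
    value caps the number of records per table (an assumed reading of the
    proposal, not a confirmed behaviour).
    """
    if max_records_per_table == -1:
        yield list(records)
        return
    if max_records_per_table <= 0:
        raise ValueError("max_records_per_table must be -1 or a positive integer")
    for start in range(0, len(records), max_records_per_table):
        yield list(records[start:start + max_records_per_table])


partition = list(range(10))
whole = list(split_partition(partition, -1))   # one table with all ten records
capped = list(split_partition(partition, 4))   # tables of 4, 4 and 2 records
```

A smaller cap bounds how much data a single table holds at once, at the cost of producing more tables per partition.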

[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2018-12-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16728135#comment-16728135
 ] 

Hyukjin Kwon commented on SPARK-26413:
--

For clarification, Arrow column vector APIs are exposed under 
[https://github.com/apache/spark/tree/86cc907448f0102ad0c185e87fcc897d0a32707f/sql/core/src/main/java/org/apache/spark/sql/vectorized].
 So it is quite feasible for a third party to implement this. For 
instance, the company I work for used this approach to use the Arrow format (see also 
https://community.hortonworks.com/articles/223626/integrating-apache-hive-with-apache-spark-hive-war.html).
 It's feasible to integrate with {{ColumnarBatch}}. 

Since we are very conservative about adding RDD APIs, I would like to be very sure that we 
have a strong reason to add them. For instance, is it impossible to use the 
API I pointed out? Since Arrow is self-describing and structured, it mostly only 
makes sense to use it with Spark SQL within Apache Spark. Given that, wouldn't 
it make more sense to make the current vector APIs easier to use?


[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2018-12-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16728136#comment-16728136
 ] 

Hyukjin Kwon commented on SPARK-26413:
--

I was wondering whether it would be better to simply expose, for instance, some 
kind of utility that converts from Arrow format to Spark internal rows, 
rather than exposing RDD APIs, if this is something that's needed. Could this be an 
alternative as well?


[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2018-12-30 Thread Richard Whitcomb (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16731075#comment-16731075
 ] 

Richard Whitcomb commented on SPARK-26413:
--

Yeah, the Vector API is useful but can be limiting.  It is restricted to 
selecting the columns you care about prior to calling into the lambda function. 
 Whenever you perform an operation, you have to convert back and 
forth between row-based and column-based formats, which can be expensive.  

In the RDD format you can do the block conversion once into an RDD[ArrowTable] 
and do things like shuffle, partition, join, etc. on that RDD without first 
flattening back into rows and without adding any methods to RDDs.  This 
becomes harder with a Dataset[ArrowTable] because the entire Dataset API works on a 
row-based format, which isn't the case with RDDs.

Having a Row => ArrowTable conversion in the Dataset API would be a good first step, though, 
and would be fairly easy to implement.  That alone would make things like loading 
data into the GPU much easier.
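
To illustrate the cost argument, here is a small sketch in plain Python, with a dict of column lists standing in for an Arrow table. The helper names (`rows_to_columns`, `select_columns`) are made up for this example and are not Spark or Arrow APIs. The point is only that once a partition has been converted to a columnar block, later steps such as selecting columns can work on the block directly instead of going back through rows.

```python
from typing import Dict, List, Sequence, Tuple

ColumnBlock = Dict[str, List]


def rows_to_columns(names: Sequence[str], rows: Sequence[Tuple]) -> ColumnBlock:
    """Convert a partition of row tuples into one columnar block (done once)."""
    block: ColumnBlock = {name: [] for name in names}
    for row in rows:
        for name, value in zip(names, row):
            block[name].append(value)
    return block


def select_columns(block: ColumnBlock, wanted: Sequence[str]) -> ColumnBlock:
    """Pick columns from the block; no per-row conversion is needed."""
    return {name: block[name] for name in wanted}


rows = [(1, "a@example.com", 30), (2, "b@example.com", 41)]
block = rows_to_columns(["id", "email", "age"], rows)
emails = select_columns(block, ["email"])
```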


[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2018-12-31 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16731500#comment-16731500
 ] 

Hyukjin Kwon commented on SPARK-26413:
--

I agree the usage can be limited for now; however, it looks like that's already what's 
going to happen if we add {{RDD[ArrowTable]}} <-> {{DataFrame}}.
I thought we were going to reuse this code path.

I vaguely assume that the API suggested in the JIRA and the code below are 
basically similar:

{code}
def arrowTableToRows(tables: Iterator[ArrowTable]): Iterator[Row] = {
  // Each partition is expected to hold exactly one table
  val arrowTable = tables.next()
  assert(!tables.hasNext)
  // convert arrow table to rows
  ???
}

def rowsToArrowTable(rows: Iterator[Row]): Iterator[ArrowTable] = {
  // convert rows to arrow table
  Iterator.single(???)
}
{code}

so that you can use:

{code}
val arrowRDD = df.rdd.mapPartitions(rowsToArrowTable)
val df2 = spark.createDataFrame(arrowRDD.mapPartitions(arrowTableToRows), df.schema)
{code}

Like:

{code}
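val df = spark.range(100).repartition(10)
// Pack each partition into a single element (an array of rows)
val rdd = df.rdd.mapPartitions(iter => Iterator.single(iter.toArray))
// Unpack each array back into rows and rebuild the DataFrame with the original schema
val df2 = spark.createDataFrame(rdd.mapPartitions(iter => iter.next().iterator), df.schema)
df2.show()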
{code}

If the advantage of adding it as an RDD API is mainly to avoid extra RDD 
operations (correct me if I'm mistaken), I doubt we should add them as 
RDD APIs if this can be resolved by adding, for instance, {{arrowTableToRows}} 
and {{rowsToArrowTable}}.
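
The same round trip can be sketched without Spark. In the Python below, a list of lists stands in for an RDD's partitions and `map_partitions` is a hypothetical stand-in for mapPartitions; `rows_to_table` and `table_to_rows` mirror the two helper functions named above, but they simply pack rows into a list rather than building a real Arrow table.

```python
from typing import Callable, Iterator, List

_MISSING = object()  # sentinel used to detect an exhausted iterator


def rows_to_table(rows: Iterator) -> Iterator[List]:
    """Pack all rows of one partition into a single 'table' element."""
    yield list(rows)


def table_to_rows(tables: Iterator[List]) -> Iterator:
    """Unpack the single 'table' of a partition back into rows."""
    table = next(tables, _MISSING)
    if table is _MISSING:
        return  # empty partition: nothing to unpack
    if next(tables, _MISSING) is not _MISSING:
        raise ValueError("expected exactly one table per partition")
    yield from table


def map_partitions(partitions: List[List], fn: Callable[[Iterator], Iterator]) -> List[List]:
    """Apply fn to each partition's iterator (stand-in for RDD.mapPartitions)."""
    return [list(fn(iter(part))) for part in partitions]


partitions = [[0, 1, 2], [3, 4], []]
packed = map_partitions(partitions, rows_to_table)    # one list per partition
unpacked = map_partitions(packed, table_to_rows)      # back to the original rows
```

Because both helpers take and return iterators, they compose with any per-partition transformation and need no new methods on the collection type itself.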



[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750174#comment-16750174
 ] 

Thomas Graves commented on SPARK-26413:
---

Just a note I think this overlaps with 
https://issues.apache.org/jira/browse/SPARK-24579


[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750740#comment-16750740
 ] 

Hyukjin Kwon commented on SPARK-26413:
--

I think SPARK-26412 can be resolved together if this one is resolved.

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming an standard interchange format for columnar Structured 
> Data.  This is already true in Spark with the use of arrow in the pandas udf 
> functions in the dataframe API.
> However the current implementation of arrow in spark is limited to two use 
> cases.
>  * Pandas UDF that allows for operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas which pulls back the entire dataset to the driver in a 
> Pandas Dataframe.
> What is still hard however is making use of all of the columns in a Dataframe 
> while staying distributed across the workers.  The only way to do this 
> currently is to drop down into RDDs and collect the rows into a dataframe. 
> However pickling is very slow and the collecting is expensive.
> The proposal is to extend spark in a way that allows users to operate on an 
> Arrow Table fully while still making use of Spark's underlying technology.  
> Some examples of possibilities with this new API. 
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Targets users and personas
> ML, Data Scientists, and future library authors..
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark Dataframe to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], Pyspark 
> Dataframe
>  * Open the possibilities to tighter integration between Arrow/Pandas/Spark 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Iterable of Arrow batches.
> def arrowRDD: RDD[ArrowTable]
>  
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to a DataFrame by inspecting the Arrow schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): DataFrame
>   
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]
> {code}
> h3. Serializers.py
> {code}
> # Serializer that takes serialized Arrow Tables and returns pyarrow Tables.
> class ArrowSerializer(FramedSerializer):
>     ...
> {code}
> h3. RDD.py
> {code}
> # New RDD class that has an RDD[ArrowTable] behind it and uses the new
> # ArrowSerializer instead of the normal pickle serializer.
> class ArrowRDD(RDD):
>     ...
> {code}
>  
> h3. Dataframe.py
> {code}
> # New function that converts a PySpark DataFrame into an ArrowRDD
> def arrow(self):
>     ...
> {code}
>  
> h2. Example API Usage
> h3. Pyspark
> {code}
> import pyarrow as pa
>
> # Select a single column using Pandas
> def map_table(arrow_table):
>     pdf = arrow_table.to_pandas()
>     pdf = pdf[['email']]
>     return pa.Table.from_pandas(pdf)
>
> # Convert to an Arrow RDD, map over the tables, convert back to a DataFrame
> df.arrow.map(map_table).dataframe
> {code}
> h3. Scala
>  
> {code:java}
> // Find N centroids using CUDA RAPIDS k-means
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>  
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>  
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark 
> users to interact with Arrow tools and libraries.  This does, however, come 
> with some considerations from a Spark perspective.
> Arrow is column-based rather than row-based.  In the above API proposal of 
> RDD[ArrowTable], each RDD row will in fact be a block of data.  Another 
> proposal in this regard is to introduce a new Spark parameter called 
> arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of this parameter is 
> to decide how many records are included in a single Arrow Table.  If set to 
> -1, the entire partition is included in one table; otherwise each table holds 
> up to that number of records. 
> Within that number the n
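
The block semantics described for the proposed maxRecordsPerTable parameter can be sketched in plain Python. This is only an illustration under stated assumptions, not the SPIP's implementation: the names split_partition and rows_to_columns are hypothetical, rows are modelled as dicts rather than Spark Rows, and emitting no block for an empty partition is a guess, since the description is cut off at that point.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def split_partition(rows: Iterable[Any], max_records_per_table: int) -> Iterator[List[Any]]:
    """Yield blocks of rows; -1 means one block holding the whole partition."""
    it = iter(rows)
    if max_records_per_table == -1:
        block = list(it)
        if block:  # assumption: an empty partition produces no block
            yield block
        return
    if max_records_per_table <= 0:
        raise ValueError("max_records_per_table must be -1 or a positive integer")
    while True:
        # Take the next slice of at most max_records_per_table rows.
        block = list(islice(it, max_records_per_table))
        if not block:
            return
        yield block


def rows_to_columns(block: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Pivot a block of row dicts into one list per column (column-based layout)."""
    columns: Dict[str, List[Any]] = {}
    for row in block:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


# Hypothetical partition of five rows, split into blocks of at most two records.
partition = [{"id": i, "email": f"user{i}@example.com"} for i in range(5)]
blocks = [rows_to_columns(b) for b in split_partition(partition, 2)]
print([len(b["id"]) for b in blocks])  # prints [2, 2, 1]
```

Each element of blocks plays the role of one RDD row in the proposal: a column-oriented block of records rather than a single record.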

[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2021-11-08 Thread Ruslan Dautkhanov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440731#comment-17440731
 ] 

Ruslan Dautkhanov commented on SPARK-26413:
---

[https://github.com/apache/spark/pull/34505] is in. 

Part of https://issues.apache.org/jira/browse/SPARK-37227 

 

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, Spark Core
> Affects Versions: 3.1.0
> Reporter: Richard Whitcomb
> Priority: Minor
>