[ https://issues.apache.org/jira/browse/SPARK-26858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766941#comment-16766941 ]
Hyukjin Kwon edited comment on SPARK-26858 at 2/20/19 3:27 AM: --------------------------------------------------------------- cc [~felixcheung], [~shivaram], [~r...@databricks.com], [~bryanc], [~viirya]. [~smilegator], [~falaki] Actually, I wonder if we should add {{gapplyCollect}} support. Here is the rough diagram of {{gapplyCollect}}. I roughly made the chart to explain why it's tricky and hacky to implement. I still see row by row operation here and there in {{gapplyCollect}} but I don't target to explain performance aspect here. {code:java} |Driver |Executor |R side |JVM side |JVM side |R side | |1. call `gapplyCollect` | | | | | | | | | |2. DataFrame<R: binary> is | | | | | set for its output schema | | | | | and call `collect` | | | | | | | | | |3. | Query plan is done and it executes | | | | | | | | |4. | | Serialize from JVM row to | | | | | R row (row by row) | | | | | | | |5. | | Send bytes | Recieve bytes | | | | | | |6. | | | Deserializes bytes to an R Data frame | | | | | | |7. | | | Execute R function (with key) | | | | | | |8. | | | Serializes output R DataFrame | | | | | | |9. | | Recieve bytes | Send bytes back | | | | | | |10. | | Wrap it with JVM row Row(Array[Byte]); | | | | | each row is an R DataFrame. | | |11. | Row(Array[Byte]) are collected | | | | | | | | |12. Deserializes each as an | | | | | R Data Frame | | | | {code} The problem is that it uses {{BinaryType}} to wrap and ship the data. {{gapply}} is okay because {{schema}} must be set like Pandas Groupped Map UDF. However, looks {{gapplyCollect}} omits the output schema, and JVM side doesn't know the schema before execution. So, from 8. to 12. above, this way is fine if we use regular {{gapplyCollect}} because each R Data Frame can be ser/de and then combined later. {code:java} do.call("rbind", list(df, df, df ...)) {code} However, in case of Arrow, it needs to know schema ahead because Arrow stream format is basically {code:java} | Schema | |-------------| | Arrow Batch | |-------------| | Arrow Batch | |-------------| ... {code} (Each {{df}} is mapped to each {{Arrow Batch}} above conceptionally.) This Arrow {{Schema}} is always converted from {{DataFrame}}'s schema but we don't know this in {{gapplyCollect}} since it's wrapped by {{BiaryType}}. So, we should do a different approach to implement {{gapplyCollect}} to enable Arrow optimization. Also, looks Python Arrow API has an API that read an Arrow table from Arrow batches directly but looks R API does not have it. If R API has it, the workaround might be easier. There are few ways to work around this problem. 1. Read schema from the first {{Arrow Batch}} and use it. I was trying this way but realised it's pretty hacky and different from what we do in Python side. 2. Map {{df}} to whole Arrow streaming format above. This is different protocol with Python vectorization. 3. Send Arrow batch by Arrow batch without {{Schema}}. This is different protocol with Python vectorization. 4. Don't support {{gapplyCollect}} with Arrow optimization and warn that users should use {{collect}} and {{gapply}} combination instead (this is the current way). I tried 1. way just now and looks possible but pretty hacky, and at least different from Python side vectorization. For 2. approach, I haven't tried it yet but I am sure it needs a bigger change comparing to 1. 3. Might needs smaller change. So, my conclusion is that 4. for now, and implement it by one of the ways above. I am worried about the maintenance overhead if we go ahead with a different approach with Python side. I would like to postpone {{gapplyCollect}} with Arrow optimization and starts to discuss again when some users actually ask it. Currently, we just throw an exception if users call {{gapplyCollect}} that users should user {{gapply}} and {{collect}} separately. I am leaving this JIRA as {{Later}} because the workaround is possible and easy anyway. was (Author: hyukjin.kwon): cc [~felixcheung], [~shivaram], [~r...@databricks.com], [~bryanc], [~viirya]. [~smilegator], [~falaki] Actually, I wonder if we should add {{gapplyCollect}} support. Here is the rough diagram of {{gapplyCollect}}. I roughly made the chart to explain why it's tricky and hacky to implement. I still see row by row operation here and there in {{gapplyCollect}} but I don't target to explain performance aspect here. {code:java} |Driver |Executor |R side |JVM side |JVM side |R side | |1. call `gapplyCollect` | | | | | | | | | |2. DataFrame<R: binary> is | | | | | set for its output schema | | | | | and call `collect` | | | | | | | | | |3. | Query plan is done and it executes | | | | | | | | |4. | | Serialize from JVM record to | | | | | R recode line by line | | | | | | | |5. | | Send bytes | Recieve bytes | | | | | | |6. | | | Deserializes bytes to an R Data frame | | | | | | |7. | | | Execute R function (with key) | | | | | | |8. | | | Serializes output R DataFrame | | | | | | |9. | | Recieve bytes | Send bytes back | | | | | | |10. | | Wrap it with JVM row Row(Array[Byte]); | | | | | each record is an R DataFrame. | | |11. | Row(Array[Byte]) are collected | | | | | | | | |12. Deserializes each as an | | | | | R Data Frame | | | | {code} The problem is that it uses {{BinaryType}} to wrap and ship the data. {{gapply}} is okay because {{schema}} must be set like Pandas Groupped Map UDF. However, looks {{gapplyCollect}} omits the output schema, and JVM side doesn't know the schema before execution. So, from 8. to 12. above, this way is fine if we use regular {{gapplyCollect}} because each R Data Frame can be ser/de and then combined later. {code:java} do.call("rbind", list(df, df, df ...)) {code} However, in case of Arrow, it needs to know schema ahead because Arrow stream format is basically {code:java} | Schema | |-------------| | Arrow Batch | |-------------| | Arrow Batch | |-------------| ... {code} (Each {{df}} is mapped to each {{Arrow Batch}} above conceptionally.) This Arrow {{Schema}} is always converted from {{DataFrame}}'s schema but we don't know this in {{gapplyCollect}} since it's wrapped by {{BiaryType}}. So, we should do a different approach to implement {{gapplyCollect}} to enable Arrow optimization. Also, looks Python Arrow API has an API that read an Arrow table from Arrow batches directly but looks R API does not have it. If R API has it, the workaround might be easier. There are few ways to work around this problem. 1. Read schema from the first {{Arrow Batch}} and use it. I was trying this way but realised it's pretty hacky and different from what we do in Python side. 2. Map {{df}} to whole Arrow streaming format above. This is different protocol with Python vectorization. 3. Send Arrow batch by Arrow batch without {{Schema}}. This is different protocol with Python vectorization. 4. Don't support {{gapplyCollect}} with Arrow optimization and warn that users should use {{collect}} and {{gapply}} combination instead (this is the current way). I tried 1. way just now and looks possible but pretty hacky, and at least different from Python side vectorization. For 2. approach, I haven't tried it yet but I am sure it needs a bigger change comparing to 1. 3. Might needs smaller change. So, my conclusion is that 4. for now, and implement it by one of the ways above. I am worried about the maintenance overhead if we go ahead with a different approach with Python side. I would like to postpone {{gapplyCollect}} with Arrow optimization and starts to discuss again when some users actually ask it. Currently, we just throw an exception if users call {{gapplyCollect}} that users should user {{gapply}} and {{collect}} separately. I am leaving this JIRA as {{Later}} because the workaround is possible and easy anyway. > Vectorized gapplyCollect, Arrow optimization in native R function execution > --------------------------------------------------------------------------- > > Key: SPARK-26858 > URL: https://issues.apache.org/jira/browse/SPARK-26858 > Project: Spark > Issue Type: Sub-task > Components: SparkR, SQL > Affects Versions: 3.0.0 > Reporter: Hyukjin Kwon > Assignee: Hyukjin Kwon > Priority: Major > > Unlike gapply, gapplyCollect requires additional ser/de steps because it can > omit the schema, and Spark SQL doesn't know the return type before actually > execution happens. > In original code path, it's done via using binary schema. Once gapply is done > (SPARK-26761). we can mimic this approach in vectorized gapply to support > gapplyCollect. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org