This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ab850c0  [SPARK-26901][SQL][R] Adds child's output into references to avoid column-pruning for vectorized gapply()
ab850c0 is described below

commit ab850c02f7f2a2fd7edea99c1279d23a01aeab34
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Feb 20 10:24:40 2019 +0800

    [SPARK-26901][SQL][R] Adds child's output into references to avoid column-pruning for vectorized gapply()

## What changes were proposed in this pull request?

Currently, it looks like column pruning is applied to vectorized `gapply()`. Since the R native function can refer to any of the input fields, the child's columns should not be pruned. To avoid this, this PR adds the child's output into `references`, as `ObjectConsumer` does.

```
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
df <- createDataFrame(mtcars)
explain(count(groupBy(gapply(df,
                             "gear",
                             function(key, group) {
                               data.frame(gear = key[[1]], disp = mean(group$disp))
                             },
                             structType("gear double, disp double")))), TRUE)
```

**Before:**

```
== Optimized Logical Plan ==
Aggregate [count(1) AS count#41L]
+- Project
   +- FlatMapGroupsInRWithArrow [...]
      +- Project [gear#9]
         +- LogicalRDD [mpg#0, cyl#1, disp#2, hp#3, drat#4, wt#5, qsec#6, vs#7, am#8, gear#9, carb#10], false

== Physical Plan ==
*(4) HashAggregate(keys=[], functions=[count(1)], output=[count#41L])
+- Exchange SinglePartition
   +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#44L])
      +- *(3) Project
         +- FlatMapGroupsInRWithArrow [...]
            +- *(2) Sort [gear#9 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(gear#9, 200)
                  +- *(1) Project [gear#9]
                     +- *(1) Scan ExistingRDD arrow[mpg#0,cyl#1,disp#2,hp#3,drat#4,wt#5,qsec#6,vs#7,am#8,gear#9,carb#10]
```

**After:**

```
== Optimized Logical Plan ==
Aggregate [count(1) AS count#91L]
+- Project
   +- FlatMapGroupsInRWithArrow [...]
      +- LogicalRDD [mpg#0, cyl#1, disp#2, hp#3, drat#4, wt#5, qsec#6, vs#7, am#8, gear#9, carb#10], false

== Physical Plan ==
*(4) HashAggregate(keys=[], functions=[count(1)], output=[count#91L])
+- Exchange SinglePartition
   +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#94L])
      +- *(3) Project
         +- FlatMapGroupsInRWithArrow [...]
            +- *(2) Sort [gear#9 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(gear#9, 200)
                  +- *(1) Scan ExistingRDD arrow[mpg#0,cyl#1,disp#2,hp#3,drat#4,wt#5,qsec#6,vs#7,am#8,gear#9,carb#10]
```

Currently, corrupt values are produced for the missing columns (pruned columnar batches are fed to Arrow writers that require the non-pruned columns), such as:

```r
...
c(7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323,
  7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323,
  7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323,
  7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323,
  0, 0, 4.17777978645388e-314)
c(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1.04669129845114e+219)
c(3.4482690635875e-313, 3.4482690635875e-313, 3.4482690635875e-313,
c(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2.47032822920623e-323)
...
```

which should instead be something like:

```r
...
c(4, 4, 1, 2, 2, 4, 4, 1, 2, 1, 1, 2)
c(26, 30.4, 15.8, 19.7, 15)
c(4, 4, 8, 6, 8)
c(120.3, 95.1, 351, 145, 301)
...
```
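The whole fix is the two-line `references` override shown in the diff at the end of this mail. To make the mechanism concrete, here is a minimal, self-contained Scala sketch. These are toy types, not Spark's actual Catalyst classes, and the single rewrite below is a deliberately simplified stand-in for the real ColumnPruning rule; it only illustrates why a pruning pass narrows a child to its parent's `references`, and why reporting the child's entire output keeps every column alive:

```scala
// Minimal sketch of how a column-pruning pass decides what to keep.
// Toy stand-ins only: NOT Spark's Catalyst API.
object ColumnPruningSketch extends App {
  type Attribute = String

  sealed trait Plan {
    def output: Seq[Attribute]     // columns this operator produces
    def references: Set[Attribute] // columns it claims to need from its child
  }

  case class Source(output: Seq[Attribute]) extends Plan {
    val references = Set.empty[Attribute]
  }

  case class Project(projectList: Seq[Attribute], child: Plan) extends Plan {
    def output = projectList
    def references = projectList.toSet
  }

  // Stand-in for FlatMapGroupsInRWithArrow. The R function may read ANY input
  // column, but before the fix only the grouping columns showed up in
  // `references`; the fix reports the child's entire output instead
  // (`override def references: AttributeSet = child.outputSet` in the diff).
  case class FlatMapGroupsInR(
      grouping: Seq[Attribute],
      output: Seq[Attribute],
      child: Plan,
      fixed: Boolean) extends Plan {
    def references = if (fixed) child.output.toSet else grouping.toSet
  }

  // Toy pruning pass: if an operator references fewer columns than its child
  // produces, narrow the child with a Project -- the analogue of the
  // `Project [gear#9]` node in the "Before" plan above.
  def prune(plan: Plan): Plan = plan match {
    case f: FlatMapGroupsInR if f.references != f.child.output.toSet =>
      f.copy(child = Project(f.child.output.filter(f.references), f.child))
    case other => other
  }

  val scan   = Source(Seq("mpg", "cyl", "disp", "hp", "gear"))
  val before = FlatMapGroupsInR(Seq("gear"), Seq("gear", "disp"), scan, fixed = false)
  val after  = before.copy(fixed = true)

  println(prune(before)) // child narrowed to Project(List(gear), ...)
  println(prune(after))  // child untouched: every column reaches the R worker
}
```

Running the sketch wraps the source in `Project(List(gear), ...)` in the unfixed case and leaves the plan untouched in the fixed one, mirroring the Before/After plans above.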
## How was this patch tested?

Manually tested, and unit tests were added. The test code is basically:

```r
df <- createDataFrame(mtcars)
count(gapply(df,
             c("gear"),
             function(key, group) {
               stopifnot(all(group$hp > 50))
               group
             },
             schema(df)))
```

All of `mtcars`'s `hp` values are greater than 50:

```r
> mtcars$hp > 50
 [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
[16] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
[31] TRUE TRUE
```

However, due to the corrupt values (like 0 or 7.xxxxx), weird values were found, so this test currently fails on master as below:

```
Error in handleErrors(returnStatus, conn) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 82 in stage 1.0 failed 1 times, most recent failure: Lost task 82.0 in stage 1.0 (TID 198, localhost, executor driver): org.apache.spark.SparkException: R worker exited unexpectedly (crashed)
 Error in computeFunc(key, inputData) : all(group$hp > 50) is not TRUE
 Error in computeFunc(key, inputData) : all(group$hp > 50) is not TRUE
 Error in computeFunc(key, inputData) : all(group$hp > 50) is not TRUE
```

While I was here, I also compared the total lengths. Regular `gapply` without Arrow has some holes, so I had to compare the results with an R data frame.

Closes #23810 from HyukjinKwon/SPARK-26901.

Lead-authored-by: Hyukjin Kwon <gurwls...@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 R/pkg/tests/fulltests/test_sparkSQL.R                              | 4 ++++
 .../scala/org/apache/spark/sql/catalyst/plans/logical/object.scala | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 9dc699c..4d1360b 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3564,11 +3564,15 @@ test_that("gapply() Arrow optimization", {
         stopifnot(is.numeric(key[[1]]))
       }
       stopifnot(class(grouped) == "data.frame")
+      stopifnot(length(colnames(grouped)) == 11)
+      # mtcars' hp is more than 50.
+      stopifnot(all(grouped$hp > 50))
       grouped
     }, schema(df))
     actual <- collect(ret)
     expect_equal(actual, expected)
+    expect_equal(count(ret), nrow(mtcars))
   },
   finally = {
     # Resetting the conf back to default value
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 58bb191..f875af3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -499,6 +499,8 @@ case class FlatMapGroupsInRWithArrow(
     keyDeserializer: Expression,
     groupingAttributes: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
+  // This operator always needs all columns of its child, even those it doesn't reference.
+  override def references: AttributeSet = child.outputSet
 
   override protected def stringArgs: Iterator[Any] = Iterator(
     inputSchema, StructType.fromAttributes(output), keyDeserializer, groupingAttributes, child)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org