[GitHub] spark pull request #20779: [SPARK-23598][SQL] Make methods in BufferedRowIte...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/20779

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/20779#discussion_r174046867

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
@@ -65,7 +65,7 @@ public long durationMs() {
   /**
    * Append a row to currentRows.
    */
-  protected void append(InternalRow row) {
+  public void append(InternalRow row) {
--- End diff --

nit: Although we added the test, should we also add a short comment saying that this method is public so that inner classes can also access it?
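A minimal single-file Java sketch of the access pattern behind this nit. Assumptions: `RowBufferingIterator`, `GeneratedIterator`, and `NestedClass1` are hypothetical, simplified stand-ins for `BufferedRowIterator`, the generated `GeneratedIteratorForCodegenStage1`, and a split class such as `agg_NestedClass1`; rows are plain `String`s. Because everything here shares one package, `protected` would also compile in this file; the sketch only mirrors the shape that matters in Spark, where (per the stack trace in the PR description) the generated classes live in `org.apache.spark.sql.catalyst.expressions` while `BufferedRowIterator` is in `org.apache.spark.sql.execution`, and the nested class is not a subclass. The Javadoc on `append()` is one possible wording for the sentence the nit asks for.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Locale;

public class NestedAccessSketch {

    /**
     * Hypothetical, simplified stand-in for BufferedRowIterator.
     * In this single file all classes share a package, so protected would
     * compile too; in Spark the generated code is in a different package.
     */
    abstract static class RowBufferingIterator {
        protected final Deque<String> currentRows = new ArrayDeque<>();

        /**
         * Append a row to currentRows.
         * This method is public so that classes split out of a generated
         * subclass (which are not subclasses themselves) can also call it.
         */
        public void append(String row) {
            currentRows.add(row);
        }

        /** Public for the same reason as append(). */
        public boolean shouldStop() {
            return !currentRows.isEmpty();
        }

        /** Produces more rows into currentRows; supplied by the generated subclass. */
        protected abstract void processNext();

        public boolean hasNext() {
            if (currentRows.isEmpty()) {
                processNext();
            }
            return !currentRows.isEmpty();
        }

        public String next() {
            return currentRows.remove();
        }
    }

    /** Hypothetical stand-in for GeneratedIteratorForCodegenStage1. */
    static final class GeneratedIterator extends RowBufferingIterator {
        private final String[] input;
        private int pos = 0;
        private final NestedClass1 nested = new NestedClass1();

        GeneratedIterator(String... input) {
            this.input = input;
        }

        @Override
        protected void processNext() {
            // Delegate the work to the split-out class.
            nested.doWork();
        }

        /**
         * Hypothetical stand-in for a split class such as agg_NestedClass1.
         * It is not a subclass of RowBufferingIterator; it reaches append()
         * and shouldStop() through its enclosing GeneratedIterator instance.
         */
        private final class NestedClass1 {
            void doWork() {
                while (pos < input.length) {
                    append(input[pos++].toUpperCase(Locale.ROOT));
                    if (shouldStop()) {
                        return; // hand control back so the consumer can take the row
                    }
                }
            }
        }
    }

    /** Drains an iterator into a space-separated string. */
    static String drain(RowBufferingIterator it) {
        StringBuilder out = new StringBuilder();
        while (it.hasNext()) {
            if (out.length() > 0) {
                out.append(' ');
            }
            out.append(it.next());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(drain(new GeneratedIterator("bat", "mouse", "horse")));
        // prints "BAT MOUSE HORSE"
    }
}
```

Rows come out one at a time because `shouldStop()` returns `true` as soon as a row is buffered, which hands control back to `hasNext()`/`next()`.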
Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/20779#discussion_r173613316

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -307,4 +309,47 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       // a different query can result in codegen cache miss, that's by design
     }
   }
+
+  test("SPARK-23598: Avoid compilation error with a lot of aggregation operations") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      val df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
+        .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age"))
--- End diff --

I tried it, but failed.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20779#discussion_r173494440

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -307,4 +309,47 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       // a different query can result in codegen cache miss, that's by design
     }
   }
+
+  test("SPARK-23598: Avoid compilation error with a lot of aggregation operations") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      val df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
+        .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age"))
--- End diff --

Can we do this with a loop?
Github user dvogelbacher commented on a diff in the pull request:
https://github.com/apache/spark/pull/20779#discussion_r173484471

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -307,4 +309,47 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       // a different query can result in codegen cache miss, that's by design
     }
   }
+
+  test("SPARK-23598: Avoid compilation error with a lot of aggregation operations") {
--- End diff --

It's not a compilation error but a runtime error. Maybe better to say: `codegen working for lots of aggregation operations without runtime errors`
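One way to see this point in the error type itself: the stack trace in the PR description reports `java.lang.IllegalAccessError`, which sits under `LinkageError` in the standard class hierarchy. Per its Javadoc, this error is normally caught by the compiler and surfaces only at run time, when the JVM links a call it does not allow. The small check here is our own illustration (the class name `ErrorKindCheck` is made up) and uses only `java.lang` reflection to print that hierarchy.

```java
public class ErrorKindCheck {

    /** Returns the superclass chain of c, from c itself up to Object. */
    static String superclassChain(Class<?> c) {
        StringBuilder sb = new StringBuilder();
        for (Class<?> k = c; k != null; k = k.getSuperclass()) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(k.getSimpleName());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // IllegalAccessError is the error type reported in the PR's stack trace.
        System.out.println(superclassChain(IllegalAccessError.class));
        // prints "IllegalAccessError -> IncompatibleClassChangeError -> LinkageError -> Error -> Throwable -> Object"
        System.out.println(LinkageError.class.isAssignableFrom(IllegalAccessError.class));
        // prints "true"
    }
}
```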
GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/20779

[SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid compilation for a large query

## What changes were proposed in this pull request?

This PR fixes a runtime `IllegalAccessError` for a large query when the generated code has split classes. The issue is that `append()`, `stopEarly()`, and other methods are not accessible from split classes that are not subclasses of `BufferedRowIterator`. This PR fixes the issue by making these methods `public`.

Without this PR, running the attached program with `CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1` throws the following exception.

```
test("SPARK-23598") {
  // An exception is thrown when CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD is set to -1
  val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
  df_pet_age.groupBy("name").avg("age").show()
}
```

Exception:

```
19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
...
```

Generated code (line 195 calls `stopEarly()`):
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private double agg_bufValue;
/* 012 */   private boolean agg_bufIsNull1;
/* 013 */   private long agg_bufValue1;
/* 014 */   private agg_FastHashMap agg_fastHashMap;
/* 015 */   private org.apache.spark.unsafe.KVIterator agg_fastHashMapIter;
/* 016 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 018 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */   private scala.collection.Iterator inputadapter_input;
/* 020 */   private boolean agg_agg_isNull11;
/* 021 */   private boolean agg_agg_isNull25;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */   private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
/* 025 */
/* 026 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */
/* 034 */     agg_fastHashMap = new