[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480300#comment-17480300 ] Cheng Su commented on SPARK-18591: -- Just FYI, the Jira should be fixed by https://issues.apache.org/jira/browse/SPARK-37455 . The related code is merged already and should be released in next Spark release - Spark 3.3.0. > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro >Priority: Major > Labels: bulk-closed > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17476233#comment-17476233 ] F. H. commented on SPARK-18591: --- I just found the same behavior. Even if I directly put ".sort(*group_keys)" before the groupby, Spark is hash-repartitioning. Is there a solution to this? > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro >Priority: Major > Labels: bulk-closed > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279995#comment-17279995 ] Cheng Su commented on SPARK-18591: -- Just came across this Jira. We added the same feature 2 years ago internally, and it is working well and serving ~300 queries per day. I feel even we do the logical-to-physical planning in bottom-up way, there's still more work needed to be done as `outputOrdering` is only valid after `EnsureRequirements` rule. We added this as a physical plan rule post `EnsureRequirements`. Given we already have a bunch of physical plan rules post `EnsureRequirements` - [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L354-L359] . Shall we just promote them into a phase e.g. called physical plan optimization, and we can add this rule as part of optimization phase? cc [~maropu] and [~cloud_fan], thanks. > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro >Priority: Major > Labels: bulk-closed > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165148#comment-16165148 ] Maryann Xue commented on SPARK-18591: - Hey, I came up with this idea of doing sort-aggregate on sorted input while debugging some performance issue, and it was before I saw this JIRA. I tried implementing it and found out that I couldn't do it in physical planning because it was top-down, so I ended up doing the same thing as https://github.com/maropu/spark/commit/32b716cf02dfe8cba5b08b2dc3297bc061156630#diff-7d06cf071190dcbeda2fed6b039ec5d0R55. I totally agree with [~maropu] that we need to make the physical planning bottom-up and then we can solve this in a better way. But aside from this, I found another issue with a slightly more sophisticated query example: {{SELECT t1.a, count\(*\) FROM t1 JOIN t2 ON t1.a = t2.b GROUP BY t1.a}} This would only work if we put the {{ReplaceSortAggregate}} rule after {{EnsureRequirements}} because SortMergeJoinExec's outputOrdering is not correct before EnsureRequirements happens. Please refer to SPARK-21998 for details. > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16160388#comment-16160388 ] Takeshi Yamamuro commented on SPARK-18591: -- I just kindly give a head-up for the discussion on this thread; since we've already have LogicalPlanVisitor, we might easily realise bottom-up transformation in SparkStrategies like https://github.com/apache/spark/compare/master...maropu:SPARK-18591. I'm not sure this is the good timing now (cuz, probably, I think many committers and qualified developers spending much time on Dataset API v2 reviews and others) to change the transformation way though, I think it'd be better to modify this in future because the bottom-up transformation makes catalyst easily select better physical plans based on bottom sub-tree condition (costs and partition/sort conditions), e.g., we could easily fix SPARK-12978 and this ticket. cc: [~smilegator] > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927409#comment-15927409 ] Takeshi Yamamuro commented on SPARK-18591: -- yea, sure. Have we already filed a JIRA for the planner improvement? > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927388#comment-15927388 ] Wenchen Fan commented on SPARK-18591: - Can we hold it for a while? I think we need to improve the planner first. We should either move back to bottom-up planning, or add a physical optimization phase. We need to figure out a better way to do planning > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925841#comment-15925841 ] Herman van Hovell commented on SPARK-18591: --- [~maropu] I don't think the current planning (of aggregates) is optimal - I think your previous work on merging partial aggregates highlights this. I think we should move back to bottom-up planning. This is also very relevant for the current CBO work. cc [~cloud_fan] We discussed something like this a while back cc [~ueshin] You have implemented the current top-down work. Is there an easy way to move back to bottom up? > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15733855#comment-15733855 ] Takeshi Yamamuro commented on SPARK-18591: -- yea, If we can, it's the best. But IIUC it's difficult to do that in the fist place because spark replaces logical nodes with physical ones in top-down way and we can't check the output ordering of children in between planning. > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15733218#comment-15733218 ] Nattavut Sutyanyong commented on SPARK-18591: - [~maropu]: Why do you choose to implement it as an after-the-fact planning, replacing a hash-based with a sort-based plan? Would it be better to pick a sort-based plan at the first place when its input stream satisfies the order requirement of the aggregation? > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15697111#comment-15697111 ] Takeshi Yamamuro commented on SPARK-18591: -- If it's worth trying this, I'll do. I just made a prototype here; https://github.com/maropu/spark/commit/32b716cf02dfe8cba5b08b2dc3297bc061156630#diff-7d06cf071190dcbeda2fed6b039ec5d0R55 > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only in limited condition; the > cases where spark cannot use partial aggregates and hash-based ones. > However, if input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the other. > {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 10.0)+ "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregarte > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org