[jira] [Comment Edited] (SPARK-39441) Speed up DeduplicateRelations
[ https://issues.apache.org/jira/browse/SPARK-39441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749321#comment-17749321 ]

Mitesh edited comment on SPARK-39441 at 2/5/24 11:28 PM:
---------------------------------------------------------

After applying this fix to 3.3.2, I still see some slowness here with a very tall query tree: https://gist.github.com/MasterDDT/422f933d91f59becf3924f01d03d5456 (search for DeduplicateRelations). The same query plan works fine in 2.4.x.

Is it safe to skip this analyzer rule? Or is there another way to speed it up? cc [~cloud_fan]

was (Author: masterddt):
After applying this fix to 3.3.2, I still see some slowness here with a very tall query tree: https://gist.github.com/MasterDDT/422f933d91f59becf3924f01d03d5456 (search for DeduplicateRelations)

Is it safe to skip this analyzer rule? Or is there another way to speed it up? cc [~cloud_fan]

> Speed up DeduplicateRelations
> -----------------------------
>
>                 Key: SPARK-39441
>                 URL: https://issues.apache.org/jira/browse/SPARK-39441
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.1
>            Reporter: Allison Wang
>            Assignee: Allison Wang
>            Priority: Major
>             Fix For: 3.4.0
>
>
> Speed up the Analyzer rule DeduplicateRelations

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
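For intuition about why a tall tree hurts here, a toy cost model (plain Python, not Spark code; all names are invented for illustration): if DeduplicateRelations ends up re-collecting attribute/relation info over the whole subtree at every node, a very tall (linear) plan of n nodes does roughly n²/2 units of work, so the rule's cost grows quadratically with tree height.

```python
def subtree_scan_cost(depth: int) -> int:
    """Stand-in for collecting attribute/relation info from a subtree:
    on a linear (very tall) tree, the subtree below depth d has d nodes."""
    return depth

def dedup_total_cost(height: int) -> int:
    # Each of the `height` nodes re-scans its entire subtree once.
    return sum(subtree_scan_cost(d) for d in range(1, height + 1))

print(dedup_total_cost(10))    # 55
print(dedup_total_cost(100))   # 5050 -> ~100x the work for 10x the height
```

This is only a model of the reported symptom, not a claim about the rule's actual implementation.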
[jira] [Comment Edited] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785989#comment-17785989 ]

Mitesh edited comment on SPARK-30602 at 11/14/23 7:34 PM:
----------------------------------------------------------

Is there a plan to support push-based shuffle on the Spark standalone manager as well? Or does anybody know why we can't add the merge shuffle manager to `org.apache.spark.deploy.ExternalShuffleService`, so that it works for standalone too?

was (Author: masterddt):
Is there a plan to support push-based shuffle on Spark standalone manager also?

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --------------------------------------------------------------
>
>                 Key: SPARK-30602
>                 URL: https://issues.apache.org/jira/browse/SPARK-30602
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Min Shen
>            Assignee: Min Shen
>            Priority: Major
>              Labels: release-notes
>             Fix For: 3.2.0
>
>         Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When doing Spark on YARN for a large-scale deployment, people usually enable the Spark external shuffle service and store the intermediate shuffle files on HDD. Because the number of blocks generated for a particular shuffle grows quadratically with the size of shuffled data (# mappers and # reducers grow linearly with the size of shuffled data, but # blocks is # mappers * # reducers), one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only 10s of KBs.
> Because of the inefficiency of performing random reads on HDD for small amounts of data, the overall efficiency of the Spark external shuffle services serving the shuffle blocks degrades as we see an increasing # of Spark applications processing an increasing amount of data. In addition, because the Spark external shuffle service is a shared service in a multi-tenancy cluster, the inefficiency of one Spark application can propagate to other applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in the above-mentioned environments with push-based shuffle. With push-based shuffle, shuffle is performed at the end of mappers, and blocks get pre-merged and moved towards reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. We take a Spark-native approach to achieve this, i.e., extending Spark's existing shuffle netty protocol and the behaviors of Spark mappers, reducers, and drivers. This way, we can bring the benefits of more efficient shuffle to Spark without incurring the dependency or overhead of either a specialized storage layer or external infrastructure pieces.
>
> Link to dev mailing list discussion: [http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html]
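The quadratic block growth described in the ticket can be made concrete with back-of-the-envelope arithmetic (illustrative numbers only; `tasks_per_gb` is a made-up knob, not a Spark setting):

```python
def avg_shuffle_block_kb(data_gb: int, tasks_per_gb: int = 1) -> float:
    """Average shuffle block size when #mappers and #reducers each grow
    linearly with data size, so #blocks grows quadratically."""
    mappers = reducers = data_gb * tasks_per_gb
    blocks = mappers * reducers       # every mapper feeds every reducer
    total_kb = data_gb * 1024 * 1024  # GB -> KB
    return total_kb / blocks

print(avg_shuffle_block_kb(100))   # 10485.76 KB
print(avg_shuffle_block_kb(1000))  # 1048.576 KB: 10x the data, 10x smaller blocks
```

With more tasks per unit of data (higher parallelism), the same math lands in the tens-of-KB range the ticket reports.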
[jira] [Comment Edited] (SPARK-23643) XORShiftRandom.hashSeed allocates unnecessary memory
[ https://issues.apache.org/jira/browse/SPARK-23643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758770#comment-17758770 ]

Mitesh edited comment on SPARK-23643 at 8/24/23 11:17 PM:
----------------------------------------------------------

+1, can we please document this in the ML migration guide? We use `Dataset.randomSplit()` to come up with test/train sets, so with this hash change, all of our deterministic ML tests fail (same input data, same seed/configs, same model... produces different results). You might even want to consider putting the old behavior back behind a flag, although I understand that with a major version upgrade it's acceptable to break behavior.

was (Author: masterddt):
+1, can we please document this in the ML migration guide? We use `Dataset.randomSplit()` to come up with test/train sets, so with this hash change, all of our deterministic ML tests fail (same input data, same seed/configs, same model... produces different results). You might even want to consider putting the old behavior back behind a flag.

> XORShiftRandom.hashSeed allocates unnecessary memory
> ----------------------------------------------------
>
>                 Key: SPARK-23643
>                 URL: https://issues.apache.org/jira/browse/SPARK-23643
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Max Gekk
>            Assignee: Max Gekk
>            Priority: Major
>             Fix For: 3.0.0
>
>
> The hashSeed method allocates a 64-byte buffer and puts only 8 bytes of the seed parameter into it. The other bytes are always zero and could easily be excluded from the hash calculation.
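The change described in the ticket can be sketched in Python (a stand-in only: Spark's actual hashSeed is Scala and uses MurmurHash3, not SHA-256). The old code padded the 8-byte seed out to 64 bytes of mostly zeros before hashing; hashing just the 8 meaningful bytes avoids the wasted allocation and work, but produces different values, which is exactly why seeded results change across the upgrade:

```python
import struct
import hashlib

def hash_seed_old(seed: int) -> int:
    # Old behavior: a 64-byte buffer where only the first 8 bytes carry
    # the seed; the trailing 56 zero bytes add work but no entropy.
    buf = struct.pack(">q", seed) + b"\x00" * 56
    return int.from_bytes(hashlib.sha256(buf).digest()[:8], "big", signed=True)

def hash_seed_new(seed: int) -> int:
    # New behavior: hash only the 8 seed bytes.
    buf = struct.pack(">q", seed)
    return int.from_bytes(hashlib.sha256(buf).digest()[:8], "big", signed=True)

# Both are deterministic, but they disagree, so any pipeline that pins a
# seed (e.g. randomSplit) produces different splits after the change.
assert hash_seed_old(42) == hash_seed_old(42)
assert hash_seed_new(42) != hash_seed_old(42)
```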
[jira] [Commented] (SPARK-10970) Executors overload Hive metastore by making massive connections at execution time
[ https://issues.apache.org/jira/browse/SPARK-10970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575014#comment-17575014 ]

Mitesh commented on SPARK-10970:
--------------------------------

[~cheolsoo] I'm seeing this on Spark 2.4; here is my callstack: https://gist.github.com/MasterDDT/ac3b2e73bd4a79226d12ef2c78848537. What is worse for me, my Hive is backed by AWS Glue, so the `reloadFunctions()` calls cause throttling issues.

> Executors overload Hive metastore by making massive connections at execution time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-10970
>                 URL: https://issues.apache.org/jira/browse/SPARK-10970
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>         Environment: Hive 1.2, Spark on YARN
>            Reporter: Cheolsoo Park
>            Priority: Critical
>
> This is a regression in Spark 1.5, more specifically after upgrading the Hive dependency to 1.2.
> HIVE-2573 introduced a new feature that allows users to register functions in a session. The problem is that it added a [static code block|https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L164-L170] to Hive.java:
> {code}
> // register all permanent functions. need improvement
> static {
>   try {
>     reloadFunctions();
>   } catch (Exception e) {
>     LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
>   }
> }
> {code}
> This code block is executed by every Spark executor in the cluster when HadoopRDD tries to access the JobConf. So if a Spark job has high parallelism (eg 1000+), executors will hammer the HCat server, causing it to go down in the worst case.
> Here is the stack trace that I took in an executor when it makes a connection to the Hive metastore:
> {code}
> 15/10/06 19:26:05 WARN conf.HiveConf: HiveConf of name hive.optimize.s3.query does not exist
> 15/10/06 19:26:05 INFO hive.metastore: XXX: java.lang.Thread.getStackTrace(Thread.java:1589)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:236)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:74)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:86)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:166)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:803)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.hadoop.hive.ql.plan.PlanUtils.configureInputJobPropertiesForStorageHandler(PlanUtils.java:782)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:347)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.spark.sql.hive.HadoopTableReader$anonfun$17.apply(TableReader.scala:322)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.spark.sql.hive.HadoopTableReader$anonfun$17.apply(TableReader.scala:322)
> 15/10/06 19:26:05 INFO hive.metastore: XXX: org.apache.spark.rdd.HadoopRDD$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
> 15/10/06 1
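Until the connection storm itself is avoided, a generic client-side mitigation for the Glue/metastore throttling mentioned in the comment above is retrying with exponential backoff and jitter. A sketch (not a Spark or Glue API; `flaky` stands in for a hypothetical throttled metastore call):

```python
import random
import time

def with_backoff(call, max_tries=5, base=0.1, cap=5.0, sleep=time.sleep):
    """Retry `call` with exponential backoff plus full jitter."""
    for attempt in range(max_tries):
        try:
            return call()
        except Exception:
            if attempt == max_tries - 1:
                raise  # out of retries: propagate the last error
            # Delay doubles each attempt, capped, scaled by random jitter
            # so many throttled clients do not retry in lockstep.
            sleep(min(cap, base * (2 ** attempt)) * random.random())

# Example: a hypothetical call that is throttled twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("ThrottlingException")
    return "ok"

print(with_backoff(flaky, sleep=lambda _: None))  # ok
```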
[jira] [Commented] (SPARK-17636) Parquet predicate pushdown for nested fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072119#comment-17072119 ]

Mitesh commented on SPARK-17636:
--------------------------------

[~cloud_fan] [~dbtsai] thanks for fixing! Is there any plan to backport this to the 2.4.x branch?

> Parquet predicate pushdown for nested fields
> --------------------------------------------
>
>                 Key: SPARK-17636
>                 URL: https://issues.apache.org/jira/browse/SPARK-17636
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core, SQL
>    Affects Versions: 1.6.2, 1.6.3, 2.0.2
>            Reporter: Mitesh
>            Assignee: DB Tsai
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> There's a *PushedFilters* for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation.
> {noformat}
> scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id")
> res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint]
> scala> res5.filter("sale_id > 4").queryExecution.executedPlan
> res9: org.apache.spark.sql.execution.SparkPlan =
> Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)]
> scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan
> res10: org.apache.spark.sql.execution.SparkPlan =
> Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file
> {noformat}
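The before/after behavior shown in those plans can be modeled with a toy gate (plain Python with invented names, not Spark's actual pushdown logic): prior to the fix, a predicate referencing a nested field (a dotted column path) was never offered to Parquet as a pushed filter, while a top-level column predicate was.

```python
def pushable(pred_col: str, allow_nested: bool) -> bool:
    """Toy model of the pushdown gate described in the ticket: a dotted
    path means a field inside a struct."""
    return "." not in pred_col or allow_nested

# Pre-fix behavior (the plans above): only the top-level filter is pushed.
assert pushable("sale_id", allow_nested=False)
assert not pushable("day_timestamp.timestamp", allow_nested=False)
# After SPARK-17636 (Fix For 3.0.0), nested predicates can be pushed too.
assert pushable("day_timestamp.timestamp", allow_nested=True)
```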
[jira] [Comment Edited] (SPARK-11033) Launcher: add support for monitoring standalone/cluster apps
[ https://issues.apache.org/jira/browse/SPARK-11033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819270#comment-16819270 ]

Mitesh edited comment on SPARK-11033 at 4/16/19 4:59 PM:
---------------------------------------------------------

Thanks [~vanzin]! One quick question... is getting this working just a matter of passing the real host of the launcher process to the driver process, instead of using the hardcoded localhost? https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala#L48

This works in client mode, so I was very confused why it fails for cluster mode, until I saw the hardcoded localhost. Would everything else work fine?

was (Author: masterddt):
Thanks [~vanzin]! One quick question... is getting this working just a matter of passing the real host of the launcher process to the driver process, instead of using the hardcoded localhost? https://github.com/apache/spark/blob/2.3/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala#L48

This works in client mode, so I was very confused why it fails for cluster mode, until I saw the hardcoded localhost. Would everything else work fine?

> Launcher: add support for monitoring standalone/cluster apps
> ------------------------------------------------------------
>
>                 Key: SPARK-11033
>                 URL: https://issues.apache.org/jira/browse/SPARK-11033
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Marcelo Vanzin
>            Priority: Major
>
> The backend for app monitoring in the launcher library was added in SPARK-8673, but the code currently does not support standalone cluster mode.
[jira] [Comment Edited] (SPARK-11033) Launcher: add support for monitoring standalone/cluster apps
[ https://issues.apache.org/jira/browse/SPARK-11033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819178#comment-16819178 ]

Mitesh edited comment on SPARK-11033 at 4/16/19 4:00 PM:
---------------------------------------------------------

[~vanzin] Is there any plan to get this working? I'm on 2.3.2 using the standalone scheduler, cluster mode, and InProcessLauncher. We need a way to monitor the launched app, and it sounds like SparkAppHandle.Listener is not supported yet. Do you have a suggestion on how I can monitor the app (for the success, failure, and "it disappeared" cases)?

was (Author: masterddt):
[~vanzin] Is there any plan to get this working? I'm on 2.3.2 using the standalone scheduler, cluster mode, and InProcessLauncher. We need a way to monitor the launched app, and it sounds like SparkAppHandle.Listener is not supported yet. Do you have a suggestion on how I can monitor the app (both for the success, failure, and "it disappeared" cases)?

> Launcher: add support for monitoring standalone/cluster apps
> ------------------------------------------------------------
>
>                 Key: SPARK-11033
>                 URL: https://issues.apache.org/jira/browse/SPARK-11033
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Marcelo Vanzin
>            Priority: Major
>
> The backend for app monitoring in the launcher library was added in SPARK-8673, but the code currently does not support standalone cluster mode.
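While SparkAppHandle.Listener is unsupported for standalone cluster mode, one workaround is polling the standalone master's JSON status page (the master web UI serves its state at /json; treat the exact field names below, e.g. `activeapps`, `completedapps`, and `state`, as assumptions to verify against your Spark version):

```python
import json
import urllib.request

def app_state(master_state: dict, app_id: str):
    """Find an application's state in the master's /json payload.
    Returns e.g. 'RUNNING'/'FINISHED', or None if the app is gone
    (the "it disappeared" case). Field names are assumptions."""
    for key in ("activeapps", "completedapps"):
        for app in master_state.get(key, []):
            if app.get("id") == app_id:
                return app.get("state")
    return None

def poll(master_url: str, app_id: str):
    # e.g. poll("http://master-host:8080", "app-20190416...-0001")
    with urllib.request.urlopen(f"{master_url}/json") as resp:
        return app_state(json.load(resp), app_id)

# Pure-helper example with a fabricated payload:
fake = {"activeapps": [{"id": "app-123", "state": "RUNNING"}],
        "completedapps": []}
print(app_state(fake, "app-123"))  # RUNNING
print(app_state(fake, "app-999"))  # None
```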
[jira] [Comment Edited] (SPARK-11033) Launcher: add support for monitoring standalone/cluster apps
[ https://issues.apache.org/jira/browse/SPARK-11033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819178#comment-16819178 ] Mitesh edited comment on SPARK-11033 at 4/16/19 3:58 PM: - [~vanzin] Is there any plan to get this working? I'm on 2.3.2 using standalone scheduler, clustermode, and InProcessLauncher. We need a way to to monitor the launched app, and sounds like {SparkAppHandle.Listener} is not supported yet. Do you have a suggestion on how I can monitor the app (both for success, failure, and "it disappeared" cases)? was (Author: masterddt): [~vanzin] Is there any plan to get this working? I'm on 2.3.2 using standalone scheduler, clustermode, and InProcessLauncher. We need a way to to monitor the launched app, and sounds like `SparkAppHandle.Listener` is not supported yet. Do you have a suggestion on how I can monitor the app (both for success, failure, and "it disappeared" cases)? > Launcher: add support for monitoring standalone/cluster apps > > > Key: SPARK-11033 > URL: https://issues.apache.org/jira/browse/SPARK-11033 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Marcelo Vanzin >Priority: Major > > The backend for app monitoring in the launcher library was added in > SPARK-8673, but the code currently does not support standalone cluster mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-11033) Launcher: add support for monitoring standalone/cluster apps
[ https://issues.apache.org/jira/browse/SPARK-11033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819178#comment-16819178 ] Mitesh edited comment on SPARK-11033 at 4/16/19 3:58 PM: - [~vanzin] Is there any plan to get this working? I'm on 2.3.2 using the standalone scheduler, cluster mode, and InProcessLauncher. We need a way to monitor the launched app, and it sounds like SparkAppHandle.Listener is not supported yet. Do you have a suggestion on how I can monitor the app (for success, failure, and "it disappeared" cases)? was (Author: masterddt): [~vanzin] Is there any plan to get this working? I'm on 2.3.2 using standalone scheduler, clustermode, and InProcessLauncher. We need a way to to monitor the launched app, and sounds like {SparkAppHandle.Listener} is not supported yet. Do you have a suggestion on how I can monitor the app (both for success, failure, and "it disappeared" cases)? > Launcher: add support for monitoring standalone/cluster apps > > > Key: SPARK-11033 > URL: https://issues.apache.org/jira/browse/SPARK-11033 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Marcelo Vanzin >Priority: Major > > The backend for app monitoring in the launcher library was added in > SPARK-8673, but the code currently does not support standalone cluster mode.
[jira] [Commented] (SPARK-11033) Launcher: add support for monitoring standalone/cluster apps
[ https://issues.apache.org/jira/browse/SPARK-11033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819178#comment-16819178 ] Mitesh commented on SPARK-11033: [~vanzin] Is there any plan to get this working? I'm on 2.3.2 using the standalone scheduler, cluster mode, and InProcessLauncher. We need a way to monitor the launched app, and it sounds like `SparkAppHandle.Listener` is not supported yet. Do you have a suggestion on how I can monitor the app (for success, failure, and "it disappeared" cases)? > Launcher: add support for monitoring standalone/cluster apps > > > Key: SPARK-11033 > URL: https://issues.apache.org/jira/browse/SPARK-11033 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Marcelo Vanzin >Priority: Major > > The backend for app monitoring in the launcher library was added in > SPARK-8673, but the code currently does not support standalone cluster mode.
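For context, the `SparkAppHandle.Listener` interface discussed above does exist in the launcher API; the issue tracked here is that its callbacks are not delivered for standalone cluster mode. A minimal sketch of how a listener would be wired up in the modes the launcher does support (the app resource, main class, and master URL below are made-up placeholders, not from this thread):

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object MonitorSketch {
  def main(args: Array[String]): Unit = {
    val handle: SparkAppHandle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")      // hypothetical values
      .setMainClass("com.example.Main")
      .setMaster("spark://master:7077")
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit = {
          // FINISHED / FAILED / KILLED / LOST are final states;
          // LOST corresponds to the "it disappeared" case.
          if (h.getState.isFinal) println(s"app ended in state ${h.getState}")
        }
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })
  }
}
```

Per this ticket, in standalone cluster mode the handle never receives these state transitions, so the sketch is only useful as a reference for the API shape being requested.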
[jira] [Comment Edited] (SPARK-19468) Dataset slow because of unnecessary shuffles
[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761207#comment-16761207 ] Mitesh edited comment on SPARK-19468 at 2/6/19 7:13 PM: I'm seeing the same behavior in 2.3.2 [~viirya]. It seems like any physical operator that changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the same fix from SPARK-19931, to ensure that if the output aliases anything, that is reflected in the attributes inside the outputPartitioning/outputOrdering. was (Author: masterddt): +1 I'm seeing the same behavior in 2.3.2 [~viirya]. It seems like any physical operator that changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the same fix from SPARK-19931, to ensure if the output is aliasing anything, that is reflected in the attributes inside the outputPartitioning/outputOrdering. > Dataset slow because of unnecessary shuffles > > > Key: SPARK-19468 > URL: https://issues.apache.org/jira/browse/SPARK-19468 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: koert kuipers >Priority: Major > > we noticed that some algos we ported from rdd to dataset are significantly > slower, and the main reason seems to be more shuffles that we successfully > avoid for rdds by careful partitioning. this seems to be dataset specific as > it works ok for dataframe. > see also here: > http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/ > it kind of boils down to this... 
if i partition and sort dataframes that get > used for joins repeatedly i can avoid shuffles: > {noformat} > System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1") > val df1 = Seq((0, 0), (1, 1)).toDF("key", "value") > > .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY) > val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2") > > .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY) > val joined = df1.join(df2, col("key") === col("key2")) > joined.explain > == Physical Plan == > *SortMergeJoin [key#5], [key2#27], Inner > :- InMemoryTableScan [key#5, value#6] > : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 > replicas) > : +- *Sort [key#5 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(key#5, 4) > : +- LocalTableScan [key#5, value#6] > +- InMemoryTableScan [key2#27, value2#28] > +- InMemoryRelation [key2#27, value2#28], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [key2#27 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(key2#27, 4) > +- LocalTableScan [key2#27, value2#28] > {noformat} > notice how the persisted dataframes are not shuffled or sorted anymore before > being used in the join. 
however if i try to do the same with dataset i have > no luck: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val ds2 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1")) > joined1.explain > == Physical Plan == > *SortMergeJoin [_1#105._1], [_2#106._1], Inner > :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#105._1, 4) > : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105] > :+- InMemoryTableScan [_1#83, _2#84] > : +- InMemoryRelation [_1#83, _2#84], true, 1, > StorageLevel(disk, 1 replicas) > :+- *Sort [_1#83 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#83, 4) > : +- LocalTableScan [_1#83, _2#84] > +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(_2#106._1, 4) > +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106] > +- InMemoryTableScan [_1#100, _2#101] >+- InMemoryRelation [_1#100, _2#101], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [_1#83 ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(_1#83, 4) >+- LocalTableScan [_1#83, _2#84] > {noformat} > notice how my persisted Datasets are shuffled and sorted again. part of the > issue seems to be in joinWith, which does some preprocessing that seems to > confuse the planner. if i change the joinWith to join (which returns a > dataframe) it looks a little better in that only one side gets shuffled > again, but still not optimal: > {noformat} > val ds1 = Seq((0, 0), (
[jira] [Comment Edited] (SPARK-19468) Dataset slow because of unnecessary shuffles
[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761207#comment-16761207 ] Mitesh edited comment on SPARK-19468 at 2/6/19 7:13 PM: +1 I'm seeing the same behavior in 2.3.2 [~viirya]. It seems like any physical operator that changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the same fix from SPARK-19931, to ensure if the output is aliasing anything, that is reflected in the attributes inside the outputPartitioning/outputOrdering. was (Author: masterddt): +1 I'm seeing the same behavior. It seems like any physical operator that changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the same fix from SPARK-19931, to ensure if the output is aliasing anything, that is reflected in the attributes inside the outputPartitioning/outputOrdering. > Dataset slow because of unnecessary shuffles > > > Key: SPARK-19468 > URL: https://issues.apache.org/jira/browse/SPARK-19468 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: koert kuipers >Priority: Major > > we noticed that some algos we ported from rdd to dataset are significantly > slower, and the main reason seems to be more shuffles that we successfully > avoid for rdds by careful partitioning. this seems to be dataset specific as > it works ok for dataframe. > see also here: > http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/ > it kind of boils down to this... 
if i partition and sort dataframes that get > used for joins repeatedly i can avoid shuffles: > {noformat} > System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1") > val df1 = Seq((0, 0), (1, 1)).toDF("key", "value") > > .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY) > val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2") > > .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY) > val joined = df1.join(df2, col("key") === col("key2")) > joined.explain > == Physical Plan == > *SortMergeJoin [key#5], [key2#27], Inner > :- InMemoryTableScan [key#5, value#6] > : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 > replicas) > : +- *Sort [key#5 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(key#5, 4) > : +- LocalTableScan [key#5, value#6] > +- InMemoryTableScan [key2#27, value2#28] > +- InMemoryRelation [key2#27, value2#28], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [key2#27 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(key2#27, 4) > +- LocalTableScan [key2#27, value2#28] > {noformat} > notice how the persisted dataframes are not shuffled or sorted anymore before > being used in the join. 
however if i try to do the same with dataset i have > no luck: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val ds2 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1")) > joined1.explain > == Physical Plan == > *SortMergeJoin [_1#105._1], [_2#106._1], Inner > :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#105._1, 4) > : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105] > :+- InMemoryTableScan [_1#83, _2#84] > : +- InMemoryRelation [_1#83, _2#84], true, 1, > StorageLevel(disk, 1 replicas) > :+- *Sort [_1#83 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#83, 4) > : +- LocalTableScan [_1#83, _2#84] > +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(_2#106._1, 4) > +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106] > +- InMemoryTableScan [_1#100, _2#101] >+- InMemoryRelation [_1#100, _2#101], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [_1#83 ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(_1#83, 4) >+- LocalTableScan [_1#83, _2#84] > {noformat} > notice how my persisted Datasets are shuffled and sorted again. part of the > issue seems to be in joinWith, which does some preprocessing that seems to > confuse the planner. if i change the joinWith to join (which returns a > dataframe) it looks a little better in that only one side gets shuffled > again, but still not optimal: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS >
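As a side note, the DataFrame half of the repro quoted in the description above can be condensed into a self-contained sketch (local master and session setup are illustrative additions; the data and join mirror the ticket's snippet):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder.master("local[4]").appName("repro").getOrCreate()
import spark.implicits._

// Disable broadcast joins so a sort-merge join is planned.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Pre-shuffle and pre-sort both sides on the join key, then persist;
// the cached layout lets repeated joins skip the Exchange/Sort steps.
val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
  .repartition(col("key")).sortWithinPartitions(col("key"))
  .persist(StorageLevel.DISK_ONLY)
val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
  .repartition(col("key2")).sortWithinPartitions(col("key2"))
  .persist(StorageLevel.DISK_ONLY)

// DataFrame join: the plan shows no Exchange above the InMemoryTableScans.
df1.join(df2, col("key") === col("key2")).explain()
```

The Dataset `joinWith` variant in the ticket differs only in using `.toDS` and tuple columns, yet re-shuffles both sides, which is the behavior being reported.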
[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns
[ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761535#comment-16761535 ] Mitesh edited comment on SPARK-19981 at 2/6/19 7:12 PM: Ping [~maropu] any updates here? This still is an issue in 2.3.2. Also maybe a dupe of SPARK-19468 was (Author: masterddt): Ping any updates here? This still is an issue in 2.3.2. Also maybe a dupe of SPARK-19468 > Sort-Merge join inserts shuffles when joining dataframes with aliased columns > - > > Key: SPARK-19981 > URL: https://issues.apache.org/jira/browse/SPARK-19981 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Allen George >Priority: Major > > Performing a sort-merge join with two dataframes - each of which has the join > column aliased - causes Spark to insert an unnecessary shuffle. > Consider the scala test code below, which should be equivalent to the > following SQL. > {code:SQL} > SELECT * FROM > (SELECT number AS aliased from df1) t1 > LEFT JOIN > (SELECT number AS aliased from df2) t2 > ON t1.aliased = t2.aliased > {code} > {code:scala} > private case class OneItem(number: Long) > private case class TwoItem(number: Long, value: String) > test("join with aliases should not trigger shuffle") { > val df1 = sqlContext.createDataFrame( > Seq( > OneItem(0), > OneItem(2), > OneItem(4) > ) > ) > val partitionedDf1 = df1.repartition(10, col("number")) > partitionedDf1.createOrReplaceTempView("df1") > partitionedDf1.cache() partitionedDf1.count() > > val df2 = sqlContext.createDataFrame( > Seq( > TwoItem(0, "zero"), > TwoItem(2, "two"), > TwoItem(4, "four") > ) > ) > val partitionedDf2 = df2.repartition(10, col("number")) > partitionedDf2.createOrReplaceTempView("df2") > partitionedDf2.cache() partitionedDf2.count() > > val fromDf1 = sqlContext.sql("SELECT number from df1") > val fromDf2 = sqlContext.sql("SELECT number from df2") > val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as 
"aliased") > val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased") > aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer") } > {code} > Both the SQL and the Scala code generate a query-plan where an extra exchange > is inserted before performing the sort-merge join. This exchange changes the > partitioning from {{HashPartitioning("number", 10)}} for each frame being > joined into {{HashPartitioning("aliased", 5)}}. I would have expected that > since it's a simple column aliasing, and both frames have exactly the same > partitioning that the initial frames. > {noformat} > *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, > hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, > 5)%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)] > +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], > Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, > 5)%NONNULL,hashpartitioning(aliased#270L, > 5)%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, > aliased#270:bigint%NONNULL)] >:- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, > aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >: +- Exchange [args=hashpartitioning(aliased#267L, > 5)%NONNULL][outPart=HashPartitioning(5, > aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >: +- *Project [args=[number#198L AS > aliased#267L]][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >:+- InMemoryTableScan > [args=[number#198L]][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)] >: : +- InMemoryRelation [number#198L], true, 1, > StorageLevel(disk, memory, deserialized, 1 replicas), > 
false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)] >: : : +- Exchange [args=hashpartitioning(number#198L, > 10)%NONNULL][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)] >: : : +- LocalTableScan > [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)] >+- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, > aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L > ASC%NONNULL)][output=ArrayBuffer(a
[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns
[ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761535#comment-16761535 ] Mitesh edited comment on SPARK-19981 at 2/6/19 6:54 AM: Ping any updates here? This still is an issue in 2.3.2. Also maybe a dupe of SPARK-19468 was (Author: masterddt): Ping any updates here? This still is an issue in 2.3.2. > Sort-Merge join inserts shuffles when joining dataframes with aliased columns > - > > Key: SPARK-19981 > URL: https://issues.apache.org/jira/browse/SPARK-19981 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Allen George >Priority: Major > > Performing a sort-merge join with two dataframes - each of which has the join > column aliased - causes Spark to insert an unnecessary shuffle. > Consider the scala test code below, which should be equivalent to the > following SQL. > {code:SQL} > SELECT * FROM > (SELECT number AS aliased from df1) t1 > LEFT JOIN > (SELECT number AS aliased from df2) t2 > ON t1.aliased = t2.aliased > {code} > {code:scala} > private case class OneItem(number: Long) > private case class TwoItem(number: Long, value: String) > test("join with aliases should not trigger shuffle") { > val df1 = sqlContext.createDataFrame( > Seq( > OneItem(0), > OneItem(2), > OneItem(4) > ) > ) > val partitionedDf1 = df1.repartition(10, col("number")) > partitionedDf1.createOrReplaceTempView("df1") > partitionedDf1.cache() partitionedDf1.count() > > val df2 = sqlContext.createDataFrame( > Seq( > TwoItem(0, "zero"), > TwoItem(2, "two"), > TwoItem(4, "four") > ) > ) > val partitionedDf2 = df2.repartition(10, col("number")) > partitionedDf2.createOrReplaceTempView("df2") > partitionedDf2.cache() partitionedDf2.count() > > val fromDf1 = sqlContext.sql("SELECT number from df1") > val fromDf2 = sqlContext.sql("SELECT number from df2") > val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased") > val aliasedDf2 = 
fromDf2.select(col(fromDf2.columns.head) as "aliased") > aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer") } > {code} > Both the SQL and the Scala code generate a query-plan where an extra exchange > is inserted before performing the sort-merge join. This exchange changes the > partitioning from {{HashPartitioning("number", 10)}} for each frame being > joined into {{HashPartitioning("aliased", 5)}}. I would have expected that > since it's a simple column aliasing, and both frames have exactly the same > partitioning that the initial frames. > {noformat} > *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, > hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, > 5)%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)] > +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], > Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, > 5)%NONNULL,hashpartitioning(aliased#270L, > 5)%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, > aliased#270:bigint%NONNULL)] >:- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, > aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >: +- Exchange [args=hashpartitioning(aliased#267L, > 5)%NONNULL][outPart=HashPartitioning(5, > aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >: +- *Project [args=[number#198L AS > aliased#267L]][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >:+- InMemoryTableScan > [args=[number#198L]][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)] >: : +- InMemoryRelation [number#198L], true, 1, > StorageLevel(disk, memory, deserialized, 1 replicas), > 
false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)] >: : : +- Exchange [args=hashpartitioning(number#198L, > 10)%NONNULL][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)] >: : : +- LocalTableScan > [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)] >+- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, > aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L > ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)] > +- Exchan
[jira] [Commented] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns
[ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761535#comment-16761535 ] Mitesh commented on SPARK-19981: Ping any updates here? This still is an issue in 2.3.2. > Sort-Merge join inserts shuffles when joining dataframes with aliased columns > - > > Key: SPARK-19981 > URL: https://issues.apache.org/jira/browse/SPARK-19981 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Allen George >Priority: Major > > Performing a sort-merge join with two dataframes - each of which has the join > column aliased - causes Spark to insert an unnecessary shuffle. > Consider the scala test code below, which should be equivalent to the > following SQL. > {code:SQL} > SELECT * FROM > (SELECT number AS aliased from df1) t1 > LEFT JOIN > (SELECT number AS aliased from df2) t2 > ON t1.aliased = t2.aliased > {code} > {code:scala} > private case class OneItem(number: Long) > private case class TwoItem(number: Long, value: String) > test("join with aliases should not trigger shuffle") { > val df1 = sqlContext.createDataFrame( > Seq( > OneItem(0), > OneItem(2), > OneItem(4) > ) > ) > val partitionedDf1 = df1.repartition(10, col("number")) > partitionedDf1.createOrReplaceTempView("df1") > partitionedDf1.cache() partitionedDf1.count() > > val df2 = sqlContext.createDataFrame( > Seq( > TwoItem(0, "zero"), > TwoItem(2, "two"), > TwoItem(4, "four") > ) > ) > val partitionedDf2 = df2.repartition(10, col("number")) > partitionedDf2.createOrReplaceTempView("df2") > partitionedDf2.cache() partitionedDf2.count() > > val fromDf1 = sqlContext.sql("SELECT number from df1") > val fromDf2 = sqlContext.sql("SELECT number from df2") > val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased") > val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased") > aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer") } > {code} > Both the SQL and the Scala 
code generate a query-plan where an extra exchange > is inserted before performing the sort-merge join. This exchange changes the > partitioning from {{HashPartitioning("number", 10)}} for each frame being > joined into {{HashPartitioning("aliased", 5)}}. I would have expected that > since it's a simple column aliasing, and both frames have exactly the same > partitioning that the initial frames. > {noformat} > *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, > hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, > 5)%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)] > +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], > Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, > 5)%NONNULL,hashpartitioning(aliased#270L, > 5)%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, > aliased#270:bigint%NONNULL)] >:- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, > aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L > ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >: +- Exchange [args=hashpartitioning(aliased#267L, > 5)%NONNULL][outPart=HashPartitioning(5, > aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >: +- *Project [args=[number#198L AS > aliased#267L]][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)] >:+- InMemoryTableScan > [args=[number#198L]][outPart=HashPartitioning(10, > number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)] >: : +- InMemoryRelation [number#198L], true, 1, > StorageLevel(disk, memory, deserialized, 1 replicas), > false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)] >: : : +- Exchange [args=hashpartitioning(number#198L, > 10)%NONNULL][outPart=HashPartitioning(10, > 
number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)] >: : : +- LocalTableScan > [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)] >+- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, > aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L > ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)] > +- Exchange [args=hashpartitioning(aliased#270L, > 5)%NONNULL][outPart=HashPartitioning(5, > aliased#270:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bi
[jira] [Commented] (SPARK-17636) Parquet predicate pushdown for nested fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761311#comment-16761311 ] Mitesh commented on SPARK-17636: Should this be closed as a duplicate of SPARK-4502? > Parquet predicate pushdown for nested fields > > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, SQL >Affects Versions: 1.6.2, 1.6.3, 2.0.2 >Reporter: Mitesh >Assignee: DB Tsai >Priority: Minor > Fix For: 3.0.0 > > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {noformat} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {noformat}
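A quick way to check this behavior yourself is to compare `explain` output for a top-level versus a nested column, as the ticket's snippet does. A sketch follows (the path and schema are hypothetical; the described behavior applies to the affected 1.6.x/2.0.x versions, with nested Parquet pushdown resolved in 3.0.0 per the ticket's Fix Version):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").getOrCreate()
val df = spark.read.parquet("/data/sales.parquet")   // hypothetical path
  .select("day_timestamp", "sale_id")

// Top-level numeric column: the plan includes a PushedFilters entry,
// e.g. PushedFilters: [GreaterThan(sale_id,4)], so Parquet can skip row groups.
df.filter("sale_id > 4").explain()

// Field inside a struct: on the affected versions no PushedFilters entry
// appears, so the predicate is evaluated only after a full scan.
df.filter("day_timestamp.timestamp > 4").explain()
```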
[jira] [Commented] (SPARK-19468) Dataset slow because of unnecessary shuffles
[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761309#comment-16761309 ] Mitesh commented on SPARK-19468: Also this may be a dupe of SPARK-19981 > Dataset slow because of unnecessary shuffles > > > Key: SPARK-19468 > URL: https://issues.apache.org/jira/browse/SPARK-19468 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: koert kuipers >Priority: Major > > we noticed that some algos we ported from rdd to dataset are significantly > slower, and the main reason seems to be more shuffles that we successfully > avoid for rdds by careful partitioning. this seems to be dataset specific as > it works ok for dataframe. > see also here: > http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/ > it kind of boils down to this... if i partition and sort dataframes that get > used for joins repeatedly i can avoid shuffles: > {noformat} > System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1") > val df1 = Seq((0, 0), (1, 1)).toDF("key", "value") > > .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY) > val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2") > > .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY) > val joined = df1.join(df2, col("key") === col("key2")) > joined.explain > == Physical Plan == > *SortMergeJoin [key#5], [key2#27], Inner > :- InMemoryTableScan [key#5, value#6] > : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 > replicas) > : +- *Sort [key#5 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(key#5, 4) > : +- LocalTableScan [key#5, value#6] > +- InMemoryTableScan [key2#27, value2#28] > +- InMemoryRelation [key2#27, value2#28], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [key2#27 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(key2#27, 4) > +- LocalTableScan [key2#27, 
value2#28] > {noformat} > notice how the persisted dataframes are not shuffled or sorted anymore before > being used in the join. however if i try to do the same with dataset i have > no luck: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val ds2 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1")) > joined1.explain > == Physical Plan == > *SortMergeJoin [_1#105._1], [_2#106._1], Inner > :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#105._1, 4) > : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105] > :+- InMemoryTableScan [_1#83, _2#84] > : +- InMemoryRelation [_1#83, _2#84], true, 1, > StorageLevel(disk, 1 replicas) > :+- *Sort [_1#83 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#83, 4) > : +- LocalTableScan [_1#83, _2#84] > +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(_2#106._1, 4) > +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106] > +- InMemoryTableScan [_1#100, _2#101] >+- InMemoryRelation [_1#100, _2#101], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [_1#83 ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(_1#83, 4) >+- LocalTableScan [_1#83, _2#84] > {noformat} > notice how my persisted Datasets are shuffled and sorted again. part of the > issue seems to be in joinWith, which does some preprocessing that seems to > confuse the planner. 
if i change the joinWith to join (which returns a > dataframe) it looks a little better in that only one side gets shuffled > again, but still not optimal: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val ds2 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1")) > joined1.explain > == Physical Plan == > *SortMergeJoin [_1#83], [_1#100], Inner > :- InMemoryTableScan [_1#83, _2#84] > : +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1 > replicas) > : +- *Sort [_1#83 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#83, 4) > : +- LocalTableScan [_1#83, _2#84] > +- *Sort [_1#100
[jira] [Commented] (SPARK-19468) Dataset slow because of unnecessary shuffles
[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761262#comment-16761262 ] Mitesh commented on SPARK-19468: Also curious why the fix for SPARK-19931 only covered HashPartitioning, instead of any kind of partitioning that extends Expression. > Dataset slow because of unnecessary shuffles > > > Key: SPARK-19468 > URL: https://issues.apache.org/jira/browse/SPARK-19468 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: koert kuipers >Priority: Major > > we noticed that some algos we ported from rdd to dataset are significantly > slower, and the main reason seems to be more shuffles that we successfully > avoid for rdds by careful partitioning. this seems to be dataset specific as > it works ok for dataframe. > see also here: > http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/ > it kind of boils down to this... if i partition and sort dataframes that get > used for joins repeatedly i can avoid shuffles: > {noformat} > System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1") > val df1 = Seq((0, 0), (1, 1)).toDF("key", "value") > > .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY) > val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2") > > .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY) > val joined = df1.join(df2, col("key") === col("key2")) > joined.explain > == Physical Plan == > *SortMergeJoin [key#5], [key2#27], Inner > :- InMemoryTableScan [key#5, value#6] > : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 > replicas) > : +- *Sort [key#5 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(key#5, 4) > : +- LocalTableScan [key#5, value#6] > +- InMemoryTableScan [key2#27, value2#28] > +- InMemoryRelation [key2#27, value2#28], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [key2#27 
ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(key2#27, 4) > +- LocalTableScan [key2#27, value2#28] > {noformat} > notice how the persisted dataframes are not shuffled or sorted anymore before > being used in the join. however if i try to do the same with dataset i have > no luck: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val ds2 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1")) > joined1.explain > == Physical Plan == > *SortMergeJoin [_1#105._1], [_2#106._1], Inner > :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#105._1, 4) > : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105] > :+- InMemoryTableScan [_1#83, _2#84] > : +- InMemoryRelation [_1#83, _2#84], true, 1, > StorageLevel(disk, 1 replicas) > :+- *Sort [_1#83 ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(_1#83, 4) > : +- LocalTableScan [_1#83, _2#84] > +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0 >+- Exchange hashpartitioning(_2#106._1, 4) > +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106] > +- InMemoryTableScan [_1#100, _2#101] >+- InMemoryRelation [_1#100, _2#101], true, 1, > StorageLevel(disk, 1 replicas) > +- *Sort [_1#83 ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(_1#83, 4) >+- LocalTableScan [_1#83, _2#84] > {noformat} > notice how my persisted Datasets are shuffled and sorted again. part of the > issue seems to be in joinWith, which does some preprocessing that seems to > confuse the planner. 
if i change the joinWith to join (which returns a > dataframe) it looks a little better in that only one side gets shuffled > again, but still not optimal: > {noformat} > val ds1 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val ds2 = Seq((0, 0), (1, 1)).toDS > > .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY) > val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1")) > joined1.explain > == Physical Plan == > *SortMergeJoin [_1#83], [_1#100], Inner > :- InMemoryTableScan [_1#83, _2#84] > : +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1 > replicas) > : +- *Sort [_1#83 ASC NULLS FIRST], false, 0 > : +- E
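One possible workaround for the joinWith behavior reported above, sketched only and not a verified fix: do the join at the DataFrame level on the raw columns (where the cached partitioning is still recognized), and assemble the pair structs after the join. This assumes Spark 2.x, a SparkSession in scope as `spark`, and broadcast joins disabled as in the report; Spark may still require aliasing the inputs if it reports ambiguous column references.

```scala
// Sketch of a workaround, not a verified fix: join the cached Datasets with
// the flat DataFrame join (which keeps the attributes the cached
// partitioning refers to), then build the (left, right) pair afterwards.
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.storage.StorageLevel
import spark.implicits._

val ds1 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1"))
  .persist(StorageLevel.DISK_ONLY)
val ds2 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1"))
  .persist(StorageLevel.DISK_ONLY)

// Flat join first: the planner can match hashpartitioning(_1) on both sides.
val flat = ds1.join(ds2, ds1("_1") === ds2("_1"))

// Pair the columns up only after the join, mimicking joinWith's output shape.
val paired = flat.select(
  struct(ds1("_1"), ds1("_2")).as("_1"),
  struct(ds2("_1"), ds2("_2")).as("_2"))
paired.explain
```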
[jira] [Comment Edited] (SPARK-19468) Dataset slow because of unnecessary shuffles
[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761207#comment-16761207 ] Mitesh edited comment on SPARK-19468 at 2/5/19 8:59 PM: +1 I'm seeing the same behavior. It seems like any physical operator that changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the same fix from https://issues.apache.org/jira/browse/SPARK-19931, to ensure that, if the output aliases anything, the aliases are reflected in the attributes inside outputPartitioning/outputOrdering. was (Author: masterddt): +1 I'm seeing the same behavior. It seems like any physical operator that changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the same fix from https://issues.apache.org/jira/browse/SPARK-19931, to ensure if the output is aliasing anything, that is reflected in the attributes inside the `outputPartitioning`/`outputOrdering`.
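The aliasing issue described in this comment can be illustrated with a toy model. This is plain Scala, not Spark's actual Catalyst classes; the names below are made up for illustration. The point: an operator that renames an attribute must substitute the new attribute into the partitioning it reports, or downstream distribution requirements no longer match and an extra shuffle is inserted.

```scala
// Toy model of the aliasing problem: a Project that renames a column must
// rewrite its child's output-partitioning expressions, or the planner no
// longer recognizes the data as already partitioned on the output column.
sealed trait Expr
case class Attr(name: String, id: Int) extends Expr
case class Alias(child: Attr, name: String, id: Int) extends Expr

case class HashPartitioning(exprs: Seq[Expr])

// Child reports it is hash-partitioned on attribute _1#83.
val childPartitioning = HashPartitioning(Seq(Attr("_1", 83)))

// Project renames _1#83 to _1#105 (roughly what joinWith's named_struct does).
val projectList = Seq(Alias(Attr("_1", 83), "_1", 105))

// Naive: reporting the child's partitioning verbatim refers to an id (#83)
// that is no longer in the output, so a requirement on #105 is not satisfied.
val naive = childPartitioning

// Alias-aware: substitute aliased attributes with their new ids.
def rewrite(p: HashPartitioning, aliases: Seq[Alias]): HashPartitioning =
  HashPartitioning(p.exprs.map {
    case a: Attr =>
      aliases.find(_.child == a).map(al => Attr(al.name, al.id)).getOrElse(a)
    case e => e
  })

val fixed = rewrite(childPartitioning, projectList)
assert(fixed == HashPartitioning(Seq(Attr("_1", 105))))
```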
[jira] [Comment Edited] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099030#comment-16099030 ] Mitesh edited comment on SPARK-20112 at 7/24/17 7:37 PM: - Still seeing this on 2.1.0, new err file https://gist.github.com/MasterDDT/7eab5e8828ef9cbed2b8956a505c was (Author: masterddt): Still seeing this on 2.1.0, attached new err file > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log, > hs_err_pid22870.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. > - As you can see from the plan, all the sources are cached memory tables. And > we partition/sort them all beforehand so it's always sort-merge joins or > broadcast joins (with small tables). 
> {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat} > This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, > but that is marked fixed in 2.0.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016021#comment-16016021 ] Mitesh commented on SPARK-17867: Ah I see, thanks [~viirya]. The repartitionByColumns is just a short-cut method I created. But I do have some aliasing code changes compared to 2.1; I will try to remove those and see if that's what's breaking it. > Dataset.dropDuplicates (i.e. distinct) should consider the columns with same > column name > > > Key: SPARK-17867 > URL: https://issues.apache.org/jira/browse/SPARK-17867 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh > Fix For: 2.1.0 > > > We find and get the first resolved attribute from the output with the given > column name in Dataset.dropDuplicates. When we have more than one column > with the same name, the other columns are put into aggregation columns > instead of grouping columns. We should fix this.
[jira] [Comment Edited] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015772#comment-16015772 ] Mitesh edited comment on SPARK-17867 at 5/18/17 1:48 PM: - I'm seeing a regression from this change: the last "del <> 'hi'" filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") } {code}
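The regression above matters because the filter is not semantics-preserving when pushed below the deduplication: dropDuplicates keeps one representative row per key, so filtering before versus after it can return different results. A minimal plain-Scala illustration of why (hypothetical data, no Spark involved; "keep the first row per key" stands in for the dedup aggregation):

```scala
// Why the filter must stay above the dedup: with two rows sharing a key,
// deduplication keeps the first row per key, so filtering before vs after
// the dedup yields different results.
val rows = Seq((1, "a"), (1, "b"))

// Correct order: dedup first (keeps (1,"a")), then filter value != "a" -> empty.
val correct = rows.groupBy(_._1).map(_._2.head).filter(_._2 != "a")

// Pushed-down filter: filter first (drops (1,"a")), then dedup -> keeps (1,"b").
val pushed = rows.filter(_._2 != "a").groupBy(_._1).map(_._2.head)

assert(correct.toList != pushed.toList) // the push-down changes the semantics
```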
[jira] [Comment Edited] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015772#comment-16015772 ] Mitesh edited comment on SPARK-17867 at 5/18/17 1:48 PM: - I'm seeing a regression from this change, the last "del <> 'hi'" filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")). sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") {code} was (Author: masterddt): I'm seeing a regression from this change, the last `del <> 'hi'` filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")). sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") {code} > Dataset.dropDuplicates (i.e. 
distinct) should consider the columns with same > column name > > > Key: SPARK-17867 > URL: https://issues.apache.org/jira/browse/SPARK-17867 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh > Fix For: 2.1.0 > > > We find and get the first resolved attribute from output with the given > column name in Dataset.dropDuplicates. When we have the more than one columns > with the same name. Other columns are put into aggregation columns, instead > of grouping columns. We should fix this. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015772#comment-16015772 ] Mitesh edited comment on SPARK-17867 at 5/18/17 1:48 PM: - I'm seeing a regression from this change: the last {{del <> 'hi'}} filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") } {code} was (Author: masterddt): I'm seeing a regression from this change, the last filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")). sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") {code}
[jira] [Comment Edited] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015772#comment-16015772 ] Mitesh edited comment on SPARK-17867 at 5/18/17 1:49 PM: - I'm seeing a regression from this change: the last "del <> 'hi'" filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") } {code} was (Author: masterddt): I'm seeing a regression from this change, the last "del <> 'hi'" filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")). sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") {code}
[jira] [Commented] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015772#comment-16015772 ] Mitesh commented on SPARK-17867: I'm seeing a regression from this change: the last filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:scala} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") } {code}
[jira] [Comment Edited] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015772#comment-16015772 ] Mitesh edited comment on SPARK-17867 at 5/18/17 1:47 PM: - I'm seeing a regression from this change: the last filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:none} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") } {code} was (Author: masterddt): I'm seeing a regression from this change, the last filter gets pushed down past the dropDuplicates aggregation. cc [~cloud_fan] {code:scala} val df = Seq((1,2,3,"hi"), (1,2,4,"hi")) .toDF("userid", "eventid", "vk", "del") .filter("userid is not null and eventid is not null and vk is not null") .repartitionByColumns(Seq("userid")) .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk")) .dropDuplicates("eventid") .filter("userid is not null") .repartitionByColumns(Seq("userid")). sortWithinPartitions(asc("userid")) .filter("del <> 'hi'") // filter should not be pushed down to the local table scan df.queryExecution.sparkPlan.collect { case f @ FilterExec(_, t @ LocalTableScanExec(_, _)) => assert(false, s"$f was pushed down to $t") {code}
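The pushdown described in the comments above is a correctness issue, not just a cosmetic plan change: a filter on a non-grouping column (`del`) evaluated before the deduplication can keep rows the original query would have dropped. A minimal sketch with plain Scala collections (no Spark required; `Row` here is a hypothetical case class standing in for the DataFrame, and `dedupByEvent` mimics `dropDuplicates("eventid")` keeping the first row per key):

```scala
// Stand-in for the query above: dedup on eventid, then filter on del,
// versus the pushed-down order (filter first, then dedup).
case class Row(userid: Int, eventid: Int, vk: Int, del: String)

val rows = Seq(Row(1, 2, 3, "hi"), Row(1, 2, 4, "x"))

// Poor man's dropDuplicates("eventid"): keep the first row seen per eventid.
def dedupByEvent(rs: Seq[Row]): Seq[Row] =
  rs.groupBy(_.eventid).values.map(_.head).toSeq

// Plan as written: dedup first, then filter("del <> 'hi'").
val correct = dedupByEvent(rows).filter(_.del != "hi")

// Plan with the filter pushed below the aggregation.
val pushedDown = dedupByEvent(rows.filter(_.del != "hi"))

println(correct)    // empty: the row kept per eventid has del == "hi"
println(pushedDown) // one row: the pushdown resurrects Row(1,2,4,"x")
```

The two plans disagree on which rows survive, which is why the filter must stay above the aggregation.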
[jira] [Comment Edited] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945399#comment-15945399 ] Mitesh edited comment on SPARK-20112 at 3/28/17 3:46 PM: - [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10MB, but a few are 200MB. The non-hashed cached tables are 1-100G {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat} was (Author: masterddt): [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10MB, but a few are 200MB. {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat}
[jira] [Comment Edited] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945399#comment-15945399 ] Mitesh edited comment on SPARK-20112 at 3/28/17 3:46 PM: - [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10MB, but a few are 200MB. {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat} was (Author: masterddt): [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10M, but a few are 200M. {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat}
[jira] [Comment Edited] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945399#comment-15945399 ] Mitesh edited comment on SPARK-20112 at 3/28/17 3:40 PM: - [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10M, but a few are 200M. {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat} was (Author: masterddt): [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10M, but a few are 200M. {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat}
[jira] [Commented] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945399#comment-15945399 ] Mitesh commented on SPARK-20112: [~kiszk] I can try out spark 2.0.3+ or 2.1. Actually I disabled wholestage codegen and I do see a failure still on 2.0.2, but in a different place now in {{HashJoin.advanceNext}}. Also uploaded the new hs_err_pid22870. The hashed relations are around 1-10M, but a few are 200M. {noformat} 17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 KB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 SIGSEGV17/03/27 22:15:59 DEBUG [Executor task launch worker-17] TaskMemoryManager: Task 152119 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b60e395 [thread 140369911781120 also had an error] (0xb) at pc=0x7fad1f7afc11, pid=22870, tid=140369909675776 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) # Problematic frame: # J 25558 C2 org.apache.spark.sql.execution.joins.HashJoin$$anonfun$outerJoin$1$$anon$1.advanceNext()Z (110 bytes) @ 0x7fad1f7afc11 [0x7fad1f7afb20+0xf1] # # Failed to write core dump. Core dumps have been disabled. 
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /mnt/xvdb/spark/worker_dir/app-20170327213416-0005/14/hs_err_pid22870.log 17/03/27 22:15:59 DEBUG [Executor task launch worker-19] TaskMemoryManager: Task 152090 acquired 64.0 MB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@51de5289 2502.591: [G1Ergonomics (Concurrent Cycles) request concurrent cycle initiation, reason: occupancy higher than threshold, occupancy: 7667187712 bytes, allocation request: 160640 bytes, threshold: 8214124950 bytes (45.00 %), source: concurrent humongous allocation] [thread 140376087648000 also had an error] [thread 140369903376128 also had an error] # {noformat} > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log, > hs_err_pid22870.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. > - As you can see from the plan, all the sources are cached memory tables. And > we partition/sort them all beforehand so it's always sort-merge-joins or > broadcast joins (with small tables). > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat} > This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, > but that is marked fixed in 2.0.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
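The comments above mention disabling whole-stage codegen to check whether the crash follows the generated-code path. As a hedged config sketch (Spark 2.x; assumes an already-created `SparkSession` named `spark`), that is a single SQL conf:

```scala
// Disable whole-stage code generation so operators fall back to the
// interpreted/iterator path (useful for isolating codegen-related crashes).
spark.conf.set("spark.sql.codegen.wholeStage", "false")

// Equivalently, at session creation time:
// SparkSession.builder().config("spark.sql.codegen.wholeStage", "false").getOrCreate()
```

As the comment shows, the SIGSEGV moving to `HashJoin.advanceNext` after this change suggests the fault is not specific to the generated sorter code.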
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Attachment: hs_err_pid22870.log
[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns
[ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945166#comment-15945166 ] Mitesh edited comment on SPARK-19981 at 3/28/17 3:21 PM: - As I mentioned on the PR, maybe it's better to fix it here: https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41 Perhaps it should handle canonicalizing an alias so {{a#1}} is the same as {{a#1 as newA#2}}. Otherwise you have a similar problem with sorting. Here is a sort example with 1 partition. I believe the extra sort on {{newA}} is unnecessary. {code} scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a") df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int] scala> val df2 = df1.selectExpr("a as newA", "b") df2: org.apache.spark.sql.DataFrame = [newA: int, b: int] scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan) *SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)] :- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)] : +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)] : +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)] +- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)] +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242
ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)] +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)] +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)] +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)] {code} was (Author: masterddt): As I mentioned on the PR, maybe a better to fix it here: https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41 Perhaps it should handle canonicalizing an alias so {{a#1}} is the same as {{a#1 as newA#2}}. Otherwise you have a similar problem with sorting. Here is a sort example with 1 partition. I believe the extra sort on {{newA}} is unnecessary. 
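The alias-canonicalization idea suggested above can be sketched as a toy model. This is a hypothetical illustration only, not Spark's actual Canonicalize or Expression classes: if canonicalization strips Alias nodes and ignores cosmetic attribute names, then {{a#1}} and {{a#1 as newA#2}} compare as semantically equal, which is what would let the planner see that the child's sort order already satisfies the Sort on {{newA}}.

```scala
// Toy sketch with made-up types (not Spark's real Expression hierarchy).
sealed trait Expr
case class AttributeRef(name: String, exprId: Int) extends Expr
case class Alias(child: Expr, name: String, exprId: Int) extends Expr

// Canonical form: drop Alias wrappers and blank out cosmetic names,
// keeping only the expression id that identifies the underlying column.
def canonicalize(e: Expr): Expr = e match {
  case Alias(child, _, _)  => canonicalize(child)   // a#1 AS newA#2  ->  a#1
  case AttributeRef(_, id) => AttributeRef("", id)  // name is cosmetic
}

def semanticEquals(a: Expr, b: Expr): Boolean =
  canonicalize(a) == canonicalize(b)

val a       = AttributeRef("a", 1)      // models a#1
val aliased = Alias(a, "newA", 2)       // models a#1 AS newA#2
assert(semanticEquals(a, aliased))      // equal once the alias is stripped
```

Under this model the Sort on the aliased column canonicalizes to the same expression as the Sort already present below the Project, so the extra sort (and the extra exchange in the join case) would be recognized as redundant.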
[jira] [Commented] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns
[ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945166#comment-15945166 ] Mitesh commented on SPARK-19981:

> Sort-Merge join inserts shuffles when joining dataframes with aliased columns
> -
>
> Key: SPARK-19981
> URL: https://issues.apache.org/jira/browse/SPARK-19981
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.2
> Reporter: Allen George
>
> Performing a sort-merge join with two dataframes - each of which has the join column aliased - causes Spark to insert an unnecessary shuffle.
> Consider the scala test code below, which should be equivalent to the following SQL.
> {code:SQL}
> SELECT * FROM
>   (SELECT number AS aliased from df1) t1
> LEFT JOIN
>   (SELECT number AS aliased from df2) t2
> ON t1.aliased = t2.aliased
> {code}
> {code:scala}
> private case class OneItem(number: Long)
> private case class TwoItem(number: Long, value: String)
>
> test("join with aliases should not trigger shuffle") {
>   val df1 = sqlContext.createDataFrame(
>     Seq(
>       OneItem(0),
>       OneItem(2),
>       OneItem(4)
>     )
>   )
>   val partitionedDf1 = df1.repartition(10, col("number"))
>   partitionedDf1.createOrReplaceTempView("df1")
>   partitionedDf1.cache()
>   partitionedDf1.count()
>
>   val df2 = sqlContext.createDataFrame(
>     Seq(
>       TwoItem(0, "zero"),
>       TwoItem(2, "two"),
>       TwoItem(4, "four")
>     )
>   )
>   val partitionedDf2 = df2.repartition(10, col("number"))
>   partitionedDf2.createOrReplaceTempView("df2")
>   partitionedDf2.cache()
>   partitionedDf2.count()
>
>   val fromDf1 = sqlContext.sql("SELECT number from df1")
>   val fromDf2 = sqlContext.sql("SELECT number from df2")
>   val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
>   val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")
>   aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer")
> }
> {code}
> Both the SQL and the Scala code generate a query plan where an extra exchange is inserted before performing the sort-merge join. This exchange changes the partitioning from {{HashPartitioning("number", 10)}} for each frame being joined into {{HashPartitioning("aliased", 5)}}. I would have expected no extra exchange, since it's a simple column aliasing and both frames have exactly the same partitioning as the initial frames.
> {noformat}
> *Project [ar
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Description: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). Its not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using AWS r3.8xlarge machines, which have ephermal attached drives, I can't repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I think that means its not an issue with the code-gen, but I cant figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables. And we partition/sort them all beforehand so its always sort-merge-joins or broadcast joins (with small tables). {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, but that is marked fix in 2.0.0 was: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). 
It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, but that is marked fixed in 2.0.0 > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). 
It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. > - As you can see from the plan, all the sources are cached memory tables. And > we partition/sort them all beforehand so it's always sort-merge joins or > broadcast joins (with small tables). > {noformat} > # A fatal error has been detected by the Java Runtime Envir
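The `autoJoinBroadcast=-1` setting mentioned in the report appears to be shorthand for Spark's automatic broadcast-join threshold. A minimal configuration sketch, assuming stock Spark property names (the report does not show the exact config used):

```
# spark-defaults.conf — disable automatic broadcast joins so that only
# explicitly hinted tables are broadcast (a sketch; "autoJoinBroadcast"
# in the report corresponds to this property in stock Spark)
spark.sql.autoBroadcastJoinThreshold    -1
```

With the threshold disabled, a small table is broadcast only when wrapped explicitly on the DataFrame side, e.g. with the `broadcast(...)` function from `org.apache.spark.sql.functions`.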
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Description: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, but that is marked fixed in 2.0.0 was: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. 
Here is some interesting repro information: - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. 
But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. > - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 13987234800
[jira] [Issue Comment Deleted] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Comment: was deleted (was: This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, but that is marked fixed in 2.0.0) > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944100#comment-15944100 ] Mitesh commented on SPARK-20112: This kind of looks like https://issues.apache.org/jira/browse/SPARK-15822, but that is marked fixed in 2.0.0 > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat}
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Attachment: codegen_sorter_crash.log > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat}
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Attachment: (was: codegen_sorter_crash.log) > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat}
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Description: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. But it does repro with m4.10xlarge with an io1 EBS drive. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} was: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. 
Here is some interesting repro information: - Using r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using AWS r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. But it does repro with m4.10xlarge with an io1 EBS drive. 
So I > think that means it's not an issue with the code-gen, but I can't figure out > what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. > - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/sp
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Description: I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} was: I'm seeing a very weird crash in GeneratedIterator.sort_addToSorter. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. 
So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in {{GeneratedIterator.sort_addToSorter}}. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. So I think that means it's not an issue with the code-gen, but I > can't figure out what the difference in behavior is. > - The broadcast joins in the plan are all small tables. 
I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. > - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Attachment: (was: codegen_sorter_crash) > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in GeneratedIterator.sort_addToSorter. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. So I think that means it's not an issue with the code-gen, but I > can't figure out what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat}
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Attachment: codegen_sorter_crash.log > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash.log, hs_err_pid19271.log > > > I'm seeing a very weird crash in GeneratedIterator.sort_addToSorter. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. So I think that means it's not an issue with the code-gen, but I > can't figure out what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat}
[jira] [Updated] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
[ https://issues.apache.org/jira/browse/SPARK-20112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-20112: --- Attachment: hs_err_pid19271.log codegen_sorter_crash > SIGSEGV in GeneratedIterator.sort_addToSorter > - > > Key: SPARK-20112 > URL: https://issues.apache.org/jira/browse/SPARK-20112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) >Reporter: Mitesh > Attachments: codegen_sorter_crash, hs_err_pid19271.log > > > I'm seeing a very weird crash in GeneratedIterator.sort_addToSorter. The > hs_err_pid and codegen file are attached (with query plans). It's not a > deterministic repro, but running a big query load, I eventually see it come > up within a few minutes. > Here is some interesting repro information: > - Using r3.8xlarge machines, which have ephemeral attached drives, I can't > repro this. So I think that means it's not an issue with the code-gen, but I > can't figure out what the difference in behavior is. > - The broadcast joins in the plan are all small tables. I have > autoJoinBroadcast=-1 because I always hint which tables should be broadcast. 
> - As you can see from the plan, all the sources are cached memory tables > {noformat} > # A fatal error has been detected by the Java Runtime Environment: > # > # [thread 139872345896704 also had an error] > SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build > 1.8.0_60-b27) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode > linux-amd64 compressed oops) > [thread 139872348002048 also had an error]# Problematic frame: > # > J 28454 C1 > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V > (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20112) SIGSEGV in GeneratedIterator.sort_addToSorter
Mitesh created SPARK-20112: -- Summary: SIGSEGV in GeneratedIterator.sort_addToSorter Key: SPARK-20112 URL: https://issues.apache.org/jira/browse/SPARK-20112 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.2 Environment: AWS m4.10xlarge with EBS (io1 drive, 400g, 4000iops) Reporter: Mitesh I'm seeing a very weird crash in GeneratedIterator.sort_addToSorter. The hs_err_pid and codegen file are attached (with query plans). It's not a deterministic repro, but running a big query load, I eventually see it come up within a few minutes. Here is some interesting repro information: - Using r3.8xlarge machines, which have ephemeral attached drives, I can't repro this. So I think that means it's not an issue with the code-gen, but I can't figure out what the difference in behavior is. - The broadcast joins in the plan are all small tables. I have autoJoinBroadcast=-1 because I always hint which tables should be broadcast. - As you can see from the plan, all the sources are cached memory tables {noformat} # A fatal error has been detected by the Java Runtime Environment: # # [thread 139872345896704 also had an error] SIGSEGV (0xb) at pc=0x7f38a378caa3, pid=19271, tid=139872342738688 # # JRE version: Java(TM) SE Runtime Environment (8.0_60-b27) (build 1.8.0_60-b27) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode linux-amd64 compressed oops) [thread 139872348002048 also had an error]# Problematic frame: # J 28454 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V (369 bytes) @ 0x7f38a378caa3 [0x7f38a378b5e0+0x14c3] {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
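One triage step for a crash like the above (a sketch, assuming Spark 2.0's `spark.sql.codegen.wholeStage` switch): since the problematic frame is inside generated code, disabling whole-stage codegen and re-running the load can show whether the generated `sort_addToSorter` path is implicated. This only isolates the code path; it does not fix the underlying fault.

```
# spark-defaults.conf (sketch): rule out the generated sort_addToSorter path
spark.sql.codegen.wholeStage   false
```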
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Affects Version/s: 2.0.2 > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2, 1.6.3, 2.0.2 >Reporter: Mitesh >Priority: Minor > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {noformat} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15562757#comment-15562757 ] Mitesh commented on SPARK-17636: [~liancheng] Could you or someone else familiar with the code comment on whether this is the expected case (just not implemented), or is there a bug here? > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2, 1.6.3 >Reporter: Mitesh >Priority: Minor > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Affects Version/s: 1.6.3 > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2, 1.6.3 >Reporter: Mitesh >Priority: Minor > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Priority: Minor (was: Major) > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Mitesh >Priority: Minor > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Description: Theres a *PushedFilters* for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. {quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file {quote} was: Theres a `PushedFilters` for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. 
{quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file {quote} > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Mitesh > > Theres a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. 
> {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Description: Theres a `PushedFilters` for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. {quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file {quote} was: The filter gets pushed down for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. 
{quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file {quote} > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Mitesh > > Theres a `PushedFilters` for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. 
> {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Description: The filter gets pushed down for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. {quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3a://some/parquet/file {quote} was: The filter gets pushed down for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. 
{quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 {quote} > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Mitesh > > The filter gets pushed down for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. 
> {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Description: The filter gets pushed down for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. {quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 {quote} was: The filter gets pushed down for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. 
{quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 {quote} > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Mitesh > > The filter gets pushed down for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. 
> {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, > PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-17636: --- Description: The filter gets pushed down for a simple numeric field, but not for a numeric field inside a struct. Not sure if this is a Spark limitation because of Parquet, or only a Spark limitation. {quote} scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 {quote} was: scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Mitesh > > The filter gets pushed down for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. 
> {quote} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, > PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17636) Parquet filter push down doesn't handle struct fields
Mitesh created SPARK-17636: -- Summary: Parquet filter push down doesn't handle struct fields Key: SPARK-17636 URL: https://issues.apache.org/jira/browse/SPARK-17636 Project: Spark Issue Type: Bug Components: Spark Core, SQL Affects Versions: 1.6.2 Reporter: Mitesh scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", "sale_id") res5: org.apache.spark.sql.DataFrame = [day_timestamp: struct, sale_id: bigint] scala> res5.filter("sale_id > 4").queryExecution.executedPlan res9: org.apache.spark.sql.execution.SparkPlan = Filter[23814] [args=(sale_id#86324L > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933, PushedFilters: [GreaterThan(sale_id,4)] scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan res10: org.apache.spark.sql.execution.SparkPlan = Filter[23815] [args=(day_timestamp#86302.timestamp > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: s3n://aiq.data.pipeline.prod-restart/gilt/fact_clickstream_page_views/annotated/20150917_123800634_zzz_937571e7_0161_48e4_8c8c_7ce5600ae933 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
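The two plans above suggest an eligibility rule: a predicate shows up under *PushedFilters* only when it references a plain top-level column, while a predicate addressing a struct member (a dotted path like `day_timestamp.timestamp`) stays in the Spark `Filter` operator. A toy model of that rule follows; this is a hypothetical illustration, not Spark's or Parquet's actual pushdown code:

```python
# Toy model of the pushdown-eligibility behavior observed in the plans above:
# only predicates on top-level columns are pushed to the Parquet scan;
# dotted paths into a struct are evaluated by Spark's Filter instead.

def pushable(filter_expr: str, top_level_columns: set) -> bool:
    """Return True if the filter's column is a plain top-level attribute."""
    column = filter_expr.split()[0]  # e.g. "sale_id > 4" -> "sale_id"
    return "." not in column and column in top_level_columns

cols = {"day_timestamp", "sale_id"}
print(pushable("sale_id > 4", cols))                  # True  (PushedFilters shown)
print(pushable("day_timestamp.timestamp > 4", cols))  # False (no PushedFilters)
```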
[jira] [Commented] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15390833#comment-15390833 ] Mitesh commented on SPARK-13979: Yeah, that sounds like it. We worked around it by adding AWS_ACCESS_KEY_ID to the driver's environment variables, but that isn't a great workaround when a single app may want to spin up different SparkContexts with different keys.

> Killed executor is respawned without AWS keys in standalone spark cluster
> -
>
> Key: SPARK-13979
> URL: https://issues.apache.org/jira/browse/SPARK-13979
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.5.2
> Environment: I'm using Spark 1.5.2 with Hadoop 2.7 and running experiments on a simple standalone cluster:
> 1 master
> 2 workers
> All ubuntu 14.04 with Java 8/Scala 2.10
> Reporter: Allen George
>
> I'm having a problem where respawning a failed executor during a job that reads/writes parquet on S3 causes subsequent tasks to fail because of missing AWS keys.
> h4. Setup:
> I'm using Spark 1.5.2 with Hadoop 2.7 and running experiments on a simple standalone cluster:
> 1 master
> 2 workers
> My application is co-located on the master machine, while the two workers are on two other machines (one worker per machine). All machines are running in EC2. I've configured my setup so that my application executes its tasks on two executors (one executor per worker).
> h4. Application:
> My application reads and writes parquet files on S3. I set the AWS keys on the SparkContext by doing:
> val sc = new SparkContext()
> val hadoopConf = sc.hadoopConfiguration
> hadoopConf.set("fs.s3n.awsAccessKeyId", "SOME_KEY")
> hadoopConf.set("fs.s3n.awsSecretAccessKey", "SOME_SECRET")
> At this point I'm done, and I go ahead and use "sc".
> h4. Issue:
> I can read and write parquet files without a problem with this setup. *BUT* if an executor dies during a job and is respawned by a worker, tasks fail with the following error:
> "Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the {{fs.s3n.awsAccessKeyId}} or {{fs.s3n.awsSecretAccessKey}} properties (respectively)."
> h4. Basic analysis
> I think I've traced this down to the following:
> SparkHadoopUtil is initialized with an empty {{SparkConf}}. Later, classes like {{DataSourceStrategy}} simply call {{SparkHadoopUtil.get.conf}} and access the (now invalid; missing various properties) {{HadoopConfiguration}} that's built from this empty {{SparkConf}} object. It's unclear to me why this is done, and it seems that the code as written would cause broken results anytime callers use {{SparkHadoopUtil.get.conf}} directly.
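A sketch of a more robust variant of the env-variable workaround mentioned in the comment, passing the keys to executors explicitly via Spark config (the {{spark.executorEnv.*}} and {{spark.hadoop.*}} property prefixes are standard Spark configuration; whether they survive an executor respawn on 1.5.2 is exactly what this ticket questions, so treat this as an experiment, not a fix):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Push the AWS keys into the executor environment and the Hadoop conf
// via SparkConf, instead of only mutating the driver's
// hadoopConfiguration after the context is created.
// Values are read from the driver's environment here (placeholders).
val conf = new SparkConf()
  .set("spark.executorEnv.AWS_ACCESS_KEY_ID", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.executorEnv.AWS_SECRET_ACCESS_KEY", sys.env("AWS_SECRET_ACCESS_KEY"))
  .set("spark.hadoop.fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
val sc = new SparkContext(conf)
```

Note that this still pins one set of keys per JVM, so it does not solve the multiple-SparkContexts-with-different-keys case raised above.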
[jira] [Commented] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366459#comment-15366459 ] Mitesh commented on SPARK-13979: Just to be clear, you don't kill the worker process; you kill the executor process `CoarseGrainedExecutorBackend`. You killed that process, the master brought another one up, and then that process could successfully read from the S3 bucket with the original keys given by the driver?

> Killed executor is respawned without AWS keys in standalone spark cluster
[jira] [Comment Edited] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366459#comment-15366459 ] Mitesh edited comment on SPARK-13979 at 7/7/16 5:36 PM: - Just to be clear, you don't kill the worker process; you kill the executor process {{CoarseGrainedExecutorBackend}}. You killed that process, the master brought another one up, and then that process could successfully read from the S3 bucket with the original keys given by the driver?

was (Author: masterddt): Just to be clear, you don't kill the worker process; you kill the executor process `CoarseGrainedExecutorBackend`. You killed that process, the master brought another one up, and then that process could successfully read from the S3 bucket with the original keys given by the driver?

> Killed executor is respawned without AWS keys in standalone spark cluster
[jira] [Created] (SPARK-16419) EnsureRequirements adds extra Sort to already sorted cached table
Mitesh created SPARK-16419: -- Summary: EnsureRequirements adds extra Sort to already sorted cached table Key: SPARK-16419 URL: https://issues.apache.org/jira/browse/SPARK-16419 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.2 Reporter: Mitesh Priority: Minor EnsureRequirements compares the required and given sort orderings, but uses Scala equals instead of a semantic equals, so column capitalization isn't considered.
[jira] [Updated] (SPARK-16419) EnsureRequirements adds extra Sort to already sorted cached table
[ https://issues.apache.org/jira/browse/SPARK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-16419: --- Description: EnsureRequirements compares the required and given sort orderings, but uses Scala equals instead of a semantic equals, so column capitalization isn't considered, and the comparison also fails for a cached table. As a result, a SortMergeJoin over an already-sorted cached table gets an extra Sort added. (was: EnsureRequirements compared the required and given sort ordering, but uses Scala equals instead of a semantic equals, so column capitalization isn't considered.)

> EnsureRequirements adds extra Sort to already sorted cached table
> -
>
> Key: SPARK-16419
> URL: https://issues.apache.org/jira/browse/SPARK-16419
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.2
> Reporter: Mitesh
> Priority: Minor
>
> EnsureRequirements compares the required and given sort orderings, but uses Scala equals instead of a semantic equals, so column capitalization isn't considered, and the comparison also fails for a cached table. As a result, a SortMergeJoin over an already-sorted cached table gets an extra Sort added.
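The failure mode can be illustrated with a small self-contained sketch (the {{SortColumn}} type and {{sameOrdering}} helper are hypothetical stand-ins for Catalyst's {{SortOrder}} comparison, not Spark's actual code):

```scala
// Hypothetical model of the ordering check in EnsureRequirements.
case class SortColumn(name: String, ascending: Boolean)

val required = Seq(SortColumn("Col", ascending = true))
val actual   = Seq(SortColumn("col", ascending = true))

// Plain Scala equality is case-sensitive, so the two orderings look
// different and an unnecessary Sort node would be inserted.
require(required != actual)

// A semantic comparison resolves names the way the analyzer does
// (case-insensitively by default), so the existing ordering satisfies
// the requirement and no extra Sort is needed.
def sameOrdering(req: Seq[SortColumn], given: Seq[SortColumn]): Boolean =
  req.length <= given.length && req.zip(given).forall { case (r, g) =>
    r.name.equalsIgnoreCase(g.name) && r.ascending == g.ascending
  }
require(sameOrdering(required, actual))
```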
[jira] [Updated] (SPARK-16398) Make cancelJob and cancelStage API public
[ https://issues.apache.org/jira/browse/SPARK-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-16398: --- Description: Make the SparkContext {{cancelJob}} and {{cancelStage}} APIs public. This allows applications to use {{SparkListener}} to do their own management of jobs via events, but without using the REST API. (was: Make the SparkContext {{cancelJob}} and {{cancelStage}} APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API.) > Make cancelJob and cancelStage API public > - > > Key: SPARK-16398 > URL: https://issues.apache.org/jira/browse/SPARK-16398 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.2 >Reporter: Mitesh >Priority: Trivial > > Make the SparkContext {{cancelJob}} and {{cancelStage}} APIs public. This > allows applications to use {{SparkListener}} to do their own management of > jobs via events, but without using the REST API.
[jira] [Updated] (SPARK-16398) Make cancelJob and cancelStage API public
[ https://issues.apache.org/jira/browse/SPARK-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-16398: --- Description: Make the SparkContext {{cancelJob}} and {{cancelStage}} APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API. (was: Make the {{SparkContext}} {{cancelJob}} and {{cancelStage}} APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API.) > Make cancelJob and cancelStage API public > - > > Key: SPARK-16398 > URL: https://issues.apache.org/jira/browse/SPARK-16398 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.2 >Reporter: Mitesh >Priority: Trivial > > Make the SparkContext {{cancelJob}} and {{cancelStage}} APIs public. This > allows applications to use `SparkListener` to do their own management of jobs > via events, but without using the REST API.
[jira] [Updated] (SPARK-16398) Make cancelJob and cancelStage API public
[ https://issues.apache.org/jira/browse/SPARK-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-16398: --- Description: Make the {{SparkContext}} {{cancelJob}} and {{cancelStage}} APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API. (was: Make the `SparkContext` `cancelJob` and `cancelStage` APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API.) > Make cancelJob and cancelStage API public > - > > Key: SPARK-16398 > URL: https://issues.apache.org/jira/browse/SPARK-16398 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.2 >Reporter: Mitesh >Priority: Trivial > > Make the {{SparkContext}} {{cancelJob}} and {{cancelStage}} APIs public. This > allows applications to use `SparkListener` to do their own management of jobs > via events, but without using the REST API.
[jira] [Updated] (SPARK-16398) Make cancelJob and cancelStage API public
[ https://issues.apache.org/jira/browse/SPARK-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mitesh updated SPARK-16398: --- Affects Version/s: 1.6.2 > Make cancelJob and cancelStage API public > - > > Key: SPARK-16398 > URL: https://issues.apache.org/jira/browse/SPARK-16398 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.2 >Reporter: Mitesh >Priority: Trivial > > Make the `SparkContext` `cancelJob` and `cancelStage` APIs public. This > allows applications to use `SparkListener` to do their own management of jobs > via events, but without using the REST API.
[jira] [Created] (SPARK-16398) Make cancelJob and cancelStage API public
Mitesh created SPARK-16398: -- Summary: Make cancelJob and cancelStage API public Key: SPARK-16398 URL: https://issues.apache.org/jira/browse/SPARK-16398 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Mitesh Priority: Trivial Make the `SparkContext` `cancelJob` and `cancelStage` APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API.
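The intended usage can be sketched as follows (this assumes the APIs are public, which is what the ticket requests; the {{shouldCancel}} policy function is a hypothetical application hook, not part of Spark):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Sketch: manage jobs from a SparkListener instead of the REST API.
def installJobManager(sc: SparkContext, shouldCancel: Int => Boolean): Unit =
  sc.addSparkListener(new SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit =
      // Application-specific policy decides which jobs to cancel.
      if (shouldCancel(jobStart.jobId)) sc.cancelJob(jobStart.jobId)
  })
```

The same pattern extends to stages via {{SparkListenerStageSubmitted}} and {{cancelStage}}.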
[jira] [Commented] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308608#comment-15308608 ] Mitesh commented on SPARK-13979: [~gvernik] you can just `kill` the executor on the command line and let the cluster bring up another one. That one will not have the keys. By the way, we worked around this by putting the AWS keys in the driver process's environment variables, but that's not a great workaround if a driver wants to create multiple SparkContexts with different keys.

> Killed executor is respawned without AWS keys in standalone spark cluster
[jira] [Comment Edited] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200183#comment-15200183 ] Mitesh edited comment on SPARK-13979 at 3/17/16 7:09 PM: - I'm seeing this too. It's really annoying because I set the S3 access and secret keys in all the places the docs specify:
{noformat}
sparkConf.hadoopConf.set("fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.cloneConf", true)
/core-site.xml fs.s3n.awsAccessKeyId
/spark-env.sh export AWS_ACCESS_KEY_ID = ...
{noformat}
None of that seems to work. If I kill a running executor, it comes back up and doesn't have the keys anymore. I'm also running a standalone cluster.

was (Author: masterddt): I'm seeing this too. It's really annoying because I set the S3 access and secret keys in all the places the docs specify:
{noformat}
sparkConf.hadoopConf.set("fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.cloneConf", true)
/core-site.xml fs.s3n.awsAccessKeyId
/spark-env.sh export AWS_ACCESS_KEY_ID = ...
{noformat}
None of that seems to work. If I kill a running executor, it comes back up and doesn't have the keys anymore.

> Killed executor is respawned without AWS keys in standalone spark cluster
[jira] [Comment Edited] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200183#comment-15200183 ] Mitesh edited comment on SPARK-13979 at 3/17/16 7:10 PM: - I'm seeing this too. It's really annoying because I set the S3 access and secret keys in all the places the docs specify:
{noformat}
sparkConf.hadoopConf.set("fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.cloneConf", true)
/core-site.xml fs.s3n.awsAccessKeyId
/spark-env.sh export AWS_ACCESS_KEY_ID = ...
{noformat}
None of that seems to work. If I kill a running executor, it comes back up and doesn't have the keys anymore. I'm also running a standalone cluster.

was (Author: masterddt): I'm seeing this too. It's really annoying because I set the S3 access and secret keys in all the places the docs specify:
{noformat}
sparkConf.hadoopConf.set("fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.cloneConf", true)
/core-site.xml fs.s3n.awsAccessKeyId
/spark-env.sh export AWS_ACCESS_KEY_ID = ...
{noformat}
None of that seems to work. If I kill a running executor, it comes back up and doesn't have the keys anymore. I'm also running a standalone cluster.

> Killed executor is respawned without AWS keys in standalone spark cluster
[jira] [Commented] (SPARK-13979) Killed executor is respawned without AWS keys in standalone spark cluster
[ https://issues.apache.org/jira/browse/SPARK-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200183#comment-15200183 ] Mitesh commented on SPARK-13979: I'm seeing this too. It's really annoying because I set the S3 access and secret keys in all the places the docs specify:
{noformat}
sparkConf.hadoopConf.set("fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ..)
sparkConf.set("spark.hadoop.cloneConf", true)
/core-site.xml fs.s3n.awsAccessKeyId
/spark-env.sh export AWS_ACCESS_KEY_ID = ...
{noformat}
None of that seems to work. If I kill a running executor, it comes back up and doesn't have the keys anymore.

> Killed executor is respawned without AWS keys in standalone spark cluster