[ https://issues.apache.org/jira/browse/SPARK-17497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fridtjof Sander updated SPARK-17497:
------------------------------------
Description:

Non-associative aggregations (like `collect_list`) require the data to be sorted on the grouping key in order to extract aggregation groups.

Let `table` be a Hive table that is partitioned on `p` and bucketed and sorted on `id`. Let `q` be a query that executes a non-associative aggregation on `table.id` over multiple partitions `p`.

Currently, when executing `q`, Spark creates as many RDD partitions as there are buckets. Each RDD partition is created in `FileScanRDD` by fetching the associated bucket from every requested Hive partition. Because these buckets are read one after another, the resulting RDD partition is no longer sorted on `id` and has to be sorted explicitly before the aggregation can run. This explicit sort introduces a blocking step into the execution pipeline.

In this Jira I propose to offer the optimizer an alternative bucket-fetching strategy that preserves the internal sort order in the situation described above.

One way to achieve this is to open all buckets across all partitions simultaneously when fetching the data. Since each bucket is internally sorted, we can essentially perform a merge sort over the collection of bucket iterators and directly emit a sorted RDD partition that can be piped into the next operator.

While the theoretical feasibility of this idea should not be in question, there are some obvious implications, e.g. with regard to IO handling, since every bucket file involved has to be kept open at the same time.

I would like to investigate the practical feasibility, limits, gains and drawbacks of this optimization in my master's thesis and, of course, contribute the implementation. Before I start, however, I would kindly like to ask the community for any thoughts, opinions, corrections or other feedback, which would be much appreciated.
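A minimal, language-neutral sketch of the proposed merge, written in Python rather than Spark's Scala internals. It assumes each bucket is an in-memory list of `(id, value)` tuples already sorted on `id`; `heapq.merge` stands in for the merge over bucket iterators. The names `read_sequentially`, `read_merged`, `collect_list_sorted` and the sample data are hypothetical illustrations, not part of Spark or `FileScanRDD`.

```python
import heapq
from itertools import chain, groupby
from operator import itemgetter

# Hypothetical data: the rows of bucket 0 in each Hive partition p.
# Each bucket is sorted on `id`; rows are (id, value) tuples (an assumption).
bucket_0_per_partition = {
    "p=1": [(1, "a"), (4, "b"), (7, "c")],
    "p=2": [(2, "d"), (4, "e"), (9, "f")],
    "p=3": [(1, "g"), (5, "h")],
}

def read_sequentially(buckets):
    """Simplified current behaviour: buckets are read one after another,
    so the result is only sorted within each bucket, not overall."""
    return chain.from_iterable(buckets)

def read_merged(buckets, key=itemgetter(0)):
    """Sketch of the proposal: open all buckets at once and k-way merge
    their already-sorted iterators with a heap, yielding rows sorted on key."""
    return heapq.merge(*(iter(b) for b in buckets), key=key)

def collect_list_sorted(rows, key=itemgetter(0)):
    """Streaming, sort-based collect_list. Only correct if `rows` is sorted
    on key, because groupby groups *adjacent* equal keys only."""
    for k, group in groupby(rows, key=key):
        yield k, [v for _, v in group]

buckets = list(bucket_0_per_partition.values())
sequential = list(read_sequentially(buckets))
merged = list(read_merged(buckets))

print([k for k, _ in sequential])  # [1, 4, 7, 2, 4, 9, 1, 5]
print([k for k, _ in merged])      # [1, 1, 2, 4, 4, 5, 7, 9]
print(list(collect_list_sorted(merged)))
```

Feeding `sequential` into `collect_list_sorted` would produce two separate groups for `id` 1, which is why the current plan needs an explicit sort. The heap holds one head row per bucket, so each emitted row costs O(log k) comparisons for k open buckets; in a real reader, each of those k buckets would presumably also hold an open file and read buffer, which is the IO concern raised above.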
> Preserve order when scanning ordered buckets over multiple partitions
> ---------------------------------------------------------------------
>
>                 Key: SPARK-17497
>                 URL: https://issues.apache.org/jira/browse/SPARK-17497
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Fridtjof Sander
>            Priority: Minor