[ 
https://issues.apache.org/jira/browse/SPARK-17497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fridtjof Sander updated SPARK-17497:
------------------------------------
    Description: 
Non-associative aggregations (like `collect_list`) require the data to be
sorted on the grouping key in order to extract the aggregation groups.

Let `table` be a Hive table that is partitioned on `p` and bucketed and sorted
on `id`. Let `q` be a query that executes a non-associative aggregation on
`table.id` over multiple partitions `p`.
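
For concreteness, here is a minimal sketch of that setup using the public
DataFrameWriter API. The table name `t`, the bucket count and the sample data
are illustrative assumptions, not part of the issue:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucket-sort-scenario")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Placeholder data with the shape described above: a grouping key `id`,
// a payload `value` and a partitioning column `p`.
val df = Seq((1L, "a", 1), (2L, "b", 1), (1L, "c", 2)).toDF("id", "value", "p")

// A table partitioned on `p`, bucketed and sorted on `id`.
df.write
  .partitionBy("p")
  .bucketBy(8, "id")
  .sortBy("id")
  .saveAsTable("t")

// `q`: a non-associative aggregation on `id` over multiple partitions `p`.
val q = spark.sql(
  "SELECT id, collect_list(value) FROM t WHERE p IN (1, 2) GROUP BY id")
```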

Currently, when executing `q`, Spark creates as many RDD partitions as there
are buckets. Each RDD partition is created in `FileScanRDD` by fetching the
corresponding bucket files from all requested Hive partitions. Because those
bucket files are read one after another, the resulting RDD partition is no
longer sorted on `id` and has to be explicitly sorted before the aggregation
can be performed, which blocks the execution pipeline.
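
As a toy illustration in plain Scala (not actual Spark internals) of why
reading sorted buckets one after another breaks the ordering:

```scala
// Each "bucket file" is sorted on its own...
val bucketP1 = Iterator(1L, 4L, 7L) // bucket 0 in partition p=1
val bucketP2 = Iterator(2L, 5L, 8L) // bucket 0 in partition p=2

// ...but concatenating them, as the current fetch strategy effectively
// does, yields a stream that is no longer sorted on the key.
println((bucketP1 ++ bucketP2).toList) // List(1, 4, 7, 2, 5, 8)
```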

In this Jira I propose to offer the optimizer an alternative bucket-fetching
strategy that preserves the per-bucket sort order in the situation described
above.

One way to achieve this is to open all buckets across all requested partitions
simultaneously when fetching the data. Since each bucket is internally sorted,
we can essentially perform a k-way merge sort on the collection of bucket
iterators and directly emit a sorted RDD partition that can be piped into the
next operator.
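
A minimal sketch of such a merge in plain Scala. The `mergeSorted` helper is a
hypothetical name; an actual implementation would operate on the row iterators
inside `FileScanRDD`:

```scala
import scala.collection.mutable

// k-way merge: turn k iterators, each sorted ascending, into one sorted
// iterator without materializing the data.
def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] =
  new Iterator[T] {
    // Order the buffered iterators by their current head element;
    // PriorityQueue dequeues the maximum, so reverse for a min-heap.
    private val queue = mutable.PriorityQueue.empty[BufferedIterator[T]](
      Ordering.by[BufferedIterator[T], T](_.head).reverse)
    iters.foreach { it =>
      val buffered = it.buffered
      if (buffered.hasNext) queue.enqueue(buffered)
    }

    override def hasNext: Boolean = queue.nonEmpty
    override def next(): T = {
      val smallest = queue.dequeue()                // iterator with the smallest head
      val elem = smallest.next()                    // advance past that element
      if (smallest.hasNext) queue.enqueue(smallest) // re-insert with its new head
      elem
    }
  }

// Three "buckets", each internally sorted on the key:
val merged = mergeSorted(Seq(Iterator(1, 4, 9), Iterator(2, 5), Iterator(3, 8)))
println(merged.toList) // List(1, 2, 3, 4, 5, 8, 9)
```

For `q` above, this would merge, per bucket id, the corresponding bucket file
from every selected partition.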

While there should be no question about the theoretical feasibility of this
idea, there are some obvious practical implications, e.g. with regard to IO
handling, since every bucket file of every selected partition has to be kept
open simultaneously.

I would like to investigate the practical feasibility, limits, gains and
drawbacks of this optimization in my master's thesis and, of course,
contribute the implementation. Before I start, however, I wanted to kindly
ask you, the community, for any thoughts, opinions, corrections or other
feedback, which would be much appreciated.


> Preserve order when scanning ordered buckets over multiple partitions
> ---------------------------------------------------------------------
>
>                 Key: SPARK-17497
>                 URL: https://issues.apache.org/jira/browse/SPARK-17497
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Fridtjof Sander
>            Priority: Minor
>


