Re: Grouping on bucketed and sorted columns

2016-09-02 Thread Fridtjof Sander
I managed to do some experimental evaluation, and it seems I understood 
the code correctly: a partition that consists of Hive buckets is read 
bucket-file by bucket-file, which loses the internal sorting.


Does anyone have an opinion about my alternative idea of reading from 
multiple bucket-files simultaneously to keep that ordering?
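
To make the idea concrete, here is a rough sketch of the kind of merge I 
have in mind. It is plain Java over hypothetical per-bucket-file 
iterators and not tied to Spark's internals: as long as every 
bucket-file is sorted by the grouping key, a priority queue over the 
current head of each file yields one globally sorted stream, so groups 
can be emitted without a blocking sort.

import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Rough sketch: k-way merge of already-sorted bucket-file iterators into one
// sorted stream (the "second phase" of a two-phase multiway merge sort).
public class SortedBucketMerge {

    public static <T extends Comparable<T>> Iterator<T> mergeSorted(List<Iterator<T>> bucketFiles) {
        // Each heap entry holds the current head record plus the iterator it came from.
        PriorityQueue<AbstractMap.SimpleEntry<T, Iterator<T>>> heap =
                new PriorityQueue<>((a, b) -> a.getKey().compareTo(b.getKey()));
        for (Iterator<T> file : bucketFiles) {
            if (file.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(file.next(), file));
        }
        return new Iterator<T>() {
            @Override public boolean hasNext() { return !heap.isEmpty(); }
            @Override public T next() {
                AbstractMap.SimpleEntry<T, Iterator<T>> smallest = heap.poll();
                Iterator<T> source = smallest.getValue();
                if (source.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(source.next(), source));
                // Records come out globally sorted, so equal keys are adjacent
                // and each group can be pipelined into the next operator.
                return smallest.getKey();
            }
        };
    }
}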


Regarding the followup questions:

1. I found the `collect_list()` function, which seems to provide what I 
want. However, I can't manage to collect more than one column. Is there 
a way to do, basically, .agg(collect_list("*"))?
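
For reference, this is the kind of thing I tried. It is only a sketch 
with made-up column names ("id", "date", "value"), and since I'm not 
sure whether struct("*") is accepted, the columns are wrapped 
explicitly:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.struct;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CollectRowsPerGroup {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("collect-rows-per-group")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical table with columns id, date, value.
        Dataset<Row> t = spark.table("table");

        // Wrapping the non-grouping columns into a struct lets collect_list
        // gather whole rows per id instead of a single column.
        Dataset<Row> groups = t.groupBy(col("id"))
                .agg(collect_list(struct(col("date"), col("value"))).alias("rows"));

        groups.show(false);
        spark.stop();
    }
}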


2. I worked around that problem by writing and reading the table within 
the same context/session, so that the ephemeral metastore doesn't lose 
its content. In general, however, a Hive metastore seems to be required 
for production use, since only an ephemeral and a Hive catalog 
implementation are available in 2.0.0.
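
For completeness, this is roughly the session setup I use for the 
workaround. With enableHiveSupport() the table metadata goes into the 
Hive-backed catalog instead of the ephemeral in-memory one (assuming a 
metastore, or at least the local Derby default, is reachable):

import org.apache.spark.sql.SparkSession;

public class HiveBackedSession {
    public static void main(String[] args) {
        // enableHiveSupport() switches from the ephemeral in-memory catalog to the
        // Hive catalog, so table metadata can survive across sessions.
        SparkSession spark = SparkSession.builder()
                .appName("bucketing-experiments")
                .enableHiveSupport()
                .getOrCreate();

        spark.catalog().listTables().show();  // tables known to the persistent catalog
        spark.stop();
    }
}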


I would highly appreciate some feedback on my thoughts and questions.

On 31.08.2016 at 14:45, Fridtjof Sander wrote:

Hi Spark users,

I'm currently investigating Spark's bucketing and partitioning 
capabilities and I have some questions:


Let /T/ be a table that is bucketed and sorted by /T.id/ and 
partitioned by /T.date/. Before persisting, /T/ has been repartitioned 
by /T.id/ to get only one file per bucket.

I want to group by /T.id/ over a subset of /T.date/'s values.

It seems to me that the best execution plan in this scenario would be 
the following:
- Schedule one stage (no exchange) with as many tasks as we have 
bucket-ids, so that there is a mapping from each task to a bucket-id
- Each task opens all bucket-files belonging to "its" bucket-id 
simultaneously, which is one per affected partition /T.date/
- Since the data inside the buckets is sorted, we can perform the 
second phase of a "two-phase multiway merge sort" to get our groups, 
which can be "pipelined" into the next operator


From what I understand after scanning through the code, however, it 
appears to me that each bucket-file is read completely before the 
record-iterator is advanced to the next bucket file (see FileScanRDD; 
the same applies to Hive). So a groupBy would require sorting the 
partitions of the resulting RDD before the groups can be emitted, which 
results in a blocking operation.


Could anyone confirm that I'm assessing the situation correctly here, 
or correct me if not?


Followup questions:

1. Is there a way to get the "SQL" groups into the RDD API, like the 
RDD groupBy would return them? I fail to formulate such a job, because 
a groupBy query that lacks an aggregation function is invalid.
2. I haven't simply tested this, because I fail to load a table with 
the properties specified above:

After writing a table like this:
.write().partitionBy("date").bucketBy(4, "id").sortBy("id").format("json").saveAsTable("table");
I fail to read it back with the partitioning and bucketing being 
recognized.
Is a functioning Hive metastore required for this to work, or is there 
a workaround?
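
To be explicit about what I'm trying, the whole experiment looks 
roughly like this (input path and column names are made up):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BucketedTableRoundTrip {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("bucketed-table-round-trip")
                .getOrCreate();

        // Hypothetical input with columns id and date.
        Dataset<Row> t = spark.read().json("/path/to/t.json");

        // Repartition by id first so there is only one file per bucket, then
        // persist T partitioned by date, bucketed and sorted by id.
        t.repartition(t.col("id"))
         .write()
         .partitionBy("date")
         .bucketBy(4, "id")
         .sortBy("id")
         .format("json")
         .saveAsTable("t_bucketed");

        // Read it back and check whether the bucketing/sorting is recognized,
        // i.e. whether the plan avoids an Exchange for the groupBy on id.
        Dataset<Row> back = spark.table("t_bucketed");
        back.groupBy("id").count().explain();

        spark.stop();
    }
}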


I hope someone can spare the time to help me out here.

All the best,
Fridtjof




