I succeeded to do some experimental evaluation, and it seems I correctly understood the code: A partition that consist of hive-buckets is read bucket-file by bucket-file, which leads to the loss of internal sorting.

Does anyone have an opinion about my alternative idea of reading from multiple bucket-files simultaneously to keep that ordering?

Regarding the followup questions:

1. I found the `collect_list()`function, which seems provide what I want. However, I fail to collect more than one column. Is there a way to do basically: .agg(collect_list("*")) ?

2. I worked around that problem by writing and reading the table within the same context/session, so that the ephemeral metastore doesn't loose it's content. However, in general a hive-metastore seems to be required for a production usage, since there is only an ephemeral- and a hive-catalog implementation available in 2.0.0.

I would highly appreciate some feedback to my thoughts and questions

Am 31.08.2016 um 14:45 schrieb Fridtjof Sander:
Hi Spark users,

I'm currently investigating spark's bucketing and partitioning capabilities and I have some questions:

Let /T/ be a table that is bucketed and sorted by /T.id/ and partitioned by /T.date/. Before persisting, /T/ has been repartitioned by /T.id/ to get only one file per bucket.
I want to group by /T.id/ over a subset of /T.date/'s values.

It seems to me that the best execution plan in this scenario would be the following: - Schedule one stage (no exchange) with as many tasks as we have bucket-ids, so that there is a mapping from each task to a bucket-id - Each tasks opens all bucket-files belonging to "it's" bucket-id simultaneously, which is one per affected partition /T.date/ - Since the data inside the buckets are sorted, we can perform the second phase of "two-phase-multiway-merge-sort" to get our groups, which can be "pipelined" into the next operator

From what I understand after scanning through the code, however, it appears to me that each bucket-file is read completely before the record-iterator is advanced to the next bucket file (see FileScanRDD , same applies to Hive). So a groupBy would require to sort the partitions of the resulting RDD before the groups can be emitted, which results in a blocking operation.

Could anyone confirm that I'm assessing the situation correctly here, or correct me if not?

Followup questions:

1. Is there a way to get the "sql" groups into the RDD API, like the RDD groupBy would return them? I fail to formulate a job like this, because a query with groupBy, that misses an aggregation function, is invalid. 2. I haven't simply testet this, because I fail to load a table with the specified properties like above:
After writing a table like this:
.write().partitionBy("date").bucketBy(4,"id").sortBy("id").format("json").saveAsTable("table");
I fail to read it again, with the partitioning and bucketing being recognized. Is a functioning Hive-Metastore required for this to work, or is there a workaround?

I hope someone can spare the time to help me out here.

All the best,
Fridtjof


Reply via email to