I succeeded in doing some experimental evaluation, and it seems I
understood the code correctly:
A partition that consists of Hive buckets is read bucket-file by
bucket-file, which loses the internal sorting.
Does anyone have an opinion on my alternative idea of reading from
multiple bucket-files simultaneously to preserve that ordering (see the
merge sketch in my quoted mail below)?
Regarding the follow-up questions:
1. I found the `collect_list()` function, which seems to provide what I
want. However, I fail to collect more than one column. Is there a way to
do, basically, `.agg(collect_list("*"))`? A sketch of the workaround I
have in mind follows.
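For illustration, something like this is what I'm after (untested
sketch; it assumes `collect_list` accepts struct-typed columns, and the
"payload" column is made up):

    import static org.apache.spark.sql.functions.*;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Hypothetical workaround: wrap the wanted columns into a struct,
    // then collect the structs per group. "payload" is a made-up column.
    Dataset<Row> grouped = table
        .groupBy(col("id"))
        .agg(collect_list(struct(col("date"), col("payload"))).as("rows"));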
2. I worked around that problem by writing and reading the table within
the same context/session, so that the ephemeral metastore doesn't lose
its contents. In general, however, a Hive metastore seems to be required
for production use, since only an ephemeral (in-memory) and a
Hive catalog implementation are available in 2.0.0. The session setup I
mean is sketched below.
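For reference, this is the kind of session setup I mean (minimal
sketch; enableHiveSupport() requires the Hive classes on the classpath
and a reachable metastore; the app name is made up):

    import org.apache.spark.sql.SparkSession;

    // A session backed by the Hive catalog instead of the ephemeral
    // in-memory one, so table metadata survives across sessions.
    SparkSession spark = SparkSession.builder()
        .appName("bucketing-test")   // made-up app name
        .enableHiveSupport()
        .getOrCreate();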
I would highly appreciate some feedback on my thoughts and questions.
On 31.08.2016 at 14:45, Fridtjof Sander wrote:
Hi Spark users,
I'm currently investigating Spark's bucketing and partitioning
capabilities, and I have some questions:
Let /T/ be a table that is bucketed and sorted by /T.id/ and
partitioned by /T.date/. Before persisting, /T/ has been repartitioned
by /T.id/ to get only one file per bucket.
I want to group by /T.id/ over a subset of /T.date/'s values.
It seems to me that the best execution plan in this scenario would be
the following:
- Schedule one stage (no exchange) with as many tasks as there are
bucket-ids, so that there is a mapping from each task to a bucket-id
- Each task opens all bucket-files belonging to "its" bucket-id
simultaneously, which is one per affected partition /T.date/
- Since the data inside the buckets is sorted, we can perform the
second phase of "two-phase multiway merge sort" to get our groups,
which can be "pipelined" into the next operator (see the sketch after
this list)
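To make that merge step concrete, here is a minimal sketch of what I
imagine each task doing (plain Java, not Spark code; the Record type
and its key are made up):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MultiwayMerge {

        // Hypothetical record type: a sort key plus an opaque payload.
        static class Record {
            final long key;
            final String payload;
            Record(long key, String payload) { this.key = key; this.payload = payload; }
        }

        // Wraps one sorted input so the heap can compare its current head.
        static class Head {
            Record current;
            final Iterator<Record> rest;
            Head(Iterator<Record> it) { rest = it; current = it.next(); }
        }

        // Phase two of two-phase multiway merge sort: each input is already
        // sorted by key, so a k-way merge yields records in global key order,
        // and a group can be emitted as soon as the key changes (pipelined).
        static List<Record> mergeSorted(List<Iterator<Record>> sortedInputs) {
            PriorityQueue<Head> heap =
                new PriorityQueue<>(Comparator.comparingLong((Head h) -> h.current.key));
            for (Iterator<Record> it : sortedInputs) {
                if (it.hasNext()) heap.add(new Head(it));
            }
            List<Record> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                Head h = heap.poll();
                out.add(h.current);          // next record in global key order
                if (h.rest.hasNext()) {
                    h.current = h.rest.next();
                    heap.add(h);             // reinsert with its new head
                }
            }
            return out;
        }
    }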
From what I understand after scanning through the code, however, each
bucket-file is read completely before the record-iterator advances to
the next bucket-file (see FileScanRDD; the same applies to Hive). So a
groupBy would require sorting the partitions of the resulting RDD
before the groups can be emitted, which is a blocking operation.
Could anyone confirm that I'm assessing the situation correctly here,
or correct me if not?
Followup questions:
1. Is there a way to get the "SQL" groups into the RDD API, the way the
RDD groupBy would return them? I fail to formulate such a job, because
a query with a groupBy that lacks an aggregation function is invalid.
The closest workaround I can think of is sketched below.
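The only way I see is to drop down to the RDD API myself (sketch;
`table` is a Dataset<Row>, and it assumes T.id is the first column):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Workaround sketch: leave SQL and group on the RDD level. This most
    // likely discards any ordering/bucketing knowledge the optimizer had.
    JavaPairRDD<Long, Iterable<Row>> groups = table
        .javaRDD()                        // Dataset<Row> -> JavaRDD<Row>
        .groupBy(row -> row.getLong(0));  // assumes T.id is column 0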
2. I haven't simply tested this, because I fail to load a table with
the properties specified above.
After writing a table like this:
.write().partitionBy("date").bucketBy(4, "id").sortBy("id").format("json").saveAsTable("table");
I fail to read it back with the partitioning and bucketing being
recognized.
Is a functioning Hive metastore required for this to work, or is there
a workaround? The read-back I attempt is sketched below.
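For completeness, this is how I attempt the read-back (sketch; `spark`
is the SparkSession, which would need Hive support for the metadata to
persist across sessions):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Read the saved table back through the catalog; with only the
    // ephemeral in-memory catalog this fails in a new session.
    Dataset<Row> t = spark.table("table");
    spark.catalog().listTables().show();  // inspect what the catalog knows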
I hope someone can spare the time to help me out here.
All the best,
Fridtjof