Hi, Spark Users:
I have a question about how I am using the Spark Dataset API for my case.
Suppose the "ds_old" dataset has 100 records, with 10 unique $"col1" values, and
we run the following pseudo-code:
val ds_new = ds_old
  .repartition(5, $"col1")
  .sortWithinPartitions($"col2")
  .mapPartitions(new MergeFun)

class MergeFun extends MapPartitionsFunction[InputCaseClass, OutputCaseClass] {
  override def call(input: util.Iterator[InputCaseClass]):
      util.Iterator[OutputCaseClass] = {
    // merge logic here
  }
}
I have some questions about the meaning of "partition" in the APIs above; below
is my understanding:
1) repartition(5, $"col1") means distributing all 100 records, keyed by the 10
unique col1 values, across 5 partitions. There is no guarantee of how many (or
which) unique col1 values each of the 5 partitions will receive, but with a
well-balanced hash algorithm, each partition should get close to the average
count (10/5 = 2) when the number of unique values is large.
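To make the idea concrete, here is a plain-Scala sketch (no Spark) of spreading 10 hypothetical unique col1 values over 5 partitions by hash. Spark's actual placement uses its own internal hash of the row, so the assignment below is only illustrative:

```scala
object HashPartitionSketch {
  // Mirrors the non-negative modulo that hash partitioning typically uses;
  // Spark's real hash function differs, so this is only an illustration.
  def partitionFor(key: String, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val keys = (1 to 10).map(i => s"key$i")
    val byPartition = keys.groupBy(k => partitionFor(k, 5))
    byPartition.toSeq.sortBy(_._1).foreach { case (p, ks) =>
      println(s"partition $p -> ${ks.mkString(", ")}")
    }
  }
}
```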
2) sortWithinPartitions($"col2") is one of the parts I want to clear up. What
exactly does "partition" mean in sortWithinPartitions here? I want the data
sorted by "col2" within each unique value of "col1", but the Spark API leans
heavily on the term "partition" in this case. I DO NOT want the 100 records
sorted by col2 within each of the 5 partitions, but rather within each unique
value of "col1". I believe this assumption is right, since we repartition by
"col1" first. Please confirm.
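To pin down the distinction I mean, here is a plain-Scala sketch (no Spark, made-up values) of the two orderings, for one hypothetical partition that received two col1 groups:

```scala
object SortSketch {
  case class Row(col1: String, col2: Int)

  // One hypothetical partition holding rows from two col1 groups.
  val partition = Seq(Row("a", 3), Row("b", 1), Row("a", 2), Row("b", 4))

  // Ordering I do NOT want: the whole partition by col2, groups interleaved.
  val byCol2Only = partition.sortBy(_.col2)

  // Ordering I DO want: col2 order within each col1 group, groups contiguous.
  val perGroupByCol2 = partition.sortBy(r => (r.col1, r.col2))

  def main(args: Array[String]): Unit = {
    println(byCol2Only)
    println(perGroupByCol2)
  }
}
```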
3) mapPartitions(new MergeFun) is another part I want to clear up. I originally
assumed my merge function would be invoked once per unique col1 value (10 times
in this case). But after testing, I found it is in fact called ONCE per
partition, i.e. 5 times. So in this sense, the meaning of "partition" in
mapPartitions IS DIFFERENT from the meaning of "partition" in
sortWithinPartitions, correct? Or is my understanding of "partition" in
sortWithinPartitions also WRONG?
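A plain-Scala sketch (no Spark) of what my test run appears to show: the function is invoked once per physical partition, not once per col1 group. The 2-keys-per-partition split below is hypothetical:

```scala
object InvocationSketch {
  // Returns how many times a per-partition function would be invoked:
  // one call per partition, regardless of how many groups each holds.
  def callCount(partitions: Seq[Seq[String]]): Int = {
    var calls = 0
    partitions.foreach { _ =>
      calls += 1 // one invocation sees ALL groups that landed in this partition
    }
    calls
  }

  def main(args: Array[String]): Unit = {
    val keys = (1 to 10).map(i => s"key$i")
    val partitions = keys.grouped(2).toSeq // pretend 5 partitions, 2 keys each
    println(s"${callCount(partitions)} calls for ${keys.size} groups")
  }
}
```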
In summary, here are my questions:
1) We don't want to use the aggregation API because, in our case, some unique
values of "col1" COULD contain a large number of records, and having the data
sorted in a specified order per col1 value helps the merge logic in our
business case a lot.
2) We don't want to use a window function, as the merge logic is really an
aggregation: there will be only one output record per grouping (col1). So even
though window functions support sorting, they don't fit this case.
3) I understand that the unique value count of "col1" is unpredictable for
Spark. But I wonder whether there is an API that would be invoked per grouping
(per col1 value), instead of per partition (the 5 partitions in this case).
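To pin down the call shape I'm after (not claiming Spark exposes exactly this), here is a plain-Scala sketch: a function invoked once per col1 group, taking that group's rows as an iterator and emitting one merged record. The sum below is only a placeholder for our real merge logic:

```scala
object PerGroupShape {
  case class In(col1: String, col2: Int)
  case class Out(col1: String, merged: Int)

  // Invoked once per col1 group; rows arrive already sorted by col2.
  def mergeGroup(key: String, rows: Iterator[In]): Out =
    Out(key, rows.map(_.col2).sum) // placeholder merge logic

  // Drives mergeGroup once per group, in plain Scala collections.
  def run(data: Seq[In]): Seq[Out] =
    data.groupBy(_.col1).toSeq.sortBy(_._1).map { case (k, rows) =>
      mergeGroup(k, rows.sortBy(_.col2).iterator)
    }

  def main(args: Array[String]): Unit =
    run(Seq(In("a", 1), In("a", 2), In("b", 5))).foreach(println)
}
```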
4) If no such API exists and we have to use MapPartitionsFunction (the Iterator
is much preferred here, as it means we don't need to worry about OOM due to
data skew), my follow-up question is whether Spark guarantees that the data
within each partition arrives in (col1, col2) order, under the API usage shown
above. Or will Spark deliver each partition's data sorted by "col2" for the
first unique value of col1, then sorted by "col2" for the second unique value
of col1, and so on?
One more challenge: if our merge function can rely on the data arriving in this
order, but must produce its output per col1 grouping, in Iterator form, does
Spark have an existing example we can refer to?
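In case it helps the discussion, here is a sketch (plain Scala, not a Spark-provided utility) of the shape such a function could take IF the input iterator is guaranteed to be grouped by col1 and sorted by col2: fold each consecutive run of equal col1 values into one output record, consuming the input lazily so no group is fully materialized:

```scala
object RunMerge {
  case class In(col1: String, col2: Int)
  case class Out(col1: String, merged: Int)

  // Assumes `input` is already grouped by col1 (each group contiguous),
  // each group sorted by col2; emits one Out per run of equal col1.
  def mergeRuns(input: Iterator[In]): Iterator[Out] = {
    val it = input.buffered
    new Iterator[Out] {
      def hasNext: Boolean = it.hasNext
      def next(): Out = {
        val key = it.head.col1
        var acc = 0 // placeholder merge state; replace with real merge logic
        while (it.hasNext && it.head.col1 == key) acc += it.next().col2
        Out(key, acc)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val sorted = Iterator(In("a", 1), In("a", 2), In("b", 5))
    mergeRuns(sorted).foreach(println)
  }
}
```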
Thanks
Yong