Hi,

I have another question. The reducer sorts its input before it starts the computation. Which sorting algorithm does it use? In Flink I found QuickSort, HeapSort, etc.

Does the sorting algorithm benefit from pre-sorted partitions? For example, a MergeSort algorithm can merge the partitions of multiple maps into a single sorted partition for the reducer. If the map partitions are already sorted, the MergeSort algorithm can run faster, because only the merge phase is needed.


Are there any benefits if the map partitions are sorted?
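The merge idea in this question can be sketched in plain Java. This is a minimal illustration, assuming every sender's partition arrives completely sorted by an integer key; `KWayMerge` is a hypothetical name, not a Flink class. A heap holds the current head of each sorted run, so n records from k runs are merged with O(n log k) comparisons instead of a full O(n log n) re-sort.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: merges k individually sorted runs into one sorted list. */
public class KWayMerge {

    /** Heap entry: the current head value of one run plus the iterator over the rest of that run. */
    private static final class Head {
        final int value;
        final Iterator<Integer> rest;

        Head(int value, Iterator<Integer> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    /** Merges sorted runs in O(n log k) time (n = total records, k = number of runs). */
    public static List<Integer> merge(List<List<Integer>> sortedRuns) {
        PriorityQueue<Head> heap = new PriorityQueue<Head>((a, b) -> Integer.compare(a.value, b.value));
        // Seed the heap with the first element of every non-empty run.
        for (List<Integer> run : sortedRuns) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            out.add(smallest.value);
            // Refill the heap from the run that just produced the smallest element.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Three "sender partitions", each already sorted by key.
        List<List<Integer>> runs = List.of(List.of(1, 4, 9), List.of(2, 3, 10), List.of(5, 6, 7, 8));
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

The heap itself holds only one head element per run, but the whole approach relies on every run being fully sorted; a run that is only partially sorted would produce wrong output.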


Thank you


BR,

Hilmi

On 02.08.2016 at 10:01, Hilmi Yildirim wrote:
Hi Fabian,

thank you very much! This answers my question.


BR,

Hilmi


On 01.08.2016 at 22:29, Fabian Hueske wrote:
Hi Hilmi,

the results of the combiner are usually not completely sorted, and even if
they are, this property is not leveraged.
This is due to the following reasons:
1) a sort-combiner only sorts as much data as fits into memory. If there is
more data, the result consists of multiple sorted sequences.
2) Flink recently added a hash-based combiner, which is usually more
efficient and does not produce sorted output.
3) Flink's pipelined shipping strategy would require the receiver to merge
the result records from all senders on the fly while receiving data via the
network. In case of a straggling sender task, all other senders would be
blocked due to backpressure. In addition, this would only work if the
combiner does a full sort and not several in-memory sorts.
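Point 1) can be illustrated with a small simulation. This is a hypothetical sketch, not Flink's combiner: it assumes the memory budget is a fixed number of records (`bufferSize`) and that combining means counting equal keys. Each full buffer is sorted and combined on its own, so the output is a sequence of sorted runs, the same key can appear in several runs, and the concatenated output is not sorted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a sort-based combiner whose buffer holds at most bufferSize records. */
public class BoundedSortCombiner {

    /**
     * Each input record is a key with an implicit count of 1. Every buffer-sized chunk is
     * sorted on its own and adjacent equal keys are summed. Returns one sorted run per chunk;
     * the runs are NOT sorted relative to each other.
     */
    public static List<List<Map.Entry<Integer, Integer>>> combine(List<Integer> keys, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        List<List<Map.Entry<Integer, Integer>>> runs = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += bufferSize) {
            // "Fill the buffer": take at most bufferSize records, then sort only those.
            List<Integer> buffer = new ArrayList<>(keys.subList(start, Math.min(start + bufferSize, keys.size())));
            Collections.sort(buffer);
            // Combine: sum the counts of adjacent equal keys in the sorted buffer.
            List<Map.Entry<Integer, Integer>> run = new ArrayList<>();
            int i = 0;
            while (i < buffer.size()) {
                int key = buffer.get(i);
                int count = 0;
                while (i < buffer.size() && buffer.get(i) == key) {
                    count++;
                    i++;
                }
                run.add(Map.entry(key, count));
            }
            runs.add(run);
        }
        return runs;
    }

    public static void main(String[] args) {
        // Six records, but the buffer only holds three: two separately sorted runs.
        System.out.println(combine(List.of(5, 1, 5, 2, 1, 5), 3)); // [[1=1, 5=2], [1=1, 2=1, 5=1]]
    }
}
```

Key 5 ends the first run and appears again in the second, so a receiver could not treat this sender's output as one sorted, fully combined sequence.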

So, a Reducer will always do a full sort of all received data before
applying the Reduce function (if a combiner is available, it is applied
before data is written to disk in case of an external sort).
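A minimal sketch of this sort-then-reduce behavior, assuming string keys, integer values, and a reduce function given as a `BinaryOperator`; `SortThenReduce` is a hypothetical name, and the in-memory `List.sort` stands in for Flink's (possibly external) sorter. All received records are sorted first, which places records with equal keys next to each other, and the reduce function is then folded over each group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical sketch: full sort of all received records, then one reduce per key group. */
public class SortThenReduce {

    public static List<Map.Entry<String, Integer>> reduce(List<Map.Entry<String, Integer>> received,
                                                          BinaryOperator<Integer> reduceFn) {
        // 1) Full sort of everything received, whether or not the senders' data was already sorted.
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(received);
        sorted.sort(Map.Entry.<String, Integer>comparingByKey());
        // 2) Records with equal keys are now adjacent; fold each run of equal keys into one result.
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i).getKey();
            Integer acc = sorted.get(i).getValue();
            i++;
            while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                acc = reduceFn.apply(acc, sorted.get(i).getValue());
                i++;
            }
            out.add(Map.entry(key, acc));
        }
        return out;
    }

    public static void main(String[] args) {
        // Records from two senders, concatenated in arrival order.
        List<Map.Entry<String, Integer>> received = List.of(
                Map.entry("b", 1), Map.entry("a", 2),
                Map.entry("a", 3), Map.entry("c", 4), Map.entry("b", 5));
        System.out.println(reduce(received, Integer::sum)); // [a=5, b=6, c=4]
    }
}
```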

Hope this helps,
Fabian

2016-08-01 18:25 GMT+02:00 Hilmi Yildirim <hilmi.yildi...@dfki.de>:

Hi,

I have a question about when data points are sorted in a simple
MapReduce job.

I have the following code:

data = readFromSource()

data.map(....).groupBy(0).reduce(...)

This code will be translated into the following execution plan:

map -> combiner -> hash partitioning and sorting on 0 -> reduce.
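The "hash partitioning" step in this plan routes every record to a reducer based on a hash of its grouping key, so all records with the same key end up at the same reducer. A minimal sketch, assuming a plain `hashCode()` modulo the number of reducers (Flink's actual hash function may differ); `HashPartitioner` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of hash partitioning: route each record to a reducer by its key's hash. */
public class HashPartitioner {

    /** Picks a target partition in [0, numPartitions); floorMod keeps negative hash codes in range. */
    public static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Splits one sender's output into one list per receiving reducer. */
    public static List<List<String>> partition(List<String> keys, int numPartitions) {
        List<List<String>> out = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            out.add(new ArrayList<>());
        }
        for (String key : keys) {
            out.get(partitionFor(key, numPartitions)).add(key);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> parts = partition(List.of("apple", "banana", "apple", "cherry"), 3);
        for (int p = 0; p < parts.size(); p++) {
            System.out.println("reducer " + p + " <- " + parts.get(p));
        }
    }
}
```

Partitioning only decides where a record goes; it says nothing about the order in which records arrive at the reducer.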


If I am right, the combiner first sorts the data, then applies the
combine function, and then partitions the result.

Now the partitions are consumed by the reducers. For each mapper/combiner
machine, the reducer has an input gateway. For example, if the mappers and
combiners run on 10 machines, each reducer has 10 input gateways. The
reducer consumes the data via a MutableObjectIterator. This iterator first
consumes data from one input gateway, then from the next, and so on. Is the
data of a single input gateway already sorted, given that the combiner has
already sorted it? Is the order of the data points maintained after they are
sent through the network?
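Even if each gateway delivered fully sorted data, reading the gateways one after another (as described above) would not yield a sorted stream. A minimal sketch, assuming gateways can be modeled as in-memory lists; `SequentialGatewayIterator` is a hypothetical name and uses `java.util.Iterator`, not Flink's `MutableObjectIterator` interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch: an iterator that drains one input gateway completely, then the next. */
public class SequentialGatewayIterator implements Iterator<Integer> {
    private final Iterator<List<Integer>> gateways;
    private Iterator<Integer> current = Collections.emptyIterator();

    public SequentialGatewayIterator(List<List<Integer>> gatewayData) {
        this.gateways = gatewayData.iterator();
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty gateway once the current one is exhausted.
        while (!current.hasNext() && gateways.hasNext()) {
            current = gateways.next().iterator();
        }
        return current.hasNext();
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    /** Collects everything the iterator yields, in consumption order. */
    public static List<Integer> drain(List<List<Integer>> gatewayData) {
        List<Integer> out = new ArrayList<>();
        new SequentialGatewayIterator(gatewayData).forEachRemaining(out::add);
        return out;
    }

    /** Returns true if the values are in non-decreasing order. */
    public static boolean isSorted(List<Integer> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Each gateway's data is sorted, but the concatenated stream is not.
        List<Integer> stream = drain(List.of(List.of(1, 4, 9), List.of(2, 3, 10)));
        System.out.println(stream + " sorted? " + isSorted(stream)); // [1, 4, 9, 2, 3, 10] sorted? false
    }
}
```

Turning per-gateway order into global order would therefore need either a merge across gateways or a full sort on the receiver side.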

In my code, the MutableObjectIterator instances are subclasses of
NormalizedKeySorter. Does this mean that the data from an input gateway is
first sorted before it is handed over to the reduce function? Is this
because the order of the data points is not maintained after sending it
through the network?


It would be nice if someone could answer my question. If my assumptions are
wrong, please correct me :)


BR,

Hilmi




--
==================================================================
Hilmi Yildirim, M.Sc.
Researcher

DFKI GmbH
Intelligente Analytik für Massendaten
DFKI Projektbüro Berlin
Alt-Moabit 91c
D-10559 Berlin
Phone: +49 30 23895 1814

E-Mail: hilmi.yildi...@dfki.de
