Luke Hutchison created FLINK-6110:
-------------------------------------
Summary: Flink unnecessarily repeats shared work triggered by
different blocking sinks, leading to massive inefficiency
Key: FLINK-6110
URL: https://issues.apache.org/jira/browse/FLINK-6110
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison
After a blocking sink (collect() or count()) is called, all already-computed
intermediate DataSets are thrown away, and any subsequent code that tries to
make use of an already-computed DataSet will require the DataSet to be computed
from scratch. For example, the following code prints the elements a, b and c
twice in succession, even though the DataSet ds should only have to be computed
once:
{code}
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> {
System.out.println(s); return s + s;});
List<String> c1 = ds.map(s -> s).collect();
List<String> c2 = ds.map(s -> s).collect();
{code}
This is problematic because not every operation is easy to express in Flink
using joins and filters -- sometimes for smaller datasets (such as marginal
counts) it's easier to collect the values into a HashMap, and then pass that
HashMap into subsequent operations so they can look up the values they need
directly. A more complex example is the need to sort a set of values, then use
the sorted array for subsequent binary search operations to identify rank --
this is a lot easier to do using an array of sorted values, as long as that
array fits easily in RAM.
However, any collect() or count() operation causes immediate execution of the
Flink pipeline, which throws away *all* intermediate values that could be
reused for future executions. As a result, code can be extremely inefficient,
recomputing the same values over and over again unnecessarily.
I believe that intermediate values should not be released or garbage collected
until after env.execute() is called.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)