Fabian Hueske created FLINK-10100:
-------------------------------------
Summary: Optimizer pushes partitioning past Null-Filter
Key: FLINK-10100
URL: https://issues.apache.org/jira/browse/FLINK-10100
Project: Flink
Issue Type: Bug
Components: DataSet API, Optimizer
Affects Versions: 1.5.2, 1.4.2, 1.3.3, 1.6.0, 1.7.0
Reporter: Fabian Hueske
The DataSet optimizer pushes certain operations like partitioning or sorting
past Filter operators.
It does that because it knows that a {{FilterFunction}} cannot modify the
records but only indicate whether a record should be forwarded or not.
However, this causes problems when the filter is meant to remove records with
null keys. In that case, the partitioning can be pushed past the filter, so the
partitioner has to deal with the null keys that the filter was supposed to drop.
This can fail with a {{NullPointerException}}.
The following code produces an affected plan.
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Two of the five rows have a null key in field 0.
List<Row> rowList = new ArrayList<>();
rowList.add(Row.of(null, 1L));
rowList.add(Row.of(2L, 2L));
rowList.add(Row.of(2L, 2L));
rowList.add(Row.of(3L, 3L));
rowList.add(Row.of(null, 3L));

DataSet<Row> rows = env.fromCollection(rowList, Types.ROW(Types.LONG, Types.LONG));

DataSet<Long> result = rows
    // Intended to drop null keys before the grouping partitions on field 0.
    .filter(r -> r.getField(0) != null)
    .setParallelism(4)
    .groupBy(0)
    // Count the rows in each group.
    .reduceGroup((Iterable<Row> vals, Collector<Long> out) -> {
        long cnt = 0L;
        for (Row v : vals) { cnt++; }
        out.collect(cnt);
    }).returns(Types.LONG)
    .setParallelism(4);

result.output(new DiscardingOutputFormat<>());
// In the printed plan, the partitioning is placed before the filter.
System.out.println(env.getExecutionPlan());
{code}
To resolve the problem, we could remove the field-forward property of
{{FilterFunction}}, so that the optimizer no longer pushes partitioning or
sorting past filters. Filtering before shipping or sorting data is typically
more efficient anyway, so this might also improve the performance of certain
plans.
As a *workaround* until this bug is fixed, users can implement the filter with a
{{FlatMapFunction}}. {{FlatMapFunction}} is a more generic interface, so the
optimizer cannot automatically infer how the function behaves and won't push
partitionings or sorts past it.