I've created FLINK-10100 [1] to track the problem and suggest a solution
and workaround.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10100

2018-08-08 10:39 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Dylan,
>
> Yes, that's a bug.
> As you can see from the plan, the partitioning step is pushed past the
> Filter.
> This is possible because the optimizer knows that a Filter function
> cannot modify the data (it only removes records).
>
> A workaround should be to implement the filter as a FlatMapFunction. A
> FlatMapFunction can arbitrarily modify a record (even if the return type
> stays the same).
> So the optimizer won't push the partitioning past a FlatMapFunction
> because it does not know whether the function modifies the partitioning key
> or not.
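>
> Roughly like this (an untested sketch; "input", Record, and optionalKey are
> placeholders for your own DataSet and POJO, and it assumes the usual imports
> org.apache.flink.api.common.functions.FlatMapFunction and
> org.apache.flink.util.Collector):
>
>     DataSet<Record> nonNullKeys = input.flatMap(new FlatMapFunction<Record, Record>() {
>         @Override
>         public void flatMap(Record value, Collector<Record> out) {
>             // Same semantics as the filter: emit only records with a non-null join key.
>             // Because a FlatMapFunction may change any field, the optimizer will not
>             // push the join's hash partitioning below this operator.
>             if (value.optionalKey != null) {
>                 out.collect(value);
>             }
>         }
>     });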
>
> Just FYI, you could annotate the FlatMapFunction and provide information
> about how it modifies the data [1] to enable certain optimizations, but
> that's not what we want here.
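>
> For reference, such an annotation would look roughly like this (same
> placeholder names as above); declaring optionalKey as forwarded tells the
> optimizer that the key passes through unchanged, which would re-enable
> exactly the push-down we are trying to avoid:
>
>     // from org.apache.flink.api.java.functions.FunctionAnnotation
>     @FunctionAnnotation.ForwardedFields("optionalKey")
>     public static class NonNullKeyFilter implements FlatMapFunction<Record, Record> {
>         @Override
>         public void flatMap(Record value, Collector<Record> out) {
>             if (value.optionalKey != null) {
>                 out.collect(value);
>             }
>         }
>     }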
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>
> 2018-08-08 10:23 GMT+02:00 Chesnay Schepler <ches...@apache.org>:
>
>> I agree, please open a JIRA.
>>
>>
>> On 08.08.2018 05:11, vino yang wrote:
>>
>> Hi Dylan,
>>
>> I took a rough look at your job program and the job's DAG. It seems
>> that the optimizer chose an incorrect execution plan.
>>
>> cc Till.
>>
>> Thanks, vino.
>>
>> Dylan Adams <dylan.ad...@gmail.com> wrote on Wed, Aug 8, 2018 at 2:26 AM:
>>
>>> I'm trying to use the Flink DataSet API to validate some records and
>>> have run into an issue. My program uses joins to validate inputs against
>>> reference data. One of the attributes I'm validating is optional, and only
>>> needs to be validated when non-NULL. So I added a filter to prevent the
>>> null-keyed records from being used in the validation join, and was
>>> surprised to receive this exception:
>>>
>>> java.lang.RuntimeException: A NullPointerException occured while
>>> accessing a key field in a POJO. Most likely, the value grouped/joined on
>>> is null. Field name: optionalKey
>>> at org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
>>>
>>> It looks like the problem is that Flink has pushed the hash-partitioning
>>> step of the join ahead of the filter that removes the null-keyed records,
>>> and is therefore trying to hash the null keys. The issue can be seen in the
>>> plan visualization: https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
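>>>
>>> The relevant pattern looks roughly like this (a simplified sketch; the real
>>> classes are in the project linked below, and "reference", RefRecord, and
>>> refKey are just illustrative names):
>>>
>>>     // optionalKey may be null, so filter before joining against the reference data
>>>     DataSet<InputRecord> withKey = input.filter(r -> r.optionalKey != null);
>>>     DataSet<Tuple2<InputRecord, RefRecord>> validated =
>>>         withKey.join(reference)
>>>                .where("optionalKey")
>>>                .equalTo("refKey");
>>>     // At runtime the hash partitioning for the join runs before the filter,
>>>     // so null keys still reach PojoComparator.hash() and fail.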
>>>
>>> I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
>>> project: https://github.com/dkadams/flink-plan-issue/
>>>
>>> Is this expected behavior or a bug? FLINK-1915 seems to have the same
>>> root problem, but with a negative performance impact instead of a
>>> RuntimeException.
>>>
>>> Regards,
>>> Dylan
>>>
>>
>>
>
