I agree, please open a JIRA.

On 08.08.2018 05:11, vino yang wrote:
Hi Dylan,

I took a quick look at your job program and its DAG. It seems that the optimizer chose the wrong execution plan.

cc Till.

Thanks, vino.

Dylan Adams <[email protected] <mailto:[email protected]>> 于2018年8月8日周三 上午2:26写道:

    I'm trying to use the Flink DataSet API to validate some records
    and have run into an issue. My program uses joins to validate
    inputs against reference data. One of the attributes I'm
    validating is optional, and only needs to be validated when
    non-NULL. So I added a filter to prevent the null-keyed records
    from being used in the validation join, and was surprised to
    receive this exception:

    java.lang.RuntimeException: A NullPointerException occured while
    accessing a key field in a POJO. Most likely, the value
    grouped/joined on is null. Field name: optionalKey
      at org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
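
    For concreteness, the shape of the program is roughly as follows.
    This is a minimal sketch with illustrative class and field names,
    not the actual job; the real reproduction is in the project linked
    below:

        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;

        public class ValidationJob {

            // POJO with an optional (nullable) join key
            public static class Record {
                public Long optionalKey;  // null when there is nothing to validate
                public String payload;
                public Record() {}
            }

            // reference data keyed on a non-null field
            public static class Reference {
                public Long key;
                public Reference() {}
            }

            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Record> records = env.fromElements(new Record());
                DataSet<Reference> reference = env.fromElements(new Reference());

                records
                    .filter(r -> r.optionalKey != null)  // drop null keys before the join
                    .join(reference)
                    .where("optionalKey")                // POJO key field -> PojoComparator
                    .equalTo("key")
                    .with((rec, ref) -> rec)             // keep the validated record
                    .returns(Record.class)               // type hint for the join lambda
                    .print();
            }
        }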

    It looks like the problem is that Flink has pushed the join's hash
    partitioning ahead of the filter on the null-keyed records, so it
    tries to hash the null keys. The issue can be seen in the plan
    visualization:
    https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png

    I was able to reproduce the problem in v1.4.2 and v1.5.2 with this
    small project: https://github.com/dkadams/flink-plan-issue/

    Is this expected behavior or a bug? FLINK-1915 seems to have the
    same root problem, but with a negative performance impact instead
    of a RuntimeException.
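
    In case a temporary workaround is useful: one option (an assumption
    on my part about the optimizer's forwarded-field handling, not a
    verified fix) would be to express the null check as a flatMap
    instead of a filter. A plain FlatMapFunction declares no
    forwarded-field semantics, so the optimizer should not be able to
    push the hash partitioning below it. Continuing the hypothetical
    names from the sketch above:

        import org.apache.flink.util.Collector;

        DataSet<Record> nonNullKeyed = records
            .flatMap((Record r, Collector<Record> out) -> {
                if (r.optionalKey != null) {
                    out.collect(r);       // keep only records with a non-null key
                }
            })
            .returns(Record.class);       // type hint required for the lambda

        // join nonNullKeyed with the reference data exactly as before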

    Regards,
    Dylan

