I've created FLINK-10100 [1] to track the problem and suggest a solution and workaround.
Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10100

2018-08-08 10:39 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> Hi Dylan,
>
> Yes, that's a bug.
> As you can see from the plan, the partitioning step is pushed past the
> Filter. This is possible because the optimizer knows that a Filter
> function cannot modify the data (it only removes records).
>
> A workaround should be to implement the filter as a FlatMapFunction. A
> FlatMapFunction can arbitrarily modify a record (even if the return type
> stays the same), so the optimizer won't push the partitioning past a
> FlatMapFunction because it does not know whether the function modifies
> the partitioning key or not.
>
> Just FYI: you could annotate the FlatMapFunction and provide information
> about how it modifies the data [1] to enable certain optimizations, but
> that's not what we want here.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>
> 2018-08-08 10:23 GMT+02:00 Chesnay Schepler <ches...@apache.org>:
>> I agree, please open a JIRA.
>>
>> On 08.08.2018 05:11, vino yang wrote:
>>
>> Hi Dylan,
>>
>> I roughly looked at your job program and the DAG of the job. It seems
>> that the optimizer chose the wrong optimized execution plan.
>>
>> cc Till.
>>
>> Thanks, vino.
>>
>> Dylan Adams <dylan.ad...@gmail.com> wrote on Wed, Aug 8, 2018, 2:26 AM:
>>
>>> I'm trying to use the Flink DataSet API to validate some records and
>>> have run into an issue. My program uses joins to validate inputs
>>> against reference data. One of the attributes I'm validating is
>>> optional and only needs to be validated when non-NULL. So I added a
>>> filter to prevent the null-keyed records from being used in the
>>> validation join, and was surprised to receive this exception:
>>>
>>> java.lang.RuntimeException: A NullPointerException occured while
>>> accessing a key field in a POJO. Most likely, the value grouped/joined
>>> on is null. Field name: optionalKey
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
>>>
>>> It looks like the problem is that Flink has pushed the hash
>>> partitioning aspect of the join before the filter for the null-keyed
>>> records and is trying to hash the null keys. The issue can be seen in
>>> the plan visualization:
>>> https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
>>>
>>> I was able to reproduce the problem in v1.4.2 and 1.5.2 with this
>>> small project: https://github.com/dkadams/flink-plan-issue/
>>>
>>> Is this expected behavior or a bug? FLINK-1915 seems to have the same
>>> root problem, but with a negative performance impact instead of a
>>> RuntimeException.
>>>
>>> Regards,
>>> Dylan
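To make Fabian's workaround concrete, here is a minimal sketch of a null-key filter expressed as a flatMap. The `Collector` and `FlatMapFunction` interfaces below are simplified stand-ins (so the sketch compiles and runs without a Flink dependency), and the `Record` POJO with its `optionalKey` field is a hypothetical reconstruction of Dylan's data type; in a real job you would implement `org.apache.flink.api.common.functions.FlatMapFunction` instead.

```java
import java.util.ArrayList;
import java.util.List;

public class NullKeyFilterSketch {

    // Simplified stand-ins for Flink's Collector and FlatMapFunction
    // interfaces, used only so this sketch runs without Flink on the classpath.
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out); }

    // Hypothetical POJO resembling the records in the failing job.
    static class Record {
        String optionalKey;   // may be null; hashing a null join key causes the NPE
        String payload;
        Record(String optionalKey, String payload) {
            this.optionalKey = optionalKey;
            this.payload = payload;
        }
    }

    // The workaround: express the null-key filter as a FlatMapFunction rather
    // than a FilterFunction. Because a flatMap may modify records arbitrarily,
    // the optimizer cannot push the join's hash partitioning in front of it.
    static class NonNullKeyFlatMap implements FlatMapFunction<Record, Record> {
        @Override
        public void flatMap(Record value, Collector<Record> out) {
            if (value.optionalKey != null) {
                out.collect(value);  // forward only records that are safe to join
            }
        }
    }

    public static void main(String[] args) {
        List<Record> forwarded = new ArrayList<>();
        NonNullKeyFlatMap fn = new NonNullKeyFlatMap();
        fn.flatMap(new Record("k1", "valid"), forwarded::add);
        fn.flatMap(new Record(null, "skipped"), forwarded::add);
        System.out.println(forwarded.size());  // only the non-null-keyed record survives
    }
}
```

In an actual Flink program the call site would look roughly like `input.flatMap(new NonNullKeyFlatMap()).join(reference).where("optionalKey")...`, replacing the original `filter(...)` step.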