I'm having an issue performing joins with the Python API in flink-1.1.1. The example code below throws an NPE (stack trace follows the code). However, the NPE disappears when the filter is removed. Am I making an error in this brief example, or is this a Flink bug?

env = get_environment()
env.set_parallelism(1)

input1 = env.from_elements("1|0","1|2") \
    .map(lambda x: x.split("|"))

input2 = env.from_elements("1|b") \
    .map(lambda x: x.split("|")) \
    .filter(lambda x: x[0] != "0")

joined = input1 \
    .join(input2) \
    .where(0) \
    .equal_to(0) \
    .write_text("output.txt", write_mode=WriteMode.OVERWRITE)

env.execute(local=True)

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
    at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
    at org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
    at org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
    at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
    at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
    at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
    at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    ... 6 more
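
For reference, here is roughly what I would expect the job to produce, sketched in plain Python with no Flink involved. `expected_join` is a hypothetical helper written only for this illustration, and the pair-of-records output shape is my assumption about what a join without a custom join function returns. It only shows that the filter keeps the single record in input2, so the join should have something to match against.

```python
# Plain-Python emulation of the pipeline above; this does not use Flink.
# expected_join is a hypothetical helper, not part of any Flink API.
def expected_join(left, right, left_key=0, right_key=0):
    """Inner equi-join that returns (left_record, right_record) pairs."""
    return [
        (l, r)
        for l in left
        for r in right
        if l[left_key] == r[right_key]
    ]

# Same elements and the same map/filter lambdas as in the Flink example.
input1 = [x.split("|") for x in ["1|0", "1|2"]]
input2 = [x.split("|") for x in ["1|b"]]
input2 = [x for x in input2 if x[0] != "0"]  # keeps ["1", "b"]

joined = expected_join(input1, input2)
for pair in joined:
    print(pair)
```

Both records of input1 have key "1", so each should pair with the one record of input2.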