I've got an issue performing joins using Python API in flink-1.1.1. With
this example code get an NPE (below). However, the NPE disappears when the
filter is removed. Is there an error I'm making in this brief example or is
this a Flink bug?



    env = get_environment()
    env.set_parallelism(1)

    input1 = env.from_elements("1|0","1|2") \
        .map(lambda x: x.split("|"))

    input2 = env.from_elements("1|b") \
        .map(lambda x: x.split("|")) \
        .filter(lambda x: x[0] != "0")


    joined = input1 \
        .join(input2) \
        .where(0) \
        .equal_to(0) \
        .write_text("output.txt", write_mode=WriteMode.OVERWRITE)

    env.execute(local=True)





------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.NullPointerException
        at
org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
        at
org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
        at
org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
        at
org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
        at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
        at
org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
        at
org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
        at
org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
        at
org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
        at
org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
        at
org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        ... 6 more

Reply via email to