I have some interesting scenario i am working on pattern matching in flink
evaluating the incoming data against a set of patterns using
keyedbroadcastprocessfunction, when i am running the program in IDE i am
getting null pointer exception in processElements method when trying to
access ReadOnlyContext but the same program is running fine in flink
terminal, below is my keyedbroadcastprocessfunction

public class TestProcess extends KeyedBroadcastProcessFunction<String,
Tuple2<String, sampleSignal>,
        Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {

    public static final MapStateDescriptor <String,Map<String,String>>
ruleDescriptor =
            new MapStateDescriptor <>("RuleDiscriptor",
                    ,BasicTypeInfo.STRING_TYPE_INFO
                    ,new MapTypeInfo<>(String.class,String.class));

    @Override
    public void processElement(Tuple2<String, sampleSignal> value,
ReadOnlyContext ctx, Collector<Tuple2<String,
            sampleSignal>> out) throws Exception {

        System.out.println("sampleSignal: " +value.f1.toString());

        String Context = ctx.getBroadcastState(ruleDescriptor).toString();

        Map<String,String> incomingRule = new Hashmap<>();

        incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key);

        /*It's hitting nullpointer exception when printing the size of
hashmpa*/
        System.out.println("Map Size: " +incomingRule.size());

        System.out.println("Context: " +Context);

        System.out.println("Before Rule Iterator");

        /*I tried below way to print the values in broadcaststream just to
print the values
          in broadcast state it don't print anything*/
        for(Map.Entry<String, Map<String, String>> rules:
                ctx.getBroadcastState(ruleDescriptor).immutableEntries()){
            System.out.println("Key: " +rules.getKey());
            System.out.println("Value: "+rules.getValue());
        }


        for(Map.Entry<String,String> rules: incomingRule.entrySet()){

            System.out.println("Key: " +rules.getKey());
            System.out.println("Value: "+rules.getValue());
        }

        out.collect(new Tuple2<>(value.f0,value.f1));

    }

    @Override
    public void processBroadcastElement(Tuple2<String, Map<String, String>>
value, Context ctx,
                                        Collector<Tuple2<String,
sampleSignal>> out) throws Exception {

        System.out.println("BroadCastState Key: " +value.f0);
        System.out.println("BroadCastState Value: " +value.f1);
        ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1);

    }
}
Below is the IDE Terminal output with error exception

/*Its prints below data in BroadCastState in processBroadcastElement*/
BroadCastState Key: Key
BroadCastState Value: {"RuleKey":"RuleValue"}


/*Its printing below data in processElement*/

sampleSignal: {SignalData}

When it hits the Map in which i am storing the Rule Name and Rule Condition
its throwing nullpointer exception and below is the stack trace of error

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
    at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at com.westpac.itm.eq.pattern.App.main(App.java:34)
Caused by: java.lang.NullPointerException
    at
com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35)
    at
com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15)
    at
org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
Caused by: java.lang.NullPointerException

    at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)


Please help me in solving the issue

Thanks,
Rahul.

Reply via email to