I have some interesting scenario i am working on pattern matching in flink evaluating the incoming data against a set of patterns using keyedbroadcastprocessfunction, when i am running the program in IDE i am getting null pointer exception in processElements method when trying to access ReadOnlyContext but the same program is running fine in flink terminal, below is my keyedbroadcastprocessfunction
public class TestProcess extends KeyedBroadcastProcessFunction<String, Tuple2<String, sampleSignal>, Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> { public static final MapStateDescriptor <String,Map<String,String>> ruleDescriptor = new MapStateDescriptor <>("RuleDiscriptor", ,BasicTypeInfo.STRING_TYPE_INFO ,new MapTypeInfo<>(String.class,String.class)); @Override public void processElement(Tuple2<String, sampleSignal> value, ReadOnlyContext ctx, Collector<Tuple2<String, sampleSignal>> out) throws Exception { System.out.println("sampleSignal: " +value.f1.toString()); String Context = ctx.getBroadcastState(ruleDescriptor).toString(); Map<String,String> incomingRule = new Hashmap<>(); incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key); /*It's hitting nullpointer exception when printing the size of hashmpa*/ System.out.println("Map Size: " +incomingRule.size()); System.out.println("Context: " +Context); System.out.println("Before Rule Iterator"); /*I tried below way to print the values in broadcaststream just to print the values in broadcast state it don't print anything*/ for(Map.Entry<String, Map<String, String>> rules: ctx.getBroadcastState(ruleDescriptor).immutableEntries()){ System.out.println("Key: " +rules.getKey()); System.out.println("Value: "+rules.getValue()); } for(Map.Entry<String,String> rules: incomingRule.entrySet()){ System.out.println("Key: " +rules.getKey()); System.out.println("Value: "+rules.getValue()); } out.collect(new Tuple2<>(value.f0,value.f1)); } @Override public void processBroadcastElement(Tuple2<String, Map<String, String>> value, Context ctx, Collector<Tuple2<String, sampleSignal>> out) throws Exception { System.out.println("BroadCastState Key: " +value.f0); System.out.println("BroadCastState Value: " +value.f1); ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1); } } Below is the IDE Terminal output with error exception /*Its prints below data in BroadCastState in processBroadcastElement*/ BroadCastState Key: Key BroadCastState Value: {"RuleKey":"RuleValue"} /*Its printing below data in processElement*/ sampleSignal: {SignalData} When it hits the Map in which i am storing the Rule Name and Rule Condition its throwing nullpointer exception and below is the stack trace of error Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) at com.westpac.itm.eq.pattern.App.main(App.java:34) Caused by: java.lang.NullPointerException at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35) at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15) at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238) Caused by: java.lang.NullPointerException at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Please help me in solving the issue Thanks, Rahul.