I run a code using *Flink* Java API that gets some bytes from *Kafka* and parses it following by inserting into *Cassandra* database using another library *static* method (both parsing and inserting results is done by the library). Running code on local in IDE, I get the desired answer, but running on *YARN* cluster the parse method didn't work as expected!
public class Test { static HashMap<Integer, Object> ConfigHashMap = new HashMap<>(); public static void main(String[] args) throws Exception { CassandraConnection.connect(); Parser.setInsert(true); stream.flatMap(new FlatMapFunction<byte[], Void>() { @Override public void flatMap(byte[] value, Collector<Void> out) throws Exception { Parser.parse(ByteBuffer.wrap(value), ConfigHashMap); // Parser.parse(ByteBuffer.wrap(value)); } }); env.execute(); }} There is a static HashMap field in the classParser that configuration of parsing data is based on its information, and data will insert it during the execution. The problem running on YARN was this data was not available for taskmanagers and they just print config is not available! So I redefine that HashMap as a parameter for the methodparse, but no differences in results! How can I fix the problem?