I run a job using the *Flink* Java API that reads bytes from *Kafka*, parses
them, and inserts the results into a *Cassandra* database by calling a
*static* method of another library (both parsing and inserting are done by
the library). Running the code locally in the IDE, I get the desired result,
but when running on a *YARN* cluster the parse method does not work as expected!

public class Test {
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();

    public static void main(String[] args) throws Exception {

        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
                // Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}


There is a static HashMap field in the class Parser that holds the
configuration used for parsing, and data is inserted into it during
execution. The problem when running on YARN is that this data is not
available to the taskmanagers, and they just print "config is not available"!

So I redefined that HashMap as a parameter of the method parse, but the
result is no different!
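
One possible explanation, sketched here with plain Java serialization rather
than Flink itself: static fields are not part of a serialized object, so a
map filled in main() on the client would start out empty in another JVM,
whether it is read inside the library or passed in as a static field of
Test. All names below (StaticStateDemo, ParseFunction, staticConfig,
simulateShipping) are hypothetical stand-ins, and resetting the static field
only imitates a fresh JVM; this is not the actual Flink or Parser code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class StaticStateDemo {

    // Hypothetical stand-in for a static config map such as Test.ConfigHashMap.
    static Map<Integer, Object> staticConfig = new HashMap<>();

    // Hypothetical stand-in for a user function that is shipped to a worker.
    static class ParseFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // Instance field: written out together with the object.
        private final HashMap<Integer, Object> instanceConfig;

        ParseFunction(Map<Integer, Object> config) {
            this.instanceConfig = new HashMap<>(config);
        }

        String describe() {
            return "static=" + staticConfig.size()
                    + ", instance=" + instanceConfig.size();
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Fills the config on the "client", ships the function, then reads it on the "worker".
    static String simulateShipping() {
        try {
            staticConfig = new HashMap<>();
            staticConfig.put(1, "schema-A");
            byte[] shipped = serialize(new ParseFunction(staticConfig));

            // Imitate a fresh JVM: static fields start again from their initializers.
            staticConfig = new HashMap<>();

            ParseFunction onWorker = (ParseFunction) deserialize(shipped);
            return onWorker.describe();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateShipping()); // prints "static=0, instance=1"
    }
}
```

If this is what happens on YARN, the same reasoning would apply to
CassandraConnection.connect() and Parser.setInsert(true), which run only in
main(). A likely direction would be to carry the configuration as an instance
field of the function, or to do the setup in the open() method of a
RichFlatMapFunction so that it runs on each taskmanager.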

How can I fix the problem?
