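Only the anonymous FlatMapFunction instance is serialized and sent to the 
TaskManagers. Static fields of Test are not part of that instance, so anything 
written to ConfigHashMap on the client never reaches them. Move the field into 
that function class. 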
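
As a rough illustration of the point above, here is a minimal sketch in plain 
Java with no Flink dependency. It assumes the function is shipped with standard 
Java serialization, which is how Flink distributes user functions. ShipDemo, 
ConfiguredFunction, roundTrip and lookup are hypothetical names made up for 
this example; they are not part of Flink or of the Parser library. Clearing the 
static map stands in for a TaskManager JVM that loads the class fresh.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class ShipDemo {

    // Static state belongs to the JVM that filled it; it is never written
    // into the serialized bytes of any instance.
    static Map<Integer, Object> staticConfig = new HashMap<>();

    // Hypothetical stand-in for the user function: the configuration is an
    // instance field, so it travels with the serialized object.
    static class ConfiguredFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<Integer, Object> config;

        ConfiguredFunction(Map<Integer, Object> config) {
            this.config = new HashMap<>(config); // private copy of the config
        }

        Object lookup(int key) {
            return config.get(key);
        }
    }

    // Serialize and deserialize an object, imitating client -> worker shipping.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        staticConfig.put(1, "field-a");                       // filled on the "client"
        ConfiguredFunction fn = new ConfiguredFunction(staticConfig);

        ConfiguredFunction shipped = roundTrip(fn);
        staticConfig.clear();                                 // imitate a fresh worker JVM

        System.out.println(shipped.lookup(1));                // instance field survived
        System.out.println(staticConfig.get(1));              // static state did not
    }
}
```

In a real job the same idea would mean giving the function its own field (or 
loading the configuration inside the function on the worker) instead of reading 
a static field that was only populated in main().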

Michael

> On Apr 25, 2018, at 10:42 PM, Soheil Pourbafrani <soheil.i...@gmail.com> 
> wrote:
> 
> I am running code with the Flink Java API that reads bytes from Kafka, parses 
> them, and inserts the results into a Cassandra database using a static method 
> from another library (the library does both the parsing and the inserting). 
> Running locally in the IDE I get the expected result, but on a YARN cluster 
> the parse method does not work as expected!
> 
> public class Test {
>     static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();
> 
>     public static void main(String[] args) throws Exception {
> 
>         CassandraConnection.connect();
>         Parser.setInsert(true);
> 
>         stream.flatMap(new FlatMapFunction<byte[], Void>() {
>             @Override
>             public void flatMap(byte[] value, Collector<Void> out) throws Exception {
>                 Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
>                 // Parser.parse(ByteBuffer.wrap(value));
>             }
>         });
>         env.execute();
>     }
> }
> 
> The class Parser has a static HashMap field that holds the parsing 
> configuration, and data is inserted into it during execution. The problem on 
> YARN was that this data was not available to the TaskManagers, and they just 
> printed that the config is not available!
> 
> So I redefined that HashMap as a parameter of the method parse, but the 
> results were no different!
> 
> How can I fix the problem?
> 
> 
