There are 2 flaws in your code:

Let's start with the fundamental one:

At no point do you associate your mapConf with the flatMap or even the job.

THeoretically you should add it to the flatMap using flatMap(...).withConfiguration(mapConf) method.

But here's is the second a more subtle flaw:

the withConfiguration() method does not work at all in the streaming API.

The solution to your problem is to pass your parameter k into the
constructor of MyFlatMap and store it in a field.

Regards,
Chesnay

On 28.05.2016 14:11, Biplob Biswas wrote:
Hi,

I am trying to send some static integer values down to each map function,
using the following code

        public static void main(String[] args) throws Exception {
                
                ParameterTool params = ParameterTool.fromArgs(args);
                
                String filePath = params.getRequired("path");
                int k = params.getInt("k");
                
                Configuration mapConf = new Configuration();
                mapConf.setInteger("numberofMC", k);

                
                StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                
                DataStream<Point> tuples = env.addSource(new 
DataStreamGenerator(filePath,
streamSpeed));
                 tuples.flatMap(new MyFlatmap())
          }

-------------------------------------------------------------------------------------------------------
public static final class MyFlatmap extends RichFlatMapFunction<Point,
                                                                                          
                                                      
Tuple2&lt;MicroCluster[],Integer>>{
                int numofMC = 5;
                public MyCoFlatmap() {
                }
                
                public void open(Configuration parameters) throws Exception {
                      super.open(parameters);

                      numofMC = parameters.getInteger("numberofMC",-1);
                                System.out.println(numofMC);

                    }
                
                @Override
                public void flatMap(Point in, 
Collector<Tuple2&lt;MicroCluster[],
Integer>> out) throws Exception {
                }


but when i do the above things to get the value of numberofMC, i dont get it
to the map funcitons and it returns me the default value of -1.

What could be the reason behind this?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-Parameter-values-sent-to-partition-tp7228.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Reply via email to