Re: Reading Parameter values sent to partition

2016-05-29 Thread Biplob Biswas
Aah, thanks a lot for that insight. Pretty new to the Flink systems and
learning on my own so prone to making mistakes. 

Thanks a lot for helping.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-Parameter-values-sent-to-partition-tp7228p7233.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Reading Parameter values sent to partition

2016-05-28 Thread Chesnay Schepler

There are 2 flaws in your code:

Let's start with the fundamental one:

At no point do you associate your mapConf with the flatMap or even the job.

THeoretically you should add it to the flatMap using 
flatMap(...).withConfiguration(mapConf) method.


But here's is the second a more subtle flaw:

the withConfiguration() method does not work at all in the streaming API.

The solution to your problem is to pass your parameter k into the
constructor of MyFlatMap and store it in a field.

Regards,
Chesnay

On 28.05.2016 14:11, Biplob Biswas wrote:

Hi,

I am trying to send some static integer values down to each map function,
using the following code

public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);

String filePath = params.getRequired("path");
int k = params.getInt("k");

Configuration mapConf = new Configuration();
mapConf.setInteger("numberofMC", k);


StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

DataStream tuples = env.addSource(new 
DataStreamGenerator(filePath,
streamSpeed));
 tuples.flatMap(new MyFlatmap())
  }

---
public static final class MyFlatmap extends RichFlatMapFunction>{
int numofMC = 5;
public MyCoFlatmap() {
}

public void open(Configuration parameters) throws Exception {
  super.open(parameters);

  numofMC = parameters.getInteger("numberofMC",-1);
System.out.println(numofMC);

}

@Override
public void flatMap(Point in, 
Collector> out) throws Exception {
}


but when i do the above things to get the value of numberofMC, i dont get it
to the map funcitons and it returns me the default value of -1.

What could be the reason behind this?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-Parameter-values-sent-to-partition-tp7228.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.