The question is how to encapsulate numerous transformations into a single 
object, or perhaps a function, in Apache Flink's Java API. I have tried to 
investigate this using a Pi calculation as an example (see below). I am 
wondering whether the suggested approach is valid from Flink's point of view. 
It works on one computer; however, I do not know how it will behave in a 
cluster setup. The code is given below, and the main idea behind it is as follows: 
  
   - Create a class named classPI, whose method compute() performs all the data 
transformations (more about it below).
   - In the main method, create a DataSet as in
       DataSet<classPI> opi = env.fromElements(new classPI());
   - Create DataSet<Double> PI, which is the output of a map() transformation 
that calls compute() on the classPI object, as in
       DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
           public Double map(classPI objPI) throws Exception {
               return objPI.compute();
           }
       });
   - Now about classPI:
      - The constructor instantiates an ExecutionEnvironment that is local to 
this class, as in
          public classPI() {
              this.NumIter = 1000000;
              env = ExecutionEnvironment.getExecutionEnvironment();
          }

Thus, the code has two ExecutionEnvironment objects: one in main and another in 
the class classPI.   
      - It has a method compute() that runs all the data transformations (in 
this example there are just a few lines, but potentially it might contain many 
Flink transformations):
          public Double compute() throws Exception {
              DataSet<Long> count = env.generateSequence(1, NumIter)
                  .map(new Sampler())
                  .reduce(new SumReducer());
              PI = 4.0 * count.collect().get(0) / NumIter;
              return PI;
          }
The whole code is given below. Again, the question is whether this is a valid 
approach for encapsulating data transformations in a class in a Flink setup 
that is supposed to be parallelizable and run on a cluster. Is there a better 
way to hide the details of the data transformations? Thanks a lot!
-------------------------The code ----------------------
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet holding my classPI, which computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI, which computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            @Override
            public Double map(classPI objPI) throws Exception {
                // this is how I call compute(), which calculates PI using
                // transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant
    // for the pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            // draw a random point in the unit square; emit 1 if it falls
            // inside the quarter circle, 0 otherwise
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant
    // for the pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class
    // is valid in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor, with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all the data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;

            return PI;
        }
    }
}
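
For reference, the arithmetic that compute() performs can be checked without Flink. The following is only a minimal plain-Java sketch of the same estimate, assuming a sequential stream in place of generateSequence/map/reduce; the names PiSketch, estimate and the seed parameter are hypothetical and are not part of the program above.

```java
import java.util.Random;
import java.util.stream.LongStream;

// Plain-Java sketch (no Flink): mirrors the roles of Sampler, SumReducer and
// the final scaling step in compute(). PiSketch and estimate are hypothetical
// names introduced for illustration only.
final class PiSketch {

    // Estimates Pi from numIter random points in the unit square.
    // The seed parameter is an assumption added to make runs repeatable.
    static double estimate(long numIter, long seed) {
        Random rnd = new Random(seed);
        long inside = LongStream.rangeClosed(1, numIter)
                // same test as Sampler: 1 if the point lies inside the quarter circle
                .map(i -> {
                    double x = rnd.nextDouble();
                    double y = rnd.nextDouble();
                    return (x * x + y * y) < 1 ? 1L : 0L;
                })
                // same role as SumReducer: add up the hits
                .sum();
        // fraction of hits approximates Pi/4
        return 4.0 * inside / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimate(1_000_000L, 42L));
    }
}
```

This only illustrates the formula 4 * hits / NumIter; it says nothing about whether the classPI approach behaves correctly on a cluster.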
