The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting. I have tried to investigate this using the example of a Pi calculation (see below). I am wondering whether the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup.

The code is given below, and the main idea behind it is as follows:

- Create a class named classPI, whose method compute() does all the data transformations (more about it below).
- In the main method, create a DataSet as in

        DataSet<classPI> opi = env.fromElements(new classPI());

- Create a DataSet<Double> PI that equals the output of a map() transformation which calls the compute() method of the classPI object, as in

        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) throws Exception {
                return objPI.compute();
            }
        });

Now about classPI:

- The constructor instantiates an ExecutionEnvironment that is local to this class, as in

        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.

- It has a method compute() that runs all the data transformations (in this example it is just several lines, but potentially it might contain tons of Flink transformations):

        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }

The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations into a class in a Flink setup that is supposed to be parallelizable and run on a cluster. Is there a better way to hide the details of the data transformations?

Thanks a lot!

------------------------- The code -------------------------

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PiEstimation {

        public static void main(String[] args) throws Exception {
            // this is one ExecutionEnvironment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // this is the critical DataSet with my classPI that computes PI
            DataSet<classPI> opi = env.fromElements(new classPI());

            // this map calls the method compute() of class classPI that computes PI
            DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
                @Override
                public Double map(classPI objPI) throws Exception {
                    // this is how I call method compute() that calculates PI using transformations
                    return objPI.compute();
                }
            });

            double pi = PI.collect().get(0);
            System.out.println("We estimate Pi to be: " + pi);
        }

        // this class is of no importance for my question; however, it is relevant for the Pi calculation
        public static class Sampler implements MapFunction<Long, Long> {
            @Override
            public Long map(Long value) {
                double x = Math.random();
                double y = Math.random();
                return (x * x + y * y) < 1 ? 1L : 0L;
            }
        }

        // this class is of no importance for my question; however, it is relevant for the Pi calculation
        public static final class SumReducer implements ReduceFunction<Long> {
            @Override
            public Long reduce(Long value1, Long value2) {
                return value1 + value2;
            }
        }

        // this is my class that computes PI; my question is whether such a class
        // is valid in Flink on a cluster with parallel computation
        public static final class classPI {
            public Integer NumIter;
            private final ExecutionEnvironment env;
            public Double PI;

            // this is the constructor with another ExecutionEnvironment
            public classPI() {
                this.NumIter = 1000000;
                env = ExecutionEnvironment.getExecutionEnvironment();
            }

            // this is the method that contains all the data transformations
            public Double compute() throws Exception {
                DataSet<Long> count = env.generateSequence(1, NumIter)
                        .map(new Sampler())
                        .reduce(new SumReducer());
                PI = 4.0 * count.collect().get(0) / NumIter;
                return PI;
            }
        }
    }
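
For comparison, here is a minimal plain-Java sketch (no Flink involved) of the same estimate that compute() is meant to produce: sample random points, count those inside the unit quarter circle, and return 4 * hits / NumIter. The class name PiCheck, the method estimate, the seeded java.util.Random, and the use of LongStream are my own assumptions for illustration; they are not part of the Flink program above, and the sketch says nothing about how the Flink version behaves on a cluster.

```java
import java.util.Random;
import java.util.stream.LongStream;

public class PiCheck {

    // Hypothetical helper: plays the role of Sampler (1 if the random point
    // lies inside the unit quarter circle, else 0) and SumReducer (sum of hits).
    public static double estimate(long numIter, long seed) {
        Random rnd = new Random(seed); // seeded so that runs are repeatable
        long hits = LongStream.rangeClosed(1, numIter)
                .map(i -> {
                    double x = rnd.nextDouble();
                    double y = rnd.nextDouble();
                    return (x * x + y * y) < 1 ? 1L : 0L;
                })
                .sum();
        // Same final step as in compute(): PI = 4.0 * count / NumIter
        return 4.0 * hits / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimate(1_000_000L, 42L));
    }
}
```

With 1,000,000 samples the result should land close to 3.14, which gives a reference value to compare against whatever the Flink job prints.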