Re: Does Flink allows for encapsulation of transformations?

2016-06-09 Thread Ser Kho

Chesnay: I have two simple questions related to the previous ones about encapsulation of transformations.
Question 1. I have tried to extend my code using your suggestions and came up with a small concern. First, your code:
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> pi = new classPI(env).compute();
    new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
    env.execute();
}


Below is my code. Some lines are very similar to yours and work ok; the line of concern is marked with a comment. The issue is that I do not pass env to the constructor of classLengthCircle(); instead, I pass DataSet pi to the method computeLengthCircle(pi, Radius), together with DataSet Radius (the latter does not matter for the question). Then I proceed with transformations on this DataSet pi, see the class classLengthCircle below. It seems that the logic of this class and its method computeLengthCircle() does not require env at all. My question is whether this code will work on a cluster (it does work on a local computer).
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> Radius = env.fromElements(10.0);
DataSet<Long> NumIter = env.fromElements(100L);
// this line is similar to the suggested one
DataSet<Double> pi = new classPI(env).compute(NumIter);
// line of concern: somewhat different from the suggested one, as there is no env in the constructor
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);

public static final class classLengthCircle {
    public DataSet<Double> computeLengthCircle(DataSet<Double> pi, DataSet<Double> Radius) {
        DataSet<Double> result = pi.cross(Radius).map(
            new MapFunction<Tuple2<Double, Double>, Double>() {
                @Override
                public Double map(Tuple2<Double, Double> arg0) throws Exception {
                    return 2 * arg0.f0 * arg0.f1;
                }
            });
        return result;
    }
}
Question 2:
I tried to pass a parameter, DataSet NumIter, into the MapFunction of the map() transformation; see the line marked with a comment in the code below. This parameter seems to appear in the MapFunction without being passed explicitly, since the call .map(new MapFunction<Long, Double>() ...) does not pass NumIter anywhere. Is the suggested approach the right way to pass a parameter into a MapFunction? Note that the code works all right on a single computer.
public static final class classPI implements Serializable {
    private final ExecutionEnvironment env;
    public classPI(ExecutionEnvironment env) { this.env = env; }

    public DataSet<Double> compute(final DataSet<Long> NumIter) throws Exception {
        return this.env.generateSequence(1, NumIter.collect().get(0))
            .map(new Sampler())
            .reduce(new SumReducer())
            .map(new MapFunction<Long, Double>() {
                Long N = NumIter.collect().get(0); // <- line of concern: NumIter is never passed in explicitly
                @Override
                public Double map(Long arg0) throws Exception { return arg0 * 4.0 / N; }
            });
    }
}
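Question 2 hinges on how the anonymous MapFunction gets hold of N. Flink's MapFunction extends java.io.Serializable, and the function object built on the client is serialized and shipped with the job, so whatever state the object holds when it is constructed travels with it. In the code above, the field initializer Long N = NumIter.collect().get(0) runs on the client when new MapFunction<Long, Double>() {...} is evaluated, i.e. while compute() is still building the plan; collect() therefore triggers an execution of its own at that moment, and the resulting Long is stored in the function object. The plain-Java sketch below (no Flink involved) mimics that round trip with ObjectOutputStream/ObjectInputStream; CaptureDemo, SerializableFn, makeScaler and roundTrip are hypothetical names made up for illustration, and this is not Flink's actual shipping code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's MapFunction<Long, Double>, which is also Serializable.
    interface SerializableFn extends Serializable {
        double apply(long value);
    }

    // Built in a static method, so the anonymous class has no enclosing instance to drag along.
    static SerializableFn makeScaler(final long n) {
        return new SerializableFn() {
            // n is captured when the object is created and becomes part of the object's state.
            @Override
            public double apply(long hits) {
                return hits * 4.0 / n;
            }
        };
    }

    // Serialize and deserialize the function, roughly what happens when it is shipped to a worker.
    static SerializableFn roundTrip(SerializableFn fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableFn) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableFn shipped = roundTrip(makeScaler(100L));
        System.out.println(shipped.apply(79L)); // prints 3.16 (79 * 4.0 / 100)
    }
}
```

The deserialized copy still knows n = 100 although nothing passed it in after construction, which is the same effect observed with N in the MapFunction above.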

Thanks a lot for your time.
Ser



On Tuesday, June 7, 2016 8:14 AM, Chesnay Schepler wrote:
 

   1a. Ah, yeah, I see how it could work, but I wouldn't count on it in a cluster. You would (most likely) run the sub-job (calculating pi) on a single node only.

1b. Different execution environments generally imply different Flink programs.

2. Sure it does, since it's a normal Flink job. Yours, on the other hand, doesn't, since the job calculating PI only runs on a single TaskManager.
 
3. There are 2 ways. You can either chain jobs like this (effectively running 2 Flink programs in succession):
public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
    new classThatNeedsPI().computeWhatever(pi); // feeds pi into an env.fromElements call and proceeds from there
}
or (if all building blocks are Flink programs) build a single job:
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> pi = new classPI(env).compute();
    new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
    env.execute();
}

...
public DataSet<Double> compute() throws Exception {
    return this.env.generateSequence(1, NumIter)
        .map(new Sampler())
        .reduce(new SumReducer())
        .map(/* return 4.0 * x / NumIter */);
}
...

public ? computeWhatever(DataSet<Double> pi) throws Exception {
    ...
}
 
On 07.06.2016 13:35, Ser Kho wrote:
  

Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Ser Kho

Chesnay: Just want to thank you. I might have one or two related questions later on, but for now, just thanks.

 

On Tuesday, June 7, 2016 8:18 AM, Greg Hogan  wrote:
 

 "The question is how to encapsulate numerous transformations into one object 
or may be a function in Apache Flink Java setting."

Implement CustomUnaryOperation. This can then be applied to a DataSet by calling runOperation on the input DataSet, e.g. `DataSet result = input.runOperation(new MyOperation<>(...));`.

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho  wrote:


Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Ser Kho
Chesnay:
1a. The code actually works, that is the point.
1b. What prevents a Flink program from having several execution environments?
2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of writing/organizing large and complicated programs, where the resulting pi needs to be used in other DataSet transformations beyond classPI(). What to do in this case?
Thanks a lot for the suggestions.

On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler wrote:
 

From what I can tell from your code, you are trying to execute a job within a job. This just doesn't work.

Your main method should look like this:

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}
 
 
 On 06.06.2016 21:14, Ser Kho wrote:
  

Does Flink allows for encapsulation of transformations?

2016-06-06 Thread Ser Kho
The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting. I have tried to investigate this question using an example of Pi calculation (see below). I am wondering whether or not the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:
  
   - Create a class, named classPI, whose method compute() does all data transformations (see more about it below).
   - In the main method, create a DataSet, as in DataSet<classPI> opi = env.fromElements(new classPI());
   - Create DataSet<Double> PI, which equals the output of the map() transformation that calls the object's method compute(), as in DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() { public Double map(classPI objPI) { return objPI.compute(); }});
   - Now about classPI:
      - Its constructor instantiates an ExecutionEnvironment that is local to this class, as in public classPI(){ this.NumIter=100; env = ExecutionEnvironment.getExecutionEnvironment();}
        Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.
      - It has a method compute() that runs all data transformations (in this example it is just several lines, but potentially it might contain tons of Flink transformations): public Double compute(){ DataSet<Long> count = env.generateSequence(1, NumIter).map(new Sampler()).reduce(new SumReducer()); PI = 4.0*count.collect().get(0)/NumIter; return PI;}
The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations in a class in a Flink setup that is supposed to be parallelizable on a cluster. Is there a better way to hide the details of data transformations? Thanks a lot!
-The code --
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet with my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) throws Exception {
                // this is how I call method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 100;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                .map(new Sampler())
                .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}
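When checking what compute() returns, a single-threaded reference of the same estimator can help. Each sample is a point (x, y) drawn uniformly from the unit square; it lands inside the quarter circle x² + y² < 1 with probability p = π/4, so 4·count/NumIter estimates π, with a standard error of roughly 4·√(p(1−p)/N) ≈ 1.64/√N (about 0.16 for NumIter = 100). The sketch below is plain Java, not Flink: LongStream.rangeClosed stands in for generateSequence, map for the Sampler and sum for the SumReducer. The class name SequentialPi and the fixed seed are assumptions made so that runs are repeatable.

```java
import java.util.SplittableRandom;
import java.util.stream.LongStream;

public class SequentialPi {

    // Same test as the Sampler above: 1 if a random point in the unit square falls inside the quarter circle.
    static long sample(SplittableRandom rnd) {
        double x = rnd.nextDouble();
        double y = rnd.nextDouble();
        return (x * x + y * y) < 1 ? 1L : 0L;
    }

    // generateSequence(1, n) -> map(Sampler) -> reduce(SumReducer) -> 4.0 * count / n, on one thread.
    static double estimate(long numIter, long seed) {
        SplittableRandom rnd = new SplittableRandom(seed); // sequential stream, so sharing rnd is safe
        long count = LongStream.rangeClosed(1, numIter)
                .map(i -> sample(rnd))
                .sum();
        return 4.0 * count / numIter;
    }

    public static void main(String[] args) {
        System.out.println(estimate(100, 42L));       // crude: error on the order of 0.16
        System.out.println(estimate(1_000_000, 42L)); // error on the order of 0.002
    }
}
```

With a fixed seed the result is repeatable, whereas the Sampler above uses Math.random(), so its estimates will differ from run to run.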