Hi Lydia,

To build iterative algorithm on Flink, using API for iterations [1] would be 
better than using for-loop. Your program triggers multiple executions by 
multiple calling `next.gap.print()`. In each execution, Flink reads whole data 
redundantly and it cause performance to decrease.

Regards,
Chiwan Park

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html

> On Mar 27, 2016, at 7:16 AM, Lydia Ickler <ickle...@googlemail.com> wrote:
> 
> Hi,
> 
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I 
> cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
> 
> Thanks in advance!
> Lydia
> 
> package org.apache.flink.contrib.lifescience.examples;
> 
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
> 
> import java.util.*;
> 
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
> 
> public class PowerIteration {
> 
>     //path to input
>     static String input = null;
>     //path to output
>     static String output = null;
>     //number of iterations (default = 7)
>     static int iterations = 7;
>     //threshold
>     static double delta = 0.01;
> 
>     public void run() throws Exception {
>         ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> 
>         //read input file
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, 
> input);
> 
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
> 
>         //initial:
>         //Approximate EigenVector by PowerIteration
>         eigenVector = PowerIteration_getEigenVector(matrixA);
>         //Approximate EigenValue by PowerIteration
>         eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
>         //Deflate original matrix
>         matrixA = 
> PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);
> 
>         MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);
> 
>         MyResult next = null;
> 
>         //PROBLEM!!! get i eigenvalue gaps
>         for(int i=0;i<2;i++){
>             next = PowerIteration_routine(initial);
>             initial = next;
>             next.gap.print();
>         }
> 
>         env.execute("Power Iteration");
>     }
> 
>     public static DataSource<Tuple3<Integer, Integer, Double>> 
> readMatrix(ExecutionEnvironment env,
>                                                                           
> String filePath) {
>         CsvReader csvReader = env.readCsvFile(filePath);
>         csvReader.fieldDelimiter(",");
>         csvReader.includeFields("ttt");
>         return csvReader.types(Integer.class, Integer.class, Double.class);
>     }
> 
>     public static final class ProjectJoinResultMapper implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f1.f1;
>             Double product = value.f0.f2 * value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, product);
>         }
>     }
> 
>     public static final class RQ implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, 
> Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
> 
>             return new Tuple3<Integer, Integer, 
> Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
>         }
>     }
> 
>     public static void main(String[] args) throws Exception {
>         if(args.length<2 || args.length > 4){
>             System.err.println("Usage: PowerIteration <input path> <result 
> path> optional: <iterations> <threshold diff>");
>             System.exit(0);
>         }
> 
>         input = args[0];
>         output = args[1];
> 
>         if(args.length==3) {
>             iterations = Integer.parseInt(args[2]);
>         }
>         if(args.length==4){
>             delta = Double.parseDouble(args[3]);
>         }
> 
>         new PowerIteration2().run();
>     }
> 
>     public static final class deltaFilter implements 
> FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, 
> Double>,Tuple3<Integer, Integer, Double>> {
> 
>         public void join(Tuple3<Integer, Integer, Double> candidate, 
> Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, 
> Double>> out) {
> 
>             if(!(candidate.f2 == old.f2)){
>                 out.collect(candidate);
>             }
> 
>             //if(Math.abs(candidate.f2-old.f2) > delta){
>             //    out.collect(candidate);
>             //}
> 
>         }
>     }
> 
>     public static final class normalizeByMax implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, 
> Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, 
> Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
>         }
>     }
> 
>     public static final class firstX implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, 
> Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, 
> Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
>         }
>     }
> 
>     public static final class resetIndex implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
>         }
>     }
> 
> 
>     public static final class decBy1 implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, 
> Double>(value.f0-1,value.f1-1,value.f2);
>         }
>     }
> 
>     public static final class resetIndex2 implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
>         }
>     }
> 
>     public static final class MatrixTimesValue implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, 
> Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
> 
>             return new Tuple3<Integer, Integer, 
> Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
>         }
>     }
> 
>     public static final class MatrixMinusMatrix implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f0.f1;
>             Double result = value.f0.f2 - value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, result);
>         }
>     }
> 
>     public static final class getGapCenter implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f0.f1;
>             Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
>             return new Tuple3<Integer, Integer, Double>(row, column, result);
>         }
>     }
> 
> 
>     public static DataSet<Tuple3<Integer, Integer, Double>> 
> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> 
> matrixA) throws Exception {
> 
>         //get initial vector - which equals matrixA * [1, ... , 1]
>         DataSet<Tuple3<Integer, Integer, Double>> initial0 = 
> matrixA.groupBy(0).aggregate(Aggregations.SUM,2);
> 
>         //normalize by maximum value
>         DataSet<Tuple3<Integer, Integer, Double>> initial= 
> initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());
> 
>         //BulkIteration to find dominant eigenvector
>         IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = 
> initial.iterate(iterations);
> 
>         DataSet<Tuple3<Integer, Integer, Double>> intermediate = 
> (matrixA.join(iteration).where(1).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 
> 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).
>                 cross((matrixA.join(iteration).where(1).equalTo(0)
>                         .map(new ProjectJoinResultMapper()).groupBy(0, 
> 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 
> 2).maxBy(2))
>                 .map(new normalizeByMax());
> 
>         DataSet<Tuple3<Integer, Integer, Double>> diffs = 
> iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector  = 
> iteration.closeWith(intermediate,diffs);
> 
>         return eigenVector;
>     }
> 
>     public static DataSet<Tuple3<Integer, Integer, Double>> 
> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> 
> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {
> 
>         //determine now EigenValue by approximating the Rayleigh Quotient:
>         //get Ax
>         DataSet<Tuple3<Integer, Integer, Double>> Ax = 
> matrixA.join(eigenVector).where(1).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 
> 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
>         //get Ax * x
>         DataSet<Tuple3<Integer, Integer, Double>> Axx = 
> eigenVector.join(Ax).where(0).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 
> 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
> 
>         //now x * x
>         DataSet<Tuple3<Integer, Integer, Double>> xx = 
> eigenVector.join(eigenVector).where(0).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 
> 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
> 
>         return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
>     }
> 
>     public static DataSet<Tuple3<Integer, Integer, Double>> 
> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> 
> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, 
> DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {
> 
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = 
> eigenValue.map(new resetIndex());
>         DataSet<Tuple3<Integer, Integer, Double>> firstVal = 
> eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return value.f0 == 0;
>             }
>         });
>         firstVal = eigenValueReset.cross(firstVal).map(new firstX());
>         DataSet<Tuple3<Integer, Integer, Double>> firstRow = 
> matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return value.f0 == 0;
>             }
>         });
>         DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new 
> DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new 
> MatrixTimesValue())).map(new DataSetUtils.transpose());
>         DataSet<Tuple3<Integer, Integer, Double>> C = 
> eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new 
> resetIndex2()).join(x).where(1).equalTo(0).
>                 map(new ProjectJoinResultMapper()).groupBy(0, 
> 1).aggregate(Aggregations.SUM, 2);
>         matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new 
> MatrixMinusMatrix());
>         matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, 
> Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return (value.f0 != 0) && (value.f1 != 0);
>             }
>         });
> 
>         return matrixA.map(new decBy1());
>     }
> 
>     public MyResult PowerIteration_routine(MyResult initial) throws Exception 
> {
> 
>         //Approximate EigenVector by PowerIteration
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector = 
> PowerIteration_getEigenVector(initial.matrixA);
>         //Approximate EigenValue by PowerIteration
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue = 
> PowerIteration_getEigenValue(initial.matrixA, eigenVector);
>         //get gap
>         DataSet<Tuple3<Integer, Integer, Double>> gap = 
> initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
>         //Deflate original matrix
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = 
> PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);
> 
>         return new MyResult(eigenVector,eigenValue,matrixA,gap);
>     }
> 
>     public class MyResult {
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
>         DataSet<Tuple3<Integer, Integer, Double>> gap;
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA;
> 
>         public MyResult(DataSet<Tuple3<Integer, Integer, Double>> 
> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> 
> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){
>             this.eigenVector = eigenVector;
>             this.eigenValue =eigenValue;
>             this.matrixA = matrixA;
>         }
> 
>         public MyResult(DataSet<Tuple3<Integer, Integer, Double>> 
> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> 
> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, 
> DataSet<Tuple3<Integer, Integer, Double>> gap){
>             this.eigenVector = eigenVector;
>             this.eigenValue =eigenValue;
>             this.matrixA = matrixA;
>             this.gap = gap;
>         }
>     }
> 
> }
> 

Reply via email to