Hi Max,

No, I don't think anybody in the Flink community has plans to implement nested iterations in the near future.
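For anyone who finds this thread because of the `IllegalStateException`: as Aljoscha explains in the quoted message below, a rich function can only call `getIterationRuntimeContext` if its operator depends on the data set handed to the step function. Here is a minimal sketch of that rule. The object name and the toy data are made up for illustration, and it assumes the Scala DataSet API on the classpath; it is not taken from the BumpBoost program.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

// Hypothetical toy job, only meant to illustrate the rule from this thread.
object SuperstepSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Toy initial data set (assumption, not from the original program).
    val initial: DataSet[Int] = env.fromElements(0)

    val result = initial.iterate(3) { stepSet =>
      // The map is applied to stepSet, so the system treats it as part of
      // the step function and the iteration context is available.
      stepSet.map(new RichMapFunction[Int, Int] {
        def map(x: Int): Int =
          x + getIterationRuntimeContext.getSuperstepNumber
      })
    }

    // Depending on the Flink version, print() either triggers execution
    // itself or only declares a sink that needs a following env.execute().
    result.print()
  }
}
```

An operator built only from data sets defined outside the closure (like `randoms` in the quoted code) is not tied to `stepSet`, even if the call that creates it is written inside the step function, which is why it ends up outside the loop.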
On Wed, Jan 7, 2015 at 10:58 AM, Maximilian Alber <[email protected]> wrote:

> Thanks!
>
> I made a workaround using a pseudo join with the workset. But now I'm back
> to the nested iteration issue. Is there any chance that this feature will
> be available in the next time(2-3 weeks)?
>
> Cheers,
> Max
>
> On Wed, Jan 7, 2015 at 10:11 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> Hi,
>> the problem is that your operations do not depend on the
>> iteration-step-dataset. Your code could be rewritten like this to make it
>> more obvious:
>>
>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>     // here we call the function
>>     val center = calcCenter(env, X, residual, randoms, -1)
>>
>>     val centerX = (X subtV center) map {_ square}
>>     val x = calcWidthHeight(env, centerX, residual, widthCandidates,
>>       center)
>>     val width = x._1
>>     val height = x._2
>>
>>     residual = residual - (getKernelVector(X, center, width) multV
>>       height)
>>
>>     val centerOut = center map {x => new Vector(0, x.values)}
>>     val widthOut = width map {x => new Vector(1, x.values)}
>>     val heightOut = height map {x => new Vector(2, x.values)}
>>     val stepModel = centerOut union widthOut union height
>>
>>     // here the loop begins
>>     val model = emptyDataSet.iterate(config.iterations){
>>       stepSet =>
>>
>>       stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>         def map(x: Vector) = new
>>           Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id,
>>             x.values)
>>       }))
>>     }
>>
>>     model map { _ toString } writeAsText config.outFile
>>   }
>>
>> This means that for the system these operations are considered to be
>> outside the loop, thus you don't have access to the IterationContext.
>>
>> Regards,
>> Aljoscha
>>
>> On Tue, Jan 6, 2015 at 4:13 PM, Maximilian Alber <
>> [email protected]> wrote:
>>
>>> Hey Flinksters!
>>>
>>> ran into this error
>>>
>>> java.lang.IllegalStateException: This stub is not part of an iteration
>>> step function.
>>>
>>> below is my code, the concerning parts are marked. Is it a problem, that
>>> the stub is in a function that is called from the iteration step function?
>>>
>>>
>>> Code:
>>>
>>> ......
>>>
>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>     // here the loop begins
>>>     val model = emptyDataSet.iterate(config.iterations){
>>>       stepSet =>
>>>       // here we call the function
>>>       val center = calcCenter(env, X, residual, randoms, -1)
>>>
>>>       val centerX = (X subtV center) map {_ square}
>>>       val x = calcWidthHeight(env, centerX, residual, widthCandidates,
>>>         center)
>>>       val width = x._1
>>>       val height = x._2
>>>
>>>       residual = residual - (getKernelVector(X, center, width) multV
>>>         height)
>>>
>>>       val centerOut = center map {x => new Vector(0, x.values)}
>>>       val widthOut = width map {x => new Vector(1, x.values)}
>>>       val heightOut = height map {x => new Vector(2, x.values)}
>>>       val stepModel = centerOut union widthOut union height
>>>
>>>       stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>>         def map(x: Vector) = new
>>>           Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id,
>>>             x.values)
>>>       }))
>>>     }
>>>
>>>     model map { _ toString } writeAsText config.outFile
>>>   }
>>>
>>>
>>>   def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
>>>     DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
>>>     = {
>>>     val residual_2 = residual * residual
>>>     val randomValue = if(iteration >= 0)
>>>       (randoms filter {_.id == iteration})
>>>     else
>>>       // and this filter function causes the error
>>>       (randoms.filter(new RichFilterFunction[Vector]{
>>>         def filter(x: Vector) = x.id ==
>>>           (getIterationRuntimeContext.getSuperstepNumber-1)
>>>       }))
>>>     val ys = ((residual_2 sumV() neutralize) * (randomValue neutralize))
>>>
>>> .....
>>>
>>> The full error:
>>>
>>> Error: The program execution failed: java.lang.IllegalStateException:
>>> This stub is not part of an iteration step function.
>>>   at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>   at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:119)
>>>   at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:118)
>>>   at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>   at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>   at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> I append my program with the input files. To reproduce the error use
>>> following command line args, please replace in_file, random_file,
>>> width_candidates with the provided ones, and put for out_file the path you
>>> want to:
>>>
>>> flink run -v bump_boost-0.1.jar -c bumpboost.Job in_file=foobar
>>> out_file=/tmp/tmphIHeEs random_file=/tmp/tmpeKQPZk dimensions=1 N=100
>>> width_candidates_file=/tmp/tmpTJLKMm N_width_candidates=50 iterations=30
>>> multi_bump_boost=0 gradient_descent_iterations=30 cache=False
>>> start_width=1.0 min_width=-4 max_width=6 min_width_update=1e-08
>>> max_width_update=10
>>>
>>> Thanks!
>>> Cheers,
>>> Max
>>>
>>
>>
>
