Hi Robert! Ok, good to know. Thanks!
Cheers!
Max

On Thu, Jan 8, 2015 at 4:16 PM, Robert Metzger <[email protected]> wrote:

> Hi Max,
>
> No. I think there is nobody in the Flink community who has plans to
> implement nested iterations in the near future.
>
> On Wed, Jan 7, 2015 at 10:58 AM, Maximilian Alber <
> [email protected]> wrote:
>
>> Thanks!
>>
>> I made a workaround using a pseudo join with the workset. But now I'm
>> back to the nested iteration issue. Is there any chance that this feature
>> will be available in the near future (the next 2-3 weeks)?
>>
>> Cheers,
>> Max
>>
>> On Wed, Jan 7, 2015 at 10:11 AM, Aljoscha Krettek <[email protected]>
>> wrote:
>>
>>> Hi,
>>> the problem is that your operations do not depend on the
>>> iteration-step-dataset. Your code could be rewritten like this to make it
>>> more obvious:
>>>
>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>   // here we call the function
>>>   val center = calcCenter(env, X, residual, randoms, -1)
>>>
>>>   val centerX = (X subtV center) map {_ square}
>>>   val x = calcWidthHeight(env, centerX, residual, widthCandidates, center)
>>>   val width = x._1
>>>   val height = x._2
>>>
>>>   residual = residual - (getKernelVector(X, center, width) multV height)
>>>
>>>   val centerOut = center map {x => new Vector(0, x.values)}
>>>   val widthOut = width map {x => new Vector(1, x.values)}
>>>   val heightOut = height map {x => new Vector(2, x.values)}
>>>   val stepModel = centerOut union widthOut union heightOut
>>>
>>>   // here the loop begins
>>>   val model = emptyDataSet.iterate(config.iterations){
>>>     stepSet =>
>>>
>>>     stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>>       def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>>     }))
>>>   }
>>>
>>>   model map { _ toString } writeAsText config.outFile
>>> }
>>>
>>> This means that for the system these operations are considered to be
>>> outside the loop, thus you don't have access to the
>>> IterationRuntimeContext.
>>>
>>> Regards,
>>> Aljoscha
>>>
>>> On Tue, Jan 6, 2015 at 4:13 PM, Maximilian Alber <
>>> [email protected]> wrote:
>>>
>>>> Hey Flinksters!
>>>>
>>>> I ran into this error:
>>>>
>>>> java.lang.IllegalStateException: This stub is not part of an iteration
>>>> step function.
>>>>
>>>> Below is my code; the relevant parts are marked. Is it a problem
>>>> that the stub is in a function that is called from the iteration step
>>>> function?
>>>>
>>>>
>>>> Code:
>>>>
>>>> ......
>>>>
>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>   // here the loop begins
>>>>   val model = emptyDataSet.iterate(config.iterations){
>>>>     stepSet =>
>>>>     // here we call the function
>>>>     val center = calcCenter(env, X, residual, randoms, -1)
>>>>
>>>>     val centerX = (X subtV center) map {_ square}
>>>>     val x = calcWidthHeight(env, centerX, residual, widthCandidates, center)
>>>>     val width = x._1
>>>>     val height = x._2
>>>>
>>>>     residual = residual - (getKernelVector(X, center, width) multV height)
>>>>
>>>>     val centerOut = center map {x => new Vector(0, x.values)}
>>>>     val widthOut = width map {x => new Vector(1, x.values)}
>>>>     val heightOut = height map {x => new Vector(2, x.values)}
>>>>     val stepModel = centerOut union widthOut union heightOut
>>>>
>>>>     stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>>>       def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>>>     }))
>>>>   }
>>>>
>>>>   model map { _ toString } writeAsText config.outFile
>>>> }
>>>>
>>>>
>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>     residual: DataSet[Vector], randoms: DataSet[Vector],
>>>>     iteration: Int): DataSet[Vector] = {
>>>>   val residual_2 = residual * residual
>>>>   val randomValue = if(iteration >= 0)
>>>>       (randoms filter {_.id == iteration})
>>>>     else
>>>>       // and this filter function causes the error
>>>>       (randoms.filter(new RichFilterFunction[Vector]{
>>>>         def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>       }))
>>>>   val ys = ((residual_2 sumV() neutralize) * (randomValue neutralize))
>>>>
>>>> .....
>>>>
>>>> The full error:
>>>>
>>>> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>>>     at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>     at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:119)
>>>>     at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:118)
>>>>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> I attach my program together with the input files. To reproduce the
>>>> error, use the following command-line args; please replace in_file,
>>>> random_file, and width_candidates_file with the provided files, and set
>>>> out_file to whatever path you like:
>>>>
>>>> flink run -v bump_boost-0.1.jar -c bumpboost.Job in_file=foobar
>>>>   out_file=/tmp/tmphIHeEs random_file=/tmp/tmpeKQPZk dimensions=1 N=100
>>>>   width_candidates_file=/tmp/tmpTJLKMm N_width_candidates=50 iterations=30
>>>>   multi_bump_boost=0 gradient_descent_iterations=30 cache=False
>>>>   start_width=1.0 min_width=-4 max_width=6 min_width_update=1e-08
>>>>   max_width_update=10
>>>>
>>>> Thanks!
>>>> Cheers,
>>>> Max
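
For readers of the archive, here is a minimal sketch of the point made in Aljoscha's reply: a rich function can call getIterationRuntimeContext only when the operator it runs in consumes data derived from the step dataset. The object name SuperstepSketch, the Int element type, and the iteration count of 3 are assumptions made up for illustration; they are not taken from the BumpBoost program in this thread.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

// Hypothetical stand-alone example, not part of the program discussed above.
object SuperstepSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val initial: DataSet[Int] = env.fromElements(0)

    val result = initial.iterate(3) { stepSet =>
      // This map consumes stepSet, so the system places it inside the loop
      // and the iteration runtime context is available to it.
      stepSet.map(new RichMapFunction[Int, Int] {
        def map(x: Int): Int =
          x + getIterationRuntimeContext.getSuperstepNumber
      })
      // By contrast, an operator written in this closure that reads only
      // datasets created outside it (like `randoms` in the thread) does not
      // depend on stepSet, is treated as lying outside the loop, and fails
      // with the IllegalStateException quoted above.
    }

    result.print()
    // Assumption about versions: on older releases such as the 0.8 line,
    // print() only declares a sink and an explicit env.execute() is still
    // required; newer releases trigger execution from print() itself.
  }
}
```

This matches the workaround mentioned earlier in the thread: joining the loop-invariant data with the workset gives the operator a dependency on the iteration state, which is presumably why the context became accessible there.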
