Hi! Yes, that is a known limitation. It is fairly straightforward to resolve, but it will take a bit of time. I will try to get to it by the end of the week.
Stephan

On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <[email protected]> wrote:

> Hi flinksters,
>
> I'm really close to the end (at least I hope so), but I still have some
> issues.
> Writing my final loop I got this error:
>
> org.apache.flink.compiler.CompilerException: An error occurred while
> translating the optimized plan to a nephele JobGraph: An error occurred
> while translating the optimized plan to a nephele JobGraph: Error: It is
> currently not supported to union between dynamic and static path in an
> iteration.
>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>
> I guess that I should use just the loop step set to create the next step
> set?
>
> Here is the code:
>
> def createPlanFullIterations(env: ExecutionEnvironment) = {
>   val tmp = env readTextFile config.inFile map {Vector.parseFromSVMLightString (config.dimensions, _)}
>   val X = tmp map {_._1}
>   var residual = tmp map {_._2}
>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>   val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile != null)
>       env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
>     else
>       null
>
>   val emptyDataSet = env.fromCollection[Vector](Seq())
>   val model = emptyDataSet.iterate(config.iterations){
>     stepSet =>
>       val center = calcCenter(env, X, residual, randoms, -1)
>
>       val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>       val width = x._1
>       val height = x._2
>
>       residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector]{
>         var height: Vector = null
>         override def open(config: Configuration) = {
>           height = getRuntimeContext.getBroadcastVariable("height").toList.head
>         }
>         def map(x: Vector) = {x * height}
>       }).withBroadcastSet(height, "height"))
>
>       val centerOut = center map {x => new Vector(0, x.values)}
>       val widthOut = width map {x => new Vector(1, x.values)}
>       val heightOut = height map {x => new Vector(2, x.values)}
>       val stepModel = centerOut union widthOut union height
>
>       stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>         def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>       }))
>   }
>
>   model map { _ toString } writeAsText config.outFile
> }
>
>
> thanks!
> Cheers,
> Max
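
For reference, here is a minimal sketch of a bulk iteration in which the next step set is built only from the step set itself, so that every operator inside the loop sits on the dynamic path. It is not a fix for the code quoted above, only an illustration of the pattern asked about. The object name DynamicPathSketch, the input values, and the output path are made up for the example, and it assumes the Flink Scala DataSet API (org.apache.flink.api.scala) is on the classpath.

```scala
import org.apache.flink.api.scala._

// Hypothetical example object; not part of the code quoted above.
object DynamicPathSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up initial data for the iteration.
    val initial: DataSet[Int] = env.fromElements(0, 1, 2)

    // The step function derives the next step set only from stepSet,
    // so nothing from outside the loop is unioned with it.
    val result = initial.iterate(10) { stepSet =>
      stepSet.map(_ + 1)
    }

    // Hypothetical output path; the sink is only here to complete the plan.
    result.writeAsText("/tmp/dynamic-path-sketch")
    env.execute("dynamic path sketch")
  }
}
```

Note that the step function passed to iterate is called once, while the plan is being constructed, so it describes a single superstep rather than running once per iteration.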
