Hi flinksters,
I'm really close to the end (at least I hope so), but I still have some
issues.
Writing my final loop I got this error:
org.apache.flink.compiler.CompilerException: An error occurred while
translating the optimized plan to a nephele JobGraph: An error occurred
while translating the optimized plan to a nephele JobGraph: Error: It is
currently not supported to union between dynamic and static path in an
iteration.
at
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
at
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
I guess that I should use just the loop step set to create the next step
set?
Here is the code:
def createPlanFullIterations(env: ExecutionEnvironment) = {
val tmp = env readTextFile config.inFile map
{Vector.parseFromSVMLightString (config.dimensions, _)}
val X = tmp map {_._1}
var residual = tmp map {_._2}
val randoms = env readTextFile config.randomFile map
{Vector.parseFromString(_)}
val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile !=
null)
env readTextFile config.widthCandidatesFile map
{Vector.parseFromString(config.dimensions, _)}
else
null
val emptyDataSet = env.fromCollection[Vector](Seq())
val model = emptyDataSet.iterate(config.iterations){
stepSet =>
val center = calcCenter(env, X, residual, randoms, -1)
val x = calcWidthHeight(env, X, residual, widthCandidates, center)
val width = x._1
val height = x._2
residual = residual - (getKernelVector(X, center, width).map(new
RichMapFunction[Vector, Vector]{
var height: Vector = null
override def open(config: Configuration) = {
height =
getRuntimeContext.getBroadcastVariable("height").toList.head
}
def map(x: Vector) = {x * height}
}).withBroadcastSet(height, "height"))
val centerOut = center map {x => new Vector(0, x.values)}
val widthOut = width map {x => new Vector(1, x.values)}
val heightOut = height map {x => new Vector(2, x.values)}
val stepModel = centerOut union widthOut union height
stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
def map(x: Vector) = new
Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
}))
}
model map { _ toString } writeAsText config.outFile
}
thanks!
Cheers,
Max