Let us have a look...
On Thu, Nov 13, 2014 at 12:32 PM, Maximilian Alber wrote:
> Hi Flinksters!
>
> The current stable Flink compiler rejects my plan, but I don't have a clue
> why. The line of code that causes the error is marked:
>
> Code:
>
> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>     residual: DataSet[Vector], widthCandidates: DataSet[Vector],
>     center: DataSet[Vector]): DataSet[Vector] = {
>   val emptyDataSet = env.fromCollection[Vector](Seq())
>   val costs = emptyDataSet.iterateDelta(widthCandidates,
>       config.NWidthCandidates, Array("id")) {
>     (solutionset, workset) =>
>       val currentWidth = workset filter (new RichFilterFunction[Vector]{
>         def filter(x: Vector) = x.id ==
>           (getIterationRuntimeContext.getSuperstepNumber-1)
>       })
>
>       val kernelVector = getKernelVector(X, center, currentWidth)
>
>       val x1 = kernelVector dot residual map {x => x*x}
>       val x2 = kernelVector dot kernelVector
>
>       val cost = (x1 / x2) neutralize
>
>
>       (cost map (new RichMapFunction[Vector, Vector]{
>         def map(x: Vector) =
>           new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>       }),
>       workset)
>   }
>
>   val maxCost = costs max(0)
> >>>>>>>>>>>>>>>>>>>>>>>>>
>   val width = maxCost join widthCandidates where "id" equalTo "id" map
>     {x => x._2}
> >>>>>>>>>>>>>>>>>>>>>>>>>
>
>   //val kernelVector = getKernelVector(X, center, width)
>
>   //val x1 = kernelVector dot residual
>   //val x2 = kernelVector dot kernelVector
>   //val height = x1 / x2
>   //costs
>   width
> }
>
>
> The error message is:
>
> java.lang.IllegalArgumentException: The given strategy does not work on two inputs.
>   at org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
>   at org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
>   at org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
>   at org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
>   at org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
>   at org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
>   at org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
>   at org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
>   at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
>   at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
>   at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
>   at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
>   at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
>   at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
>   at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
>   at org.apache.flink.client.program.Client.run(Client.java:285)
>   at org.apache.flink.client.program.Client.run(Client.java:230)
>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
>