Let us have a look...

On Thu, Nov 13, 2014 at 12:32 PM, Maximilian Alber <[email protected]> wrote:

> Hi Flinksters!
>
> The current stable Flink compiler rejects my plan, but I don't have a clue
> why. The offending lines of code are marked below:
>
> Code:
>
> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>     residual: DataSet[Vector], widthCandidates: DataSet[Vector],
>     center: DataSet[Vector]): DataSet[Vector] = {
>   val emptyDataSet = env.fromCollection[Vector](Seq())
>   val costs = emptyDataSet.iterateDelta(widthCandidates,
>       config.NWidthCandidates, Array("id")) {
>     (solutionset, workset) =>
>       val currentWidth = workset filter (new RichFilterFunction[Vector] {
>         def filter(x: Vector) =
>           x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
>       })
>
>       val kernelVector = getKernelVector(X, center, currentWidth)
>
>       val x1 = kernelVector dot residual map { x => x * x }
>       val x2 = kernelVector dot kernelVector
>
>       val cost = (x1 / x2) neutralize
>
>       (cost map (new RichMapFunction[Vector, Vector] {
>         def map(x: Vector) =
>           new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
>       }),
>       workset)
>   }
>
>   val maxCost = costs max (0)
> >>>>>>>>>>>>>>>>>>>>>>>>>
>   val width = maxCost join widthCandidates where "id" equalTo "id" map
>     { x => x._2 }
> >>>>>>>>>>>>>>>>>>>>>>>>>
>
>   //val kernelVector = getKernelVector(X, center, width)
>
>   //val x1 = kernelVector dot residual
>   //val x2 = kernelVector dot kernelVector
>   //val height = x1 / x2
>   //costs
>   width
> }
>
>
> The error message is:
>
> java.lang.IllegalArgumentException: The given strategy does not work on two inputs.
> at org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
> at org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
> at org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
> at org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
> at org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
> at org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
> at org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
> at org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
> at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
> at org.apache.flink.client.program.Client.run(Client.java:285)
> at org.apache.flink.client.program.Client.run(Client.java:230)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
>
