Hi,
right now, the only way to update the solution set in a delta iteration is
to join with the solution set from the previous iteration and return the
result of that join from the step function. I am working on simplifying
iterations as well as bringing the Scala API to feature parity with the Java
API. As far as I know, creating a new data source inside each iteration step
is not possible right now.
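To make the join-based update concrete, here is a minimal local simulation in plain Scala. It uses no Stratosphere classes. `DeltaIterationSim` and its signature are hypothetical and only mirror the idea. The solution set is a Map keyed by the solution-set key. The step function joins the workset with the current solution set and returns (delta, next workset). The delta is then merged back by key. Stopping when the workset is empty or after maxIterations is an assumption modelled on the usual delta-iteration termination.

```scala
// Hypothetical local simulation of delta-iteration semantics (not the
// Stratosphere API). The solution set is a Map keyed by the solution-set key.
object DeltaIterationSim {
  def iterateWithDelta[K, S, W](
      initialSolution: Map[K, S],
      initialWorkset: Seq[W],
      key: S => K,
      step: (Map[K, S], Seq[W]) => (Seq[S], Seq[W]),
      maxIterations: Int): Map[K, S] = {
    var solution = initialSolution
    var workset = initialWorkset
    var i = 0
    // Assumed termination: max iterations reached or empty workset.
    while (i < maxIterations && workset.nonEmpty) {
      val (delta, nextWorkset) = step(solution, workset)
      // Merge the delta by key: existing keys are overwritten, new keys added.
      solution = solution ++ delta.map(s => key(s) -> s)
      workset = nextWorkset
      i += 1
    }
    solution
  }
}
```

Example: the solution set is {1 -> (1, 10), 2 -> (2, 20)} and the workset holds (1, 5). A step that joins the workset with the solution set on the key emits the delta (1, 15). The final solution set is therefore {1 -> (1, 15), 2 -> (2, 20)}.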
To get the current iteration number, you have to use a rich function
instead of a lambda. So instead of:
val someSet = ...
val otherSet = someSet map { x => x + 1 }
you would have:
val someSet: DataSet[InType] = ...
val otherSet = someSet map (new MapFunction[InType, OutType]() {
  def apply(in: InType): OutType = {
    // the iteration runtime context is only available inside an iteration
    val iteration = getIterationRuntimeContext().getSuperstepNumber()
    (iteration, x, y, ...)  // build OutType from `iteration` and the fields of `in`
  }
})
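The reason for a class rather than a lambda is that a function object gives the runtime something to attach a per-superstep context to. A plain lambda has no such hook. Below is a plain-Scala sketch of that idea. `IterationContext`, `RichMap` and `SuperstepDemo` are hypothetical names, not Stratosphere classes. Superstep numbers starting at 1 is also an assumption.

```scala
// Hypothetical stand-in for the context an iteration runtime provides.
final class IterationContext(var superstepNumber: Int)

// Hypothetical "rich" map function: because it is a class, the runtime can
// attach a context to it before running it.
abstract class RichMap[In, Out] {
  private var ctx: IterationContext = null
  def setContext(c: IterationContext): Unit = { ctx = c }
  def iterationContext: IterationContext = ctx
  def apply(in: In): Out
}

object SuperstepDemo {
  // Applies the same mapper instance in each of `n` supersteps and updates
  // the superstep number in between, roughly what an iteration runtime does.
  def run[In, Out](data: Seq[In], mapper: RichMap[In, Out], n: Int): Seq[Seq[Out]] = {
    val ctx = new IterationContext(0)
    mapper.setContext(ctx)
    (1 to n).map { step =>
      ctx.superstepNumber = step
      data.map(mapper.apply).toList // force evaluation within this superstep
    }
  }
}
```

Consider a mapper that returns `(iterationContext.superstepNumber, in)`, run for 3 supersteps over Seq(1.0f, 2.0f). In the third superstep it yields (3, 1.0f) and (3, 2.0f).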
I hope that helps.
Aljoscha
On Tue, Aug 12, 2014 at 8:21 PM, Maximilian Alber <[email protected]> wrote:
> Hi everybody,
>
> as already mentioned, I am currently trying to implement a machine learning
> algorithm on Stratosphere for the ML group at TU Berlin, and I have run into
> some issues.
>
> The basic scheme of my algorithm is:
>
> X = input data
> Y = input data
> residuals = Y
>
> model = array[float, float, float] size n
>
> for i in 1:n
>   a = calc_a(X, residuals)
>   b = calc_b(X, a, residuals)
>   c = calc_c(X, a, b, residuals)
>
>   model(i) = (a, b, c)
>   residuals = update_residuals(residuals, a, b, c)
>
> output model
>
> My current approach is to use a delta iteration, with the model as the
> solution set and the residuals as the working set:
>
> Code:
> -------------------------------------------------------------------------------------------------------------------------------------------------------
> val X = getInputSource
> val Y = DataSource(YFile, CsvInputFormat[Float])
>
> val model = CollectionDataSource[(Int, Float, Float, Float)](List())
> val residual = Y
>
> def step_function(model: DataSet[(Int, Float, Float, Float)], residuals: DataSet[Float]) = {
>   import util.Random
>   (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f, 2.0f))), residuals)
> }
>
> model.iterateWithDelta(
> residual,
> { x: (Int, Float, Float, Float) => x._1 },
> step_function,
> config.iterations
> )
>
> val output = model //map { x => println(x); x }
> val sink = output.write(outFile, CsvOutputFormat[(Int, Float, Float, Float)], "Model output")
>
> Code End
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
> At the moment I am just trying to output a list of tuples.
>
> My problems are:
> - Instead of the random integer, I would like to insert the iteration index.
> - I get this error:
> 08/12/2014 20:14:37: Job execution switched to status SCHEDULED
> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
> 08/12/2014 20:14:37: Job execution switched to status RUNNING
> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
> 08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
> java.lang.RuntimeException: Cannot serialize record with out field at position: 0
>     at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
>     at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
>     at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
>     at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
>     at java.lang.Thread.run(Thread.java:744)
>
> I suspect there is no record inside model, because if I enable the map
> function in the second-to-last line, I get an IndexOutOfBounds exception at
> index 0.
>
> Many thanks in advance
>
> Cheers,
> Max
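For reference, the loop from Max's pseudocode can be run locally in plain Scala. This version keeps the iteration index as the first tuple field instead of a random integer. `StagewiseSketch`, `calcA`, `calcB`, `calcC`, `updateResiduals` and `shrinkage` are hypothetical placeholders. In this sketch, each round fits a least-squares line r ~ a * x + b to the current residuals and subtracts a shrunken copy of that fit. It then records the remaining squared error as c. Passing X to updateResiduals is an additional assumption.

```scala
// Local, non-distributed sketch of the pseudocode loop; all helpers below
// are hypothetical placeholders, not part of any library.
object StagewiseSketch {
  val shrinkage = 0.5f // hypothetical step size

  // Least-squares slope of the residuals against x.
  def calcA(x: Array[Float], r: Array[Float]): Float = {
    val mx = x.sum / x.length
    val mr = r.sum / r.length
    val cov = x.indices.map(i => (x(i) - mx) * (r(i) - mr)).sum
    val varX = x.map(v => (v - mx) * (v - mx)).sum
    if (varX == 0f) 0f else cov / varX
  }

  // Least-squares intercept for the given slope.
  def calcB(x: Array[Float], a: Float, r: Array[Float]): Float =
    r.sum / r.length - a * (x.sum / x.length)

  // Subtract the shrunken fit from the residuals.
  def updateResiduals(x: Array[Float], r: Array[Float], a: Float, b: Float): Array[Float] =
    x.indices.map(i => r(i) - shrinkage * (a * x(i) + b)).toArray

  // Sum of squared residuals left after this round's update.
  def calcC(x: Array[Float], a: Float, b: Float, r: Array[Float]): Float =
    updateResiduals(x, r, a, b).map(e => e * e).sum

  // One (iteration, a, b, c) tuple per round; the index replaces Random.nextInt.
  def fit(x: Array[Float], y: Array[Float], n: Int): Seq[(Int, Float, Float, Float)] = {
    var residuals = y
    (1 to n).map { i =>
      val a = calcA(x, residuals)
      val b = calcB(x, a, residuals)
      val c = calcC(x, a, b, residuals)
      residuals = updateResiduals(x, residuals, a, b)
      (i, a, b, c)
    }
  }
}
```

Take x = (1, 2, 3, 4) and y = 2x + 1. The first round yields (1, 2.0, 1.0, 41.0) and the second yields (2, 1.0, 0.5, 10.25).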