Hi! Thank you! But how do I join my result to the solution set if I cannot create a new DataSet inside the iteration? In Scala there is no RichFunction for iterations yet, am I right? So should I use the Java class instead?
Best regards,
Max

On Fri, Aug 15, 2014 at 3:50 PM, Aljoscha Krettek wrote:

> Hi,
> right now, the only way of updating the solution set in a delta iteration
> is by joining with the solution set from the previous iteration and having
> the result of that join as the result of the step function. I am working on
> simplifying iterations as well as bringing the Scala API to feature parity
> with the Java API. It should not be possible right now to create a new data
> source inside each iteration step.
>
> The way to get at the current iteration number is by using a rich
> function instead of a lambda function. So instead of:
>
> val someSet = ...
> val otherSet = someSet map { x => x + 1 }
>
> you would have:
>
> val someSet = ...
> val otherSet = someSet map (new MapFunction[InType, OutType]() {
>   def apply(in: InType): OutType = {
>     val iteration = getIterationRuntimeContext().getSuperstepNumber()
>     (iteration, x, y, ...)
>   }
> })
>
> I hope that helps.
>
> Aljoscha
>
>
> On Tue, Aug 12, 2014 at 8:21 PM, Maximilian Alber wrote:
>
>> Hi everybody,
>>
>> as already stated, I am currently trying to implement a Machine Learning
>> algorithm on Stratosphere for the ML group at TU Berlin. I have run into
>> some issues.
>>
>> The basic scheme of my algorithm is:
>>
>> X = input data
>> Y = input data
>> residuals = Y
>>
>> model = array[float, float, float] size n
>>
>> for i in 1:n
>>   a = calc_a(X, residuals)
>>   b = calc_b(X, a, residuals)
>>   c = calc_c(X, a, b, residuals)
>>
>>   model(i) = (a, b, c)
>>   residuals = update_residuals(residuals, a, b, c)
>>
>> output model
>>
>> My attempt now would be to use a delta iteration, with the model as the
>> solution set and the residuals as the working set:
>>
>> Code:
>> ------------------------------------------------------------------------
>> val X = getInputSource
>> val Y = DataSource(YFile, CsvInputFormat[Float])
>>
>> val model = CollectionDataSource[(Int, Float, Float, Float)](List())
>> val residual = Y
>>
>> def step_function(model: DataSet[(Int, Float, Float, Float)],
>>                   residuals: DataSet[Float]) = {
>>   import util.Random
>>   (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f, 2.0f))),
>>    residuals)
>> }
>>
>> model.iterateWithDelta(
>>   residual,
>>   { x: (Int, Float, Float, Float) => x._1 },
>>   step_function,
>>   config.iterations
>> )
>>
>> val output = model //map { x => println(x); x }
>> val sink = output.write(outFile,
>>   CsvOutputFormat[(Int, Float, Float, Float)], "Model output")
>>
>> Code End
>> ------------------------------------------------------------------------
>>
>> At the moment I am just trying to output a list of tuples.
>>
>> My problems are:
>> - instead of the random integer I would like to insert the iteration
>>   index.
>> - I get this error:
>>
>> 08/12/2014 20:14:37: Job execution switched to status SCHEDULED
>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
>> 08/12/2014 20:14:37: Job execution switched to status RUNNING
>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
>> 08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
>> java.lang.RuntimeException: Cannot serialize record with out field at position: 0
>>     at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
>>     at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
>>     at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
>>     at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
>>     at java.lang.Thread.run(Thread.java:744)
>>
>> I doubt there is no record inside model, because if I enable the map
>> function in the second-to-last line, I get an IndexOutOfBounds exception
>> at index 0.
>>
>> Many thanks in advance
>>
>> Cheers,
>> Max
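The delta-iteration contract described in the quoted reply can be modeled in plain Scala, without any Stratosphere dependency, to make the data flow concrete. This is a sketch under stated assumptions, not the Stratosphere API: `iterateWithDelta` here is a hypothetical local function, the solution set is a `Map` keyed by the iteration index, each delta record replaces the solution-set entry with the same key, and the loop stops after `maxIterations` supersteps or when the working set becomes empty. The superstep counter starts at 1, mirroring `getSuperstepNumber()`. The sketch does not enforce the restriction from the reply that the delta must come from a join with the solution set.

```scala
// Plain-Scala model of delta-iteration semantics (no Stratosphere dependency).
// Assumptions (not taken from the Stratosphere API): delta records are upserted
// into the solution set by key, and iteration stops after maxIterations
// supersteps or once the working set is empty.
object DeltaIterationSketch {

  /** Hypothetical helper: runs `step` up to `maxIterations` times.
    * `step(superstep, solution, workset)` returns (delta, nextWorkset).
    * `superstep` is 1-based, like getSuperstepNumber().
    */
  def iterateWithDelta[K, S, W](
      initialSolution: Map[K, S],
      initialWorkset: Seq[W],
      maxIterations: Int
  )(step: (Int, Map[K, S], Seq[W]) => (Map[K, S], Seq[W])): Map[K, S] = {
    var solution = initialSolution
    var workset = initialWorkset
    var superstep = 1
    while (superstep <= maxIterations && workset.nonEmpty) {
      val (delta, nextWorkset) = step(superstep, solution, workset)
      solution = solution ++ delta // delta entries replace entries with the same key
      workset = nextWorkset
      superstep += 1
    }
    solution
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical residuals; the values do not matter for this illustration.
    val residuals: Seq[Float] = Seq(0.5f, -1.0f, 2.0f)
    val model = iterateWithDelta(Map.empty[Int, (Float, Float, Float)], residuals, 3) {
      (i, _, res) =>
        // Key each new model entry by the superstep number instead of Random.nextInt.
        (Map(i -> (1.0f, 1.0f, 2.0f)), res)
    }
    model.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Under the join restriction, a record whose key is not yet in the solution set (such as a new iteration index) has nothing to join with. One possible workaround, untested here, is to initialize the model with one placeholder entry per index so the join on the index always finds a match.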

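Before mapping the algorithm onto a delta iteration, it can help to have a sequential reference for the scheme in the original email to compare results against. The sketch below is plain Scala running on a single machine. `calcA`, `calcB`, `calcC` and `updateResiduals` are hypothetical stand-ins for the email's `calc_a`, `calc_b`, `calc_c` and `update_residuals`, passed in as functions because the email does not define them. It also assumes one-dimensional `Float` input for `X`. Since `c` is not yet defined when `calc_c` is called, the sketch passes only `X`, `a`, `b` and the residuals. Each model entry is keyed by the loop index `i`, which corresponds to the iteration number wanted in place of `Random.nextInt`.

```scala
// Sequential (single-machine) reference for the scheme in the email.
// calcA, calcB, calcC and updateResiduals are hypothetical placeholders
// supplied by the caller; X is assumed to be a one-dimensional Seq[Float].
object SequentialModelSketch {

  def fit(
      x: Seq[Float],
      y: Seq[Float],
      n: Int,
      calcA: (Seq[Float], Seq[Float]) => Float,
      calcB: (Seq[Float], Float, Seq[Float]) => Float,
      calcC: (Seq[Float], Float, Float, Seq[Float]) => Float,
      updateResiduals: (Seq[Float], Float, Float, Float) => Seq[Float]
  ): Vector[(Int, Float, Float, Float)] = {
    var residuals = y // residuals = Y
    val model = Vector.newBuilder[(Int, Float, Float, Float)]
    for (i <- 1 to n) {
      val a = calcA(x, residuals)
      val b = calcB(x, a, residuals)
      val c = calcC(x, a, b, residuals)
      model += ((i, a, b, c)) // the loop index plays the role of the iteration number
      residuals = updateResiduals(residuals, a, b, c)
    }
    model.result()
  }
}
```

For example, with `calcA` returning the mean of the residuals, `calcB` and `calcC` returning `0f`, and `updateResiduals` subtracting `a` from every residual, the inputs `y = Seq(2f, 4f)` and `n = 2` give `Vector((1, 3.0f, 0.0f, 0.0f), (2, 0.0f, 0.0f, 0.0f))`.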