OK, I'm back at this point: in the 0.7 version, is there a way to get the superstep number inside an iterateWithDelta function?
Cheers,
Max

On Mon, Aug 18, 2014 at 12:05 PM, Aljoscha Krettek <[email protected]> wrote:

> Yes, but they were always available. Because user code in Java was always
> in "Rich Functions". There is no rich function for iterations, though,
> since iterations themselves don't have user code attached.
>
> Aljoscha
>
>
> On Mon, Aug 18, 2014 at 10:59 AM, Fabian Hueske <[email protected]>
> wrote:
>
>> RichFunctions were added to the JavaAPI recently:
>>
>>
>> https://github.com/apache/incubator-flink/tree/72d7b86274c33d1570ffb22b1fca2081c15d753c/flink-java/src/main/java/org/apache/flink/api/java/functions
>>
>> Cheers, Fabian
>>
>>
>> 2014-08-18 8:16 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>
>> Hi,
>>> there is no RichFunction in the Java API either. You don't have to
>>> create a new DataSet. Your iteration result will be a DataSet that results
>>> from some operations based on the previous SolutionSet and/or WorkingSet.
>>> For example:
>>>
>>> def stepFunction(s: DataSet[SolutionType], ws: DataSet[WorksetType]) = {
>>>   val intermediate = ws.join(somethingFromOutside) where {...} isEqualTo {...} map {...}
>>>   val newSolution = s.join(intermediate) where ...
>>>   val newWorkset = ...
>>>   (newSolution, newWorkset)
>>> }
>>>
>>> Aljoscha
>>>
>>>
>>> On Sun, Aug 17, 2014 at 6:14 PM, Maximilian Alber <
>>> [email protected]> wrote:
>>>
>>>> Hi!
>>>> Thank you!
>>>>
>>>> But how do I join my result to the solution set if I cannot create a
>>>> new DataSet inside the iteration?
>>>> In Scala there is not yet a RichFunction for the Iterations, am I
>>>> right? So I should best use the Java class?
>>>>
>>>> Kind regards,
>>>> Max!
>>>>
>>>>
>>>> On Fri, Aug 15, 2014 at 3:50 PM, Aljoscha Krettek <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> right now, the only way of updating the solution set in a delta
>>>>> iteration is by joining with the solution set from the previous iteration
>>>>> and having the result of that join as the result of the step function. I
>>>>> am working on simplifying iterations as well as bringing the Scala API to
>>>>> feature parity with the Java API. It should not be possible right now to
>>>>> create a new data source inside each iteration step.
>>>>>
>>>>> The way to get at the current iteration number is by having a rich
>>>>> function instead of a lambda function. So instead of:
>>>>> val someSet = ...
>>>>> val otherSet = someSet map { x => x + 1 }
>>>>>
>>>>> you would have:
>>>>> val someSet = ...
>>>>> val otherSet = someSet map( new MapFunction[InType, OutType]() {
>>>>>   def apply(in: InType): OutType = {
>>>>>     val iteration = getIterationRuntimeContext().getSuperstepNumber()
>>>>>     (iteration, x, y, ...)
>>>>>   }
>>>>> })
>>>>>
>>>>> I hope that helps.
>>>>>
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On Tue, Aug 12, 2014 at 8:21 PM, Maximilian Alber <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi everybody,
>>>>>>
>>>>>> as already stated, I am currently trying to implement a Machine Learning
>>>>>> algorithm on Stratosphere for the ML group at TU Berlin. I ran into some
>>>>>> issues.
>>>>>>
>>>>>> The basic scheme of my algorithm is:
>>>>>>
>>>>>> X = input data
>>>>>> Y = input data
>>>>>> residuals = Y
>>>>>>
>>>>>> model = array[float, float, float] size n
>>>>>>
>>>>>> for i in 1:n
>>>>>>   a = calc_a(X, residuals)
>>>>>>   b = calc_b(X, a, residuals)
>>>>>>   c = calc_c(X, a, b, c, residuals)
>>>>>>
>>>>>>   model(i) = (a, b, c)
>>>>>>   residuals = update_residuals(residuals, a, b, c)
>>>>>>
>>>>>> output model
>>>>>>
>>>>>> My attempt now would be to use delta iterations, with the model as the
>>>>>> solution set and the residuals as the workset:
>>>>>>
>>>>>> Code:
>>>>>> ----------------------------------------------------------------------
>>>>>> val X = getInputSource
>>>>>> val Y = DataSource(YFile, CsvInputFormat[Float])
>>>>>>
>>>>>> val model = CollectionDataSource[(Int, Float, Float, Float)](List())
>>>>>> val residual = Y
>>>>>>
>>>>>> def step_function(model: DataSet[(Int, Float, Float, Float)],
>>>>>>     residuals: DataSet[Float]) = {
>>>>>>   import util.Random
>>>>>>   (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f, 2.0f))), residuals)
>>>>>> }
>>>>>>
>>>>>> model.iterateWithDelta(
>>>>>>   residual,
>>>>>>   { x: (Int, Float, Float, Float) => x._1 },
>>>>>>   step_function,
>>>>>>   config.iterations
>>>>>> )
>>>>>>
>>>>>> val output = model //map { x => println(x); x }
>>>>>> val sink = output.write(outFile, CsvOutputFormat[(Int, Float, Float, Float)], "Model output")
>>>>>>
>>>>>> Code End
>>>>>> ----------------------------------------------------------------------
>>>>>>
>>>>>> At the moment I am just trying to output a list of tuples.
>>>>>>
>>>>>> My problems are:
>>>>>> - instead of the random integer, I would like to insert the index of
>>>>>> the iteration.
>>>>>> - I get this error:
>>>>>> 08/12/2014 20:14:37: Job execution switched to status SCHEDULED
>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
>>>>>> 08/12/2014 20:14:37: Job execution switched to status RUNNING
>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
>>>>>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
>>>>>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
>>>>>> 08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
>>>>>> java.lang.RuntimeException: Cannot serialize record with out field at position: 0
>>>>>>   at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
>>>>>>   at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
>>>>>>   at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
>>>>>>   at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>>
>>>>>> I suspect there is no record inside model.
>>>>>> Because if I enable the map function in the second-to-last line, I get
>>>>>> an IndexOutOfBounds exception at index 0.
>>>>>>
>>>>>> Many thanks in advance
>>>>>>
>>>>>> Cheers,
>>>>>> Max
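
To illustrate the delta-iteration contract discussed in this thread (a keyed solution set, a workset handed from step to step, and a superstep number visible to the step), here is a minimal sketch using plain Scala collections. It does not use the Stratosphere/Flink API: `iterateWithDeltaSketch`, `runExample`, and the sample residuals are hypothetical names and data made up for illustration, and passing the superstep number as an explicit argument is a simplification of what `getIterationRuntimeContext().getSuperstepNumber()` provides inside a rich function.

```scala
object DeltaIterationSketch {
  // One model entry per superstep: (iteration index, a, b, c).
  type ModelEntry = (Int, Float, Float, Float)

  // Plain-collections stand-in for a delta iteration. This is NOT the
  // Stratosphere/Flink API; the name and signature are hypothetical.
  def iterateWithDeltaSketch(
      initialSolution: Map[Int, ModelEntry],
      initialWorkset: Seq[Float],
      maxIterations: Int
  )(step: (Int, Map[Int, ModelEntry], Seq[Float]) => (Seq[ModelEntry], Seq[Float])): Map[Int, ModelEntry] = {
    var solution = initialSolution
    var workset = initialWorkset
    var superstep = 1
    // Stop after maxIterations supersteps or as soon as the workset is empty.
    while (superstep <= maxIterations && workset.nonEmpty) {
      val (delta, nextWorkset) = step(superstep, solution, workset)
      // Merge the delta into the solution set by key (the iteration index).
      solution = solution ++ delta.map(entry => entry._1 -> entry)
      workset = nextWorkset
      superstep += 1
    }
    solution
  }

  // Mirrors the step function from the thread, with the superstep number
  // in place of Random.nextInt and the residuals passed through unchanged.
  def runExample(iterations: Int): Map[Int, ModelEntry] = {
    val residuals = Seq(4.0f, 2.0f, 1.0f) // made-up sample data
    iterateWithDeltaSketch(Map.empty, residuals, iterations) {
      (superstep, _, ws) => (Seq((superstep, 1.0f, 1.0f, 2.0f)), ws)
    }
  }

  def main(args: Array[String]): Unit = {
    // The model is the value returned by the iteration, not its initial input.
    val model = runExample(3)
    model.toSeq.sortBy(_._1).foreach { case (_, entry) => println(entry) }
  }
}
```

In this sketch the model is the value returned by the iteration, while the initial solution set stays empty. Whether the same distinction explains the failing sink in the job above is not confirmed anywhere in the thread.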
