Hi,
there is no RichFunction in the Java API either. You don't have to create a
new DataSet. Your iteration result will be a DataSet that results from some
operations based on the previous SolutionSet and/or WorkingSet. For example:
def stepFunction(s: DataSet[SolutionType], ws: DataSet[WorksetType]) = {
  val intermediate = ws.join(somethingFromOutside) where {...} isEqualTo {...} map {...}
  val newSolution = s.join(intermediate) where ...
  val newWorkset = ...
  (newSolution, newWorkset)
}
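
To make the mechanics a bit more concrete, here is a rough sketch in plain Scala collections (no Stratosphere classes at all) of how I understand a delta iteration to behave: the step function returns a delta and a new workset, the delta is merged into the solution set by key, and the loop stops after the maximum number of iterations or when the workset is empty. Everything in it is made up for illustration: the object name, the toy formulas for a, b and c, and the explicit superstep argument, which only stands in for what getSuperstepNumber() would give you inside a rich function (I am assuming supersteps are counted from 1).

```scala
object DeltaIterationSketch {
  // (iteration index, a, b, c), same shape as the model tuples in the question
  type ModelEntry = (Int, Float, Float, Float)

  // Hypothetical step function: derives one model entry from the current
  // residuals and returns it (the delta) together with the updated residuals.
  // A real step function would typically join against `solution` as well.
  def step(superstep: Int,
           solution: Map[Int, ModelEntry],
           residuals: Seq[Float]): (Seq[ModelEntry], Seq[Float]) = {
    val a = residuals.sum / residuals.size // toy stand-in for calc_a
    val b = residuals.max                  // toy stand-in for calc_b
    val c = residuals.min                  // toy stand-in for calc_c
    val delta = Seq((superstep, a, b, c))
    val newResiduals = residuals.map(_ - a) // toy stand-in for update_residuals
    (delta, newResiduals)
  }

  // Plain-collections imitation of a delta iteration, not the real API.
  def iterateWithDelta(initialSolution: Seq[ModelEntry],
                       initialWorkset: Seq[Float],
                       key: ModelEntry => Int,
                       maxIterations: Int): Seq[ModelEntry] = {
    var solution = initialSolution.map(e => key(e) -> e).toMap
    var workset = initialWorkset
    var superstep = 1
    while (superstep <= maxIterations && workset.nonEmpty) {
      val (delta, nextWorkset) = step(superstep, solution, workset)
      // Entries in the delta replace solution entries with the same key.
      solution = solution ++ delta.map(e => key(e) -> e)
      workset = nextWorkset
      superstep += 1
    }
    solution.values.toSeq.sortBy(key)
  }

  def main(args: Array[String]): Unit = {
    // The result is the value returned by iterateWithDelta; the initial
    // (empty) solution that was passed in is left unchanged.
    val model = iterateWithDelta(Seq.empty, Seq(1.0f, 2.0f, 3.0f), _._1, 3)
    model.foreach(println)
  }
}
```

Note that the sketch writes out the value returned by iterateWithDelta, not the initial model that was passed in. I would expect the same to hold for the real API, since the iteration result is a new DataSet.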
Aljoscha
On Sun, Aug 17, 2014 at 6:14 PM, Maximilian Alber <
[email protected]> wrote:
> Hi!
> Thank you!
>
> But how do I join my result to the solution set if I cannot create a new
> DataSet inside the iteration?
> In Scala there is not yet a RichFunction for the Iterations, am I right?
> So I should best use the Java class?
>
> Kind regards,
> Max!
>
>
> On Fri, Aug 15, 2014 at 3:50 PM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> Hi,
>> right now, the only way of updating the solution set in a delta iteration
>> is by joining with the solution set from the previous iteration and having
>> the result of that join as the result of the step function. I am working on
>> simplifying iterations as well as bringing the Scala API to feature parity
>> with the Java API. As far as I can tell, it is currently not possible to
>> create a new data source inside each iteration step.
>>
>> The way to get at the current iteration number is by having a rich
>> function instead of a lambda function. So instead of:
>> val someSet = ...
>> val otherSet = someSet map { x => x + 1}
>>
>> you would have:
>> val someSet = ...
>> val otherSet = someSet map( new MapFunction[InType, OutType]() {
>>   def apply(in: InType): OutType = {
>>     val iteration = getIterationRuntimeContext().getSuperstepNumber()
>>     (iteration, x, y, ...)
>>   }
>> })
>>
>> I hope that helps.
>>
>> Aljoscha
>>
>>
>> On Tue, Aug 12, 2014 at 8:21 PM, Maximilian Alber <
>> [email protected]> wrote:
>>
>>> Hi everybody,
>>>
>>> as already stated, I am currently trying to implement a Machine Learning
>>> algorithm on Stratosphere for the ML group at TU Berlin. I ran into some
>>> issues.
>>>
>>> The basic scheme of my algorithm is:
>>>
>>> X = input data
>>> Y = input data
>>> residuals = Y
>>>
>>> model = array[float, float, float] size n
>>>
>>> for i in 1:n
>>>   a = calc_a(X, residuals)
>>>   b = calc_b(X, a, residuals)
>>>   c = calc_c(X, a, b, c, residuals)
>>>
>>>   model(i) = (a, b, c)
>>>   residuals = update_residuals(residuals, a, b, c)
>>>
>>> output model
>>>
>>> My plan is to use a delta iteration, with the model as the solution
>>> set and the residuals as the workset:
>>>
>>> Code:
>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>> val X = getInputSource
>>> val Y = DataSource(YFile, CsvInputFormat[Float])
>>>
>>> val model = CollectionDataSource[(Int, Float, Float, Float)](List())
>>> val residual = Y
>>>
>>> def step_function(model: DataSet[(Int, Float, Float, Float)],
>>>                   residuals: DataSet[Float]) = {
>>>   import util.Random
>>>   (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f, 2.0f))),
>>>    residuals)
>>> }
>>>
>>> model.iterateWithDelta(
>>>   residual,
>>>   { x: (Int, Float, Float, Float) => x._1 },
>>>   step_function,
>>>   config.iterations
>>> )
>>>
>>> val output = model //map { x => println(x); x }
>>> val sink = output.write(outFile, CsvOutputFormat[(Int, Float, Float, Float)], "Model output")
>>>
>>> Code End
>>> ----------------------------------------------------------------------------------------------------------------------------------------------------
>>>
>>> At the moment I am just trying to output a list of tuples.
>>>
>>> My problems are:
>>> - instead of the random integer I would like to insert the index of the
>>> current iteration.
>>> - I get this error:
>>> 08/12/2014 20:14:37: Job execution switched to status SCHEDULED
>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
>>> 08/12/2014 20:14:37: Job execution switched to status RUNNING
>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
>>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
>>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
>>> 08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
>>> java.lang.RuntimeException: Cannot serialize record with out field at position: 0
>>>     at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
>>>     at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
>>>     at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
>>>     at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
>>>     at java.lang.Thread.run(Thread.java:744)
>>>
>>> I suspect there is no record inside model, because if I enable the map
>>> function in the second-to-last line I get an IndexOutOfBounds exception
>>> at index 0.
>>>
>>> Many thanks in advance
>>>
>>> Cheers,
>>> Max
>>>
>>>
>>>
>>>
>>
>>
>