Hi everybody,
as already mentioned, I am currently trying to implement a machine learning
algorithm on Stratosphere for the ML group at TU Berlin, and I have run into
some issues.
The basic scheme of my algorithm is:
    X = input data
    Y = input data
    residuals = Y
    model = array of n (float, float, float) tuples
    for i in 1:n
        a = calc_a(X, residuals)
        b = calc_b(X, a, residuals)
        c = calc_c(X, a, b, residuals)
        model(i) = (a, b, c)
        residuals = update_residuals(residuals, a, b, c)
    output model
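As a single-machine reference, the loop above can be sketched in plain Scala. The bodies of calc_a, calc_b, calc_c and update_residuals are not given here, so the definitions below are hypothetical placeholders (a = mean residual, b = slope of (residual - a) on X through the origin, c = a line-search step length), and the placeholder update_residuals additionally takes X. Doubles are used instead of floats for simplicity.

```scala
// Sequential reference version of the loop above, with HYPOTHETICAL placeholder
// definitions for calc_a, calc_b, calc_c and update_residuals.
object StagewiseSketch {
  // One model entry (a, b, c) per iteration.
  type Stage = (Double, Double, Double)

  // Placeholder calc_a: mean of the current residuals.
  def calcA(x: Array[Double], r: Array[Double]): Double = r.sum / r.length

  // Placeholder calc_b: least-squares slope of (r - a) on x, fitted through the origin.
  def calcB(x: Array[Double], a: Double, r: Array[Double]): Double = {
    val num = x.indices.map(j => x(j) * (r(j) - a)).sum
    val den = x.map(v => v * v).sum
    if (den == 0.0) 0.0 else num / den
  }

  // Placeholder calc_c: step length minimising ||r - c * p||^2 for p = a + b * x.
  def calcC(x: Array[Double], a: Double, b: Double, r: Array[Double]): Double = {
    val p = x.map(v => a + b * v)
    val pp = p.map(v => v * v).sum
    if (pp == 0.0) 0.0 else p.indices.map(j => r(j) * p(j)).sum / pp
  }

  // Placeholder update_residuals; unlike the pseudocode it also needs x.
  def updateResiduals(x: Array[Double], r: Array[Double], a: Double, b: Double, c: Double): Array[Double] =
    x.indices.map(j => r(j) - c * (a + b * x(j))).toArray

  // Runs n iterations and returns the model together with the final residuals.
  def fit(x: Array[Double], y: Array[Double], n: Int): (Array[Stage], Array[Double]) = {
    var residuals = y.clone()
    val model = new Array[Stage](n)
    for (i <- 0 until n) {
      val a = calcA(x, residuals)
      val b = calcB(x, a, residuals)
      val c = calcC(x, a, b, residuals)
      model(i) = (a, b, c)
      residuals = updateResiduals(x, residuals, a, b, c)
    }
    (model, residuals)
  }
}
```

Because c is chosen by line search, each iteration cannot increase the residual sum of squares, which makes this version handy for checking a distributed implementation once the real calc_* functions are plugged in.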
My current approach is to use a delta iteration, with the model as the
solution set and the residuals as the working set:
Code:
-------------------------------------------------------------------------------------------------------------------------------------------------------
val X = getInputSource
val Y = DataSource(YFile, CsvInputFormat[Float])
val model = CollectionDataSource[(Int, Float, Float, Float)](List())
val residual = Y
// Placeholder step function: emits one constant tuple keyed by a random Int
// and passes the residuals through unchanged.
def step_function(model: DataSet[(Int, Float, Float, Float)], residuals: DataSet[Float]) = {
  import util.Random
  (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f, 2.0f))), residuals)
}
model.iterateWithDelta(
  residual,
  { x: (Int, Float, Float, Float) => x._1 },
  step_function,
  config.iterations
)
val output = model //map { x => println(x); x }
val sink = output.write(outFile, CsvOutputFormat[(Int, Float, Float, Float)], "Model output")
Code End
----------------------------------------------------------------------------------------------------------------------------------------------------
For now, I am just trying to output a list of tuples.
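A plain-Scala sketch of the intended delta-iteration semantics, assuming the usual contract (delta records are merged into the solution set by key, and the iteration stops after the maximum number of supersteps or once the working set is empty). It is NOT the Stratosphere API, and `DeltaIterationSketch`, `Stage` and `step` are hypothetical names. Each working-set element carries the iteration index, which is one possible way to key model entries by iteration instead of by a random Int; whether this maps cleanly onto Stratosphere operators is an assumption.

```scala
// Plain-Scala model of delta-iteration semantics; this is NOT the Stratosphere API.
object DeltaIterationSketch {
  // Model entry: (iteration index, a, b, c); the solution-set key is the first field.
  type Stage = (Int, Double, Double, Double)

  def iterateWithDelta[W](
      initialSolution: Map[Int, Stage],
      initialWorkset: Seq[W],
      step: (Map[Int, Stage], Seq[W]) => (Seq[Stage], Seq[W]),
      maxIterations: Int): Map[Int, Stage] = {
    var solution = initialSolution
    var workset = initialWorkset
    var i = 0
    // Stop after maxIterations supersteps, or earlier once the working set is empty.
    while (i < maxIterations && workset.nonEmpty) {
      val (delta, nextWorkset) = step(solution, workset)
      // Delta records are merged into the solution set by key (insert or replace).
      solution = solution ++ delta.map(s => s._1 -> s)
      workset = nextWorkset
      i += 1
    }
    // The final solution set is the returned value; initialSolution itself is unchanged.
    solution
  }

  // Hypothetical step: each working-set element is (iteration, residual), so the
  // delta record is keyed by the iteration index rather than by a random Int.
  // a = mean residual is a placeholder; b and c stay fixed at 1.0 and 2.0.
  // Assumes a non-empty working set, which iterateWithDelta guarantees.
  def step(solution: Map[Int, Stage], workset: Seq[(Int, Double)]): (Seq[Stage], Seq[(Int, Double)]) = {
    val iteration = workset.head._1
    val a = workset.map(_._2).sum / workset.size
    val stage: Stage = (iteration, a, 1.0, 2.0)
    // Advance the counter and subtract the placeholder fit from every residual.
    val nextWorkset = workset.map { case (it, r) => (it + 1, r - a) }
    (Seq(stage), nextWorkset)
  }
}
```

Starting from the working set `Seq((0, 2.0), (0, 4.0), (0, 6.0))` with `maxIterations = 3`, this yields model entries keyed 0, 1 and 2, the first one with a = 4.0.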
My problems are:
- Instead of the random integer, I would like to use the index of the
iteration.
- I get the following error:
08/12/2014 20:14:37: Job execution switched to status SCHEDULED
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
08/12/2014 20:14:37: Job execution switched to status RUNNING
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
java.lang.RuntimeException: Cannot serialize record with out field at position: 0
    at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
    at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
    at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
    at java.lang.Thread.run(Thread.java:744)
I suspect that model contains no records at all, because if I enable the
commented-out map function in the second-to-last line of the code, I get an
IndexOutOfBounds exception at index 0.
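One possible explanation, offered as a hedged guess: the log above schedules only the collection data source and the sink, with no iteration tasks. That would fit if `iterateWithDelta` returns a new data set instead of updating `model` in place (an assumption to check against the Scala API documentation), because `output = model` would then still refer to the empty `List()` source. The analogy below uses a hypothetical `SketchDataSet` type, not Stratosphere, to show what a discarded result looks like.

```scala
// HYPOTHETICAL immutable "data set": every operation returns a new value and never
// modifies the receiver. This is an analogy, not the Stratosphere API.
final case class SketchDataSet[T](records: List[T]) {
  // Applies `step` maxIterations times and returns the result as a NEW data set.
  def iterate(step: List[T] => List[T], maxIterations: Int): SketchDataSet[T] =
    SketchDataSet((0 until maxIterations).foldLeft(records)((acc, _) => step(acc)))
}

object DiscardedResultSketch {
  type Stage = (Int, Float, Float, Float)

  // Appends one stage per iteration, keyed by the number of stages so far.
  val addStage: List[Stage] => List[Stage] = acc => acc :+ ((acc.size, 1.0f, 1.0f, 2.0f))

  def main(args: Array[String]): Unit = {
    val model = SketchDataSet(List.empty[Stage])
    model.iterate(addStage, 3)              // result discarded: `model` is still empty
    val output = model.iterate(addStage, 3) // result kept: three stages
    println(model.records.size)  // prints 0
    println(output.records.size) // prints 3
  }
}
```

If that assumption holds for Stratosphere, the corresponding change would be to write the value returned by `model.iterateWithDelta(...)` to the sink instead of `model`.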
Many thanks in advance
Cheers,
Max