Hi everybody,

As mentioned before, I am currently trying to implement a machine learning
algorithm on Stratosphere for the ML group at TU Berlin, and I have run into
some issues.

The basic scheme of my algorithm is:

X = input data
Y = input data
residuals = Y

model = array[float, float, float] size n

for i in 1:n
  a = calc_a(X, residuals)
  b = calc_b(X, a, residuals)
  c = calc_c(X, a, b, residuals)

  model(i) = (a, b, c)
  residuals = update_residuals(residuals, a, b, c)

output model
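
To make the scheme concrete, here is a minimal plain-Scala sketch of the loop,
without any Stratosphere API. The bodies of calcA, calcB, calcC and
updateResiduals are hypothetical placeholders and not the real computations.
X and Y are assumed to be plain sequences of floats, and each model entry
carries its iteration index as the key.

```scala
object ModelLoopSketch {
  // One model entry per iteration: (iteration index, a, b, c).
  type Entry = (Int, Float, Float, Float)

  // Hypothetical placeholders, NOT the real calc_a / calc_b / calc_c.
  def calcA(x: Seq[Float], residuals: Seq[Float]): Float =
    if (residuals.isEmpty) 0f else residuals.sum / residuals.size

  def calcB(x: Seq[Float], a: Float, residuals: Seq[Float]): Float = a / 2

  // In this sketch c is computed from a and b only.
  def calcC(x: Seq[Float], a: Float, b: Float, residuals: Seq[Float]): Float = a + b

  // Placeholder update: subtract the fitted value a from every residual.
  def updateResiduals(residuals: Seq[Float], a: Float, b: Float, c: Float): Seq[Float] =
    residuals.map(_ - a)

  // Runs n iterations, threading (model so far, current residuals) through a fold.
  def fit(x: Seq[Float], y: Seq[Float], n: Int): Vector[Entry] = {
    val start = (Vector.empty[Entry], y)
    val (model, _) = (1 to n).foldLeft(start) { case ((acc, residuals), i) =>
      val a = calcA(x, residuals)
      val b = calcB(x, a, residuals)
      val c = calcC(x, a, b, residuals)
      (acc :+ ((i, a, b, c)), updateResiduals(residuals, a, b, c))
    }
    model
  }
}
```

The fold makes the dependency explicit: every iteration needs the residuals
produced by the previous one and appends exactly one entry to the model.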

My current approach is to use a delta iteration, with the model as the
solution set and the residuals as the workset:

Code:
-------------------------------------------------------------------------------------------------------------------------------------------------------
val X = getInputSource
val Y = DataSource(YFile, CsvInputFormat[Float])

val model = CollectionDataSource[(Int, Float, Float, Float)](List())
val residual = Y

def step_function(model: DataSet[(Int, Float, Float, Float)], residuals: DataSet[Float]) = {
  import util.Random
  (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f, 2.0f))), residuals)
}

model.iterateWithDelta(
  residual,
  { x: (Int, Float, Float, Float) => x._1 },
  step_function,
  config.iterations
)

val output = model //map { x => println(x); x }
val sink = output.write(outFile, CsvOutputFormat[(Int, Float, Float, Float)], "Model output")

Code End
----------------------------------------------------------------------------------------------------------------------------------------------------

At the moment I am only trying to output a list of tuples.

My problems are:
- Instead of the random integer, I would like to insert the index of the
current iteration.
- I get the following error:
08/12/2014 20:14:37: Job execution switched to status SCHEDULED
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
08/12/2014 20:14:37: Job execution switched to status RUNNING
08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
java.lang.RuntimeException: Cannot serialize record with out field at position: 0
    at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
    at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
    at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
    at java.lang.Thread.run(Thread.java:744)

I suspect that model contains no records, because if I enable the map
function in the second-to-last line, I get an IndexOutOfBounds exception at
index 0.

Many thanks in advance.

Cheers,
Max
