Dear Flink Community,
Is there a compact and efficient way to access parameters that are known at
run-time, but not at compile-time, inside an iteration? I tried the following:
> Define an object with the parameters:
object iterationVariables {
  var numDataPoints = 1
  var lambda = 0.2
  var stepSize = 0.01
}
> Update it in the driver before starting the iteration:
iterationVariables.numDataPoints = numDP
iterationVariables.lambda = l
iterationVariables.stepSize = s
> Then use it inside the iteration, reading the parameters from the object:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

val resultingWeights = weightVector.iterate(numIterations) { weights =>
  val computeGradient = new RichMapFunction[LabeledPoint, DenseVector[Double]] {
    var originalW: DenseVector[Double] = _

    // Fetch the current weight vector, broadcast under WEIGHT_VECTOR.
    override def open(parameters: Configuration): Unit = {
      originalW = getRuntimeContext
        .getBroadcastVariable[DenseVector[Double]](WEIGHT_VECTOR)
        .get(0)
    }

    override def map(dp: LabeledPoint): DenseVector[Double] = {
      // The step size decays as 1 / sqrt(superstep number).
      val learning_rate: Double = iterationVariables.stepSize /
        Math.sqrt(getIterationRuntimeContext().getSuperstepNumber.toDouble)
      // L2-regularized logistic-regression gradient for one data point.
      val sumElement = (dp.features.toDenseVector *
        (dp.label - mlutils.logisticFunction(originalW, dp.features))
        - (iterationVariables.lambda / iterationVariables.numDataPoints.toDouble) *
        originalW
        ) * learning_rate
      sumElement
    }
  }

  // Sum the per-point gradients and add the sum to the current weights.
  val newWeights: DataSet[DenseVector[Double]] =
    weights.union(
      data.map(computeGradient).withBroadcastSet(weights, WEIGHT_VECTOR).reduce { _ + _ }
    ).reduce { _ + _ }
  newWeights
}
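For reference, the per-point update computed in `map` can be sketched in plain Scala without Flink. This is a minimal sketch under stated assumptions: `Array[Double]` stands in for `DenseVector[Double]`, `logistic` is assumed to behave like `mlutils.logisticFunction` (the sigmoid of the dot product w.x), and `GradientParams`, `GradientSketch` and `contribution` are hypothetical names. The three parameters arrive as fields of a serializable case class instead of being read from a global object.

```scala
// Hypothetical parameter holder; case classes are Serializable by default,
// so an instance captured by a function is shipped along with it.
final case class GradientParams(stepSize: Double, lambda: Double, numDataPoints: Long)

object GradientSketch {
  // Assumed equivalent of mlutils.logisticFunction: sigmoid(w . x).
  def logistic(w: Array[Double], x: Array[Double]): Double = {
    val z = w.zip(x).map { case (wi, xi) => wi * xi }.sum
    1.0 / (1.0 + math.exp(-z))
  }

  // One data point's contribution in the given (1-based) superstep:
  // (x * (label - sigmoid(w . x)) - (lambda / n) * w) * stepSize / sqrt(superstep)
  def contribution(
      p: GradientParams,
      w: Array[Double],
      x: Array[Double],
      label: Double,
      superstep: Int
  ): Array[Double] = {
    val learningRate = p.stepSize / math.sqrt(superstep.toDouble)
    val residual = label - logistic(w, x)
    val reg = p.lambda / p.numDataPoints.toDouble
    Array.tabulate(x.length)(i => (x(i) * residual - reg * w(i)) * learningRate)
  }
}
```

With zero weights the sigmoid is 0.5, so for label 1.0 and features (1.0, 2.0) in superstep 1 with stepSize 0.01 the contribution is (0.005, 0.01); in superstep 4 the learning rate halves.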
Using iterationVariables this way works fine in local mode. However, once
deployed on an actual cluster, iterationVariables inside the iteration returns
the initial values from the object definition (e.g. numDataPoints = 1) rather
than the values set later in the driver, which ultimately leads to wrong results
in the computation.
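One plausible explanation, sketched in plain Scala without Flink: a Scala `object` is a per-JVM singleton, so a serialized function that refers to it carries no copy of its fields, while a function that captures a local `val` carries the value itself. The sketch assumes user functions are shipped to workers via Java serialization, and it only simulates a fresh worker JVM by resetting the object to its initial value after serializing. `Params`, `ClosureCaptureDemo`, `roundTrip` and `run` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for iterationVariables: one instance per JVM, never serialized.
object Params {
  var lambda: Double = 0.2
}

object ClosureCaptureDemo {
  // Round-trips a value through Java serialization, roughly what happens
  // when a function is shipped from the driver to a remote worker.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (result via the object, result via a captured local copy).
  def run(): (Double, Double) = {
    Params.lambda = 0.5 // driver-side update

    val readsObject: Double => Double = x => x * Params.lambda
    val lambdaCopy = Params.lambda // local copy, captured by value
    val readsLocal: Double => Double = x => x * lambdaCopy

    val shippedObject = roundTrip(readsObject)
    val shippedLocal = roundTrip(readsLocal)

    Params.lambda = 0.2 // simulate a fresh worker JVM with default values
    (shippedObject(1.0), shippedLocal(1.0))
  }
}
```

Under these assumptions `run()` returns (0.2, 0.5): the function that reads `Params` sees whatever that JVM's singleton holds, while the one that captured `lambdaCopy` keeps the driver's value.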
So once again: is there a way to access parameters that will only be known at
run-time inside an iteration?
Best regards,
Christoph Boden