All:
I'm trying to use SparseVectors with FlinkML 0.10.1, but it does not seem
to work. Here is a unit test that I created to reproduce the problem:
*package* com.aol.ds.arc.ml.poc.flink
> *import* org.junit.After
> *import* org.junit.Before
> *import* org.slf4j.LoggerFactory
> *import* org.apache.flink.test.util.ForkableFlinkMiniCluster
> *import* scala.concurrent.duration.FiniteDuration
> *import* java.util.concurrent.TimeUnit
> *import* org.apache.flink.test.util.TestBaseUtils
> *import* org.apache.flink.runtime.StreamingMode
> *import* org.apache.flink.test.util.TestEnvironment
> *import* org.junit.Test
> *import* org.apache.flink.ml.common.LabeledVector
> *import* org.apache.flink.ml.math.SparseVector
> *import* org.apache.flink.api.scala._
> *import* org.apache.flink.ml.regression.MultipleLinearRegression
> *import* org.apache.flink.ml.math.DenseVector
> *class* FlinkMLRTest {
> *var* Logger = LoggerFactory.getLogger(getClass.getName)
> *var* cluster: Option[ForkableFlinkMiniCluster] = None
> *val* parallelism = 4
> *val* DEFAULT_AKKA_ASK_TIMEOUT = 1000
> *val* DEFAULT_TIMEOUT = *new* FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT,
> TimeUnit.SECONDS)
> @Before
> *def* doBefore(): Unit = {
> *val* cl = TestBaseUtils.startCluster(
> 1,
> parallelism,
> StreamingMode.BATCH_ONLY,
> *false*,
> *false*,
> *true*)
> *val* clusterEnvironment = *new* TestEnvironment(cl, parallelism)
> clusterEnvironment.setAsContext()
> cluster = Some(cl)
> }
> @After
> *def* doAfter(): Unit = {
> cluster.map(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
> }
> @Test
> *def* testMLR() {
> *val* env = ExecutionEnvironment.getExecutionEnvironment
> *val* training = Seq(
> *new* LabeledVector(1.0, *new* SparseVector(10, Array(0, 2, 3),
> Array(1.0, 1.0, 1.0))),
> *new* LabeledVector(1.0, *new* SparseVector(10, Array(0, 1, 5, 9),
> Array(1.0, 1.0, 1.0, 1.0))),
> *new* LabeledVector(0.0, *new* SparseVector(10, Array(0, 2), Array(
> 0.0, 1.0))),
> *new* LabeledVector(0.0, *new* SparseVector(10, Array(0), Array(0.0
> ))),
> *new* LabeledVector(0.0, *new* SparseVector(10, Array(0, 2), Array(
> 0.0, 1.0))),
> *new* LabeledVector(0.0, *new* SparseVector(10, Array(0), Array(0.0
> ))))
> *val* testing = Seq(
> *new* LabeledVector(1.0, *new* SparseVector(10, Array(0, 3), Array(
> 1.0, 1.0))),
> *new* LabeledVector(0.0, *new* SparseVector(10, Array(0, 2, 3),
> Array(0.0, 1.0, 1.0))),
> *new* LabeledVector(0.0, *new* SparseVector(10, Array(0), Array(0.0
> ))))
> *val* trainingDS = env.fromCollection(training)
> *val* testingDS = env.fromCollection(testing)
> trainingDS.print()
> *val* mlr = MultipleLinearRegression()
> .setIterations(100)
> .setStepsize(2)
> .setConvergenceThreshold(0.001)
> mlr.fit(trainingDS)
> *val* weights = mlr.weightsOption *match* {
> *case* Some(weights) => { weights.collect() }
> *case* None => *throw* *new* Exception("Could not calculate the
> weights.")
> }
> *if* (Logger.isInfoEnabled())
> Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
> testingDS.print()
> *val* predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label
> )))
> *if* (Logger.isInfoEnabled()) {
> Logger.info(predictions.collect().mkString(","))
> }
> }
> @Test
> *def* testMLR_DenseVector() {
> *val* env = ExecutionEnvironment.getExecutionEnvironment
> *val* training = Seq(
> *new* LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> *new* LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0,
> 1.0, 0.0, 0.0, 0.0, 1.0)),
> *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)))
> *val* testing = Seq(
> *new* LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)))
> *val* trainingDS = env.fromCollection(training)
> *val* testingDS = env.fromCollection(testing)
> trainingDS.print()
> *val* mlr = MultipleLinearRegression()
> .setIterations(100)
> .setStepsize(2)
> .setConvergenceThreshold(0.001)
> mlr.fit(trainingDS)
> *val* weights = mlr.weightsOption *match* {
> *case* Some(weights) => { weights.collect() }
> *case* None => *throw* *new* Exception("Could not calculate the
> weights.")
> }
> *if* (Logger.isInfoEnabled())
> Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
> testingDS.print()
> *val* predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label
> )))
> *if* (Logger.isInfoEnabled()) {
> Logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString(","
> )}")
> }
> }
> }
It fails with this error:
java.lang.IllegalArgumentException: axpy only supports adding to a dense
> vector but got type class org.apache.flink.ml.math.SparseVector.
at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60)
at
> org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(
> GradientDescent.scala:181)
at
> org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(
> GradientDescent.scala:177)
at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583)
at org.apache.flink.runtime.operators.AllReduceDriver.run(
> AllReduceDriver.java:132)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:144)
at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(
> IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
If SparseVectors are not currently supported, when can we expect support
for them in MLR?
Thanks in advance for any information that you can provide.
--
*Gna Phetsarath* — System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna
<http://www.aolplatforms.com>