http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala deleted file mode 100644 index e49b3a3..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.ml.experimental - -import scala.reflect.ClassTag - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.{ParameterMap, WithParameters} - -trait Transformer[Self <: Transformer[Self]] - extends Estimator[Self] - with WithParameters - with Serializable { - that: Self => - - def transform[I, O](input: DataSet[I], transformParameters: ParameterMap = ParameterMap.Empty) - (implicit transformOperation: TransformOperation[Self, I, O]): DataSet[O] = { - transformOperation.transform(that, transformParameters, input) - } - - def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T] = { - ChainedTransformer(this, transformer) - } - - def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P] = { - ChainedPredictor(this, predictor) - } -} - -object Transformer{ - implicit def fallbackChainedTransformOperation[ - L <: Transformer[L], - R <: Transformer[R], - LI, - LO, - RI, - RO] - (implicit transformLeft: TransformOperation[L, LI, LO], - transformRight: TransformOperation[R, RI, RO]) - : TransformOperation[ChainedTransformer[L,R], LI, RO] = { - - new TransformOperation[ChainedTransformer[L, R], LI, RO] { - override def transform( - chain: ChainedTransformer[L, R], - transformParameters: ParameterMap, - input: DataSet[LI]): DataSet[RO] = { - transformLeft.transform(chain.left, transformParameters, input) - transformRight.transform(chain.right, transformParameters, null) - } - } - } - - implicit def fallbackTransformOperation[ - Self: ClassTag, - IN: ClassTag, - OUT: ClassTag] - : TransformOperation[Self, IN, OUT] = { - new TransformOperation[Self, IN, OUT] { - override def transform( - instance: Self, - transformParameters: ParameterMap, - input: DataSet[IN]) - : DataSet[OUT] = { - val self = implicitly[ClassTag[Self]] - val in = implicitly[ClassTag[IN]] - val out = implicitly[ClassTag[OUT]] - - throw new RuntimeException("There is no TransformOperation defined for " + - 
self.runtimeClass + " which takes a DataSet[" + in.runtimeClass + - "] as input and transforms it into a DataSet[" + out.runtimeClass + "]") - } - } - } -} - -abstract class TransformOperation[Self, IN, OUT] extends Serializable{ - def transform(instance: Self, transformParameters: ParameterMap, input: DataSet[IN]): DataSet[OUT] -}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala deleted file mode 100644 index 61f477c..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.feature - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer, LabeledVector} -import org.apache.flink.ml.feature.PolynomialBase.Degree -import org.apache.flink.ml.math.{DenseVector, Vector} - -import org.apache.flink.api.scala._ - -/** Maps a vector into the polynomial feature space. - * - * This transformer takes a a vector of values `(x, y, z, ...)` and maps it into the - * polynomial feature space of degree `d`. 
That is to say, it calculates the following - * representation: - * - * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T` - * - * This transformer can be prepended to all [[Transformer]] and - * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of - * [[LabeledVector]]. - * - * @example - * {{{ - * val trainingDS: DataSet[LabeledVector] = ... - * - * val polyBase = PolynomialBase() - * .setDegree(3) - * - * val mlr = MultipleLinearRegression() - * - * val chained = polyBase.chain(mlr) - * - * val model = chained.fit(trainingDS) - * }}} - * - * =Parameters= - * - * - [[PolynomialBase.Degree]]: Maximum polynomial degree - */ -class PolynomialBase extends Transformer[LabeledVector, LabeledVector] with Serializable { - - def setDegree(degree: Int): PolynomialBase = { - parameters.add(Degree, degree) - this - } - - override def transform(input: DataSet[LabeledVector], parameters: ParameterMap): - DataSet[LabeledVector] = { - val resultingParameters = this.parameters ++ parameters - - val degree = resultingParameters(Degree) - - input.map { - labeledVector => { - val vector = labeledVector.vector - val label = labeledVector.label - - val transformedVector = calculatePolynomial(degree, vector) - - LabeledVector(label, transformedVector) - } - } - } - - private def calculatePolynomial(degree: Int, vector: Vector): Vector = { - new DenseVector(calculateCombinedCombinations(degree, vector).toArray) - } - - /** Calculates for a given vector its representation in the polynomial feature space. 
- * - * @param degree Maximum degree of polynomial - * @param vector Values of the polynomial variables - * @return List of polynomial values - */ - private def calculateCombinedCombinations(degree: Int, vector: Vector): List[Double] = { - if(degree == 0) { - List() - } else { - val partialResult = calculateCombinedCombinations(degree - 1, vector) - - val combinations = calculateCombinations(vector.size, degree) - - val result = combinations map { - combination => - combination.zipWithIndex.map{ - case (exp, idx) => math.pow(vector(idx), exp) - }.fold(1.0)(_ * _) - } - - result ::: partialResult - } - - } - - /** Calculates all possible combinations of a polynom of degree `value`, whereas the polynom - * can consist of up to `length` factors. The return value is the list of the exponents of the - * individual factors - * - * @param length maximum number of factors - * @param value degree of polynomial - * @return List of lists which contain the exponents of the individual factors - */ - private def calculateCombinations(length: Int, value: Int): List[List[Int]] = { - if(length == 0) { - List() - } else if (length == 1) { - List(List(value)) - } else { - value to 0 by -1 flatMap { - v => - calculateCombinations(length - 1, value - v) map { - v::_ - } - } toList - } - } -} - -object PolynomialBase{ - - case object Degree extends Parameter[Int] { - override val defaultValue: Option[Int] = Some(1) - } - - // ========================= Factory methods ====================================== - - def apply(): PolynomialBase = { - new PolynomialBase() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala index dffb984..fbe35d4 100644 --- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala @@ -68,13 +68,9 @@ object Breeze { } implicit class Breeze2VectorConverter(vector: BreezeVector[Double]) { - def fromBreeze: Vector = { - vector match { - case dense: BreezeDenseVector[Double] => new DenseVector(dense.data) - - case sparse: BreezeSparseVector[Double] => - new SparseVector(sparse.length, sparse.index, sparse.data) - } + def fromBreeze[T <: Vector: BreezeVectorConverter]: T = { + val converter = implicitly[BreezeVectorConverter[T]] + converter.convert(vector) } } http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala new file mode 100644 index 0000000..687772e --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math + +import breeze.linalg.{SparseVector => BreezeSparseVector} +import breeze.linalg.{DenseVector => BreezeDenseVector} +import breeze.linalg.{Vector => BreezeVector} + +/** Type class which allows the conversion from Breeze vectors to Flink vectors + * + * @tparam T Resulting type of the conversion + */ +trait BreezeVectorConverter[T <: Vector] extends Serializable { + /** Converts a Breeze vector into a Flink vector of type T + * + * @param vector Breeze vector + * @return Flink vector of type T + */ + def convert(vector: BreezeVector[Double]): T +} + +object BreezeVectorConverter{ + + /** Type class implementation for [[org.apache.flink.ml.math.DenseVector]] */ + implicit val denseVectorConverter = new BreezeVectorConverter[DenseVector] { + override def convert(vector: BreezeVector[Double]): DenseVector = { + vector match { + case dense: BreezeDenseVector[Double] => new DenseVector(dense.data) + case sparse: BreezeSparseVector[Double] => new DenseVector(sparse.toDenseVector.data) + } + } + } + + /** Type class implementation for [[SparseVector]]; copies only the `used` Breeze entries */ + implicit val sparseVectorConverter = new BreezeVectorConverter[SparseVector] { + override def convert(vector: BreezeVector[Double]): SparseVector = { + vector match { + case dense: BreezeDenseVector[Double] => + SparseVector.fromCOO( + dense.length, + dense.iterator.toIterable) + case sparse: BreezeSparseVector[Double] => + new SparseVector(sparse.length, sparse.index.take(sparse.used), sparse.data.take(sparse.used)) + } + } + } + + /** Type class implementation for [[Vector]]; keeps the input's dense or sparse layout */ + implicit val vectorConverter = new BreezeVectorConverter[Vector] { + override def convert(vector: BreezeVector[Double]): Vector = { + vector match { + case dense: BreezeDenseVector[Double] => new DenseVector(dense.data) + + case sparse: BreezeSparseVector[Double] => + new SparseVector(sparse.length,
sparse.index.take(sparse.used), sparse.data.take(sparse.used)) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala deleted file mode 100644 index b73b249..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.ml.math - -trait CanCopy[T] extends Serializable { - def copy(value: T): T -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala index fa34ae1..079e4bc 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala @@ -134,8 +134,4 @@ object DenseVector { def init(size: Int, value: Double): DenseVector = { new DenseVector(Array.fill(size)(value)) } - - implicit val canCopy = new CanCopy[DenseVector]{ - override def copy(value: DenseVector): DenseVector = value.copy - } } http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala index 739fb9c..0b1f0cd 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala @@ -72,9 +72,3 @@ trait Vector extends Serializable { } } } - -object Vector{ - implicit val canCopy = new CanCopy[Vector] { - override def copy(value: Vector): Vector = value.copy - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala new file mode 100644 index 0000000..79c7005 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.math + +/** Type class to allow the vector construction from different data types + * + * @tparam T Subtype of [[Vector]] + */ +trait VectorBuilder[T <: Vector] extends Serializable { + /** Builds a [[Vector]] of type T from a List[Double] + * + * @param data Values of the vector; the position in the list is the index of the entry + * @return A vector of type T + */ + def build(data: List[Double]): T +} + +object VectorBuilder{ + + /** Type class implementation for [[org.apache.flink.ml.math.DenseVector]] */ + implicit val denseVectorBuilder = new VectorBuilder[DenseVector] { + override def build(data: List[Double]): DenseVector = { + new DenseVector(data.toArray) + } + } + + /** Type class implementation for [[org.apache.flink.ml.math.SparseVector]] */ + implicit val sparseVectorBuilder = new VectorBuilder[SparseVector] { + override def build(data: List[Double]): SparseVector = { + // Enrich elements with explicit indices and filter out zero entries + SparseVector.fromCOO(data.length, (0 until data.length).zip(data).filter(_._2 != 0.0)) + } + } + + /** Type class implementation for [[Vector]]; always builds a [[DenseVector]] */ + implicit val vectorBuilder = new VectorBuilder[Vector] { + override def build(data: List[Double]): Vector = { + new DenseVector(data.toArray) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala index e0f43d6..4c7f254 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala @@ -107,6 +107,4 @@ package object math { } } - - def copy[T](value: T)(implicit canCopy: CanCopy[T]): T = canCopy.copy(value) 
} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala new file mode 100644 index 0000000..85a5b9e --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.pipeline + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.ParameterMap + +/** [[Predictor]] which represents a pipeline of possibly multiple [[Transformer]] and a trailing + * [[Predictor]]. + * + * The [[ChainedPredictor]] can be used as a regular [[Predictor]]. Upon calling the fit method, + * the input data is piped through all preceding [[Transformer]] in the pipeline and the resulting + * data is given to the trailing [[Predictor]]. The same holds true for the predict operation. 
+ * + * The pipeline mechanism has been inspired by scikit-learn + * + * @param transformer Preceding [[Transformer]] of the pipeline + * @param predictor Trailing [[Predictor]] of the pipeline + * @tparam T Type of the preceding [[Transformer]] + * @tparam P Type of the trailing [[Predictor]] + */ +case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P) + extends Predictor[ChainedPredictor[T, P]]{} + +object ChainedPredictor{ + + /** [[PredictOperation]] for the [[ChainedPredictor]]. + * + * The [[PredictOperation]] requires the [[TransformOperation]] of the preceding [[Transformer]] + * and the [[PredictOperation]] of the trailing [[Predictor]]. Upon calling predict, the testing + * data is first transformed by the preceding [[Transformer]] and the result is then used to + * calculate the prediction via the trailing [[Predictor]]. + * + * @param transformOperation [[TransformOperation]] for the preceding [[Transformer]] + * @param predictOperation [[PredictOperation]] for the trailing [[Predictor]] + * @tparam T Type of the preceding [[Transformer]] + * @tparam P Type of the trailing [[Predictor]] + * @tparam Testing Type of the testing data + * @tparam Intermediate Type of the intermediate data produced by the preceding [[Transformer]] + * @tparam Prediction Type of the predicted data generated by the trailing [[Predictor]] + * @return [[PredictOperation]] for the [[ChainedPredictor]] + */ + implicit def chainedPredictOperation[ + T <: Transformer[T], + P <: Predictor[P], + Testing, + Intermediate, + Prediction]( + implicit transformOperation: TransformOperation[T, Testing, Intermediate], + predictOperation: PredictOperation[P, Intermediate, Prediction]) + : PredictOperation[ChainedPredictor[T, P], Testing, Prediction] = { + + new PredictOperation[ChainedPredictor[T, P], Testing, Prediction] { + override def predict( + instance: ChainedPredictor[T, P], + predictParameters: ParameterMap, + input: DataSet[Testing]) + : DataSet[Prediction] = { + + val testing = 
instance.transformer.transform(input, predictParameters) + instance.predictor.predict(testing, predictParameters) + } + } + } + + /** [[FitOperation]] for the [[ChainedPredictor]]. + * + * The [[FitOperation]] requires the [[FitOperation]] and the [[TransformOperation]] of the + * preceding [[Transformer]] as well as the [[FitOperation]] of the trailing [[Predictor]]. + * Upon calling fit, the preceding [[Transformer]] is first fitted to the training data. + * The training data is then transformed by the fitted [[Transformer]]. The transformed data + * is then used to fit the [[Predictor]]. + * + * @param fitOperation [[FitOperation]] of the preceding [[Transformer]] + * @param transformOperation [[TransformOperation]] of the preceding [[Transformer]] + * @param predictorFitOperation [[FitOperation]] of the trailing [[Predictor]] + * @tparam L Type of the preceding [[Transformer]] + * @tparam R Type of the trailing [[Predictor]] + * @tparam I Type of the training data + * @tparam T Type of the intermediate data + * @return [[FitOperation]] for the [[ChainedPredictor]] + */ + implicit def chainedFitOperation[L <: Transformer[L], R <: Predictor[R], I, T](implicit + fitOperation: FitOperation[L, I], + transformOperation: TransformOperation[L, I, T], + predictorFitOperation: FitOperation[R, T]): FitOperation[ChainedPredictor[L, R], I] = { + new FitOperation[ChainedPredictor[L, R], I] { + override def fit( + instance: ChainedPredictor[L, R], + fitParameters: ParameterMap, + input: DataSet[I]) + : Unit = { + instance.transformer.fit(input, fitParameters) + val intermediateResult = instance.transformer.transform(input, fitParameters) + instance.predictor.fit(intermediateResult, fitParameters) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala new file mode 100644 index 0000000..e443b80 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.pipeline + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.ParameterMap + +/** [[Transformer]] which represents the chaining of two [[Transformer]]. + * + * A [[ChainedTransformer]] can be treated as regular [[Transformer]]. Upon calling the fit or + * transform operation, the data is piped through all [[Transformer]] of the pipeline. 
+ * + * The pipeline mechanism has been inspired by scikit-learn + * + * @param left Left [[Transformer]] of the pipeline + * @param right Right [[Transformer]] of the pipeline + * @tparam L Type of the left [[Transformer]] + * @tparam R Type of the right [[Transformer]] + */ +case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L, right: R) + extends Transformer[ChainedTransformer[L, R]] { +} + +object ChainedTransformer{ + + /** [[TransformOperation]] implementation for [[ChainedTransformer]]. + * + * First the transform operation of the left [[Transformer]] is called with the input data. This + * generates intermediate data which is fed to the right [[Transformer]]'s transform operation. + * + * @param transformOpLeft [[TransformOperation]] for the left [[Transformer]] + * @param transformOpRight [[TransformOperation]] for the right [[Transformer]] + * @tparam L Type of the left [[Transformer]] + * @tparam R Type of the right [[Transformer]] + * @tparam I Type of the input data + * @tparam T Type of the intermediate output data + * @tparam O Type of the output data + * @return [[TransformOperation]] for the [[ChainedTransformer]] + */ + implicit def chainedTransformOperation[ + L <: Transformer[L], + R <: Transformer[R], + I, + T, + O](implicit + transformOpLeft: TransformOperation[L, I, T], + transformOpRight: TransformOperation[R, T, O]) + : TransformOperation[ChainedTransformer[L,R], I, O] = { + + new TransformOperation[ChainedTransformer[L, R], I, O] { + override def transform( + chain: ChainedTransformer[L, R], + transformParameters: ParameterMap, + input: DataSet[I]): DataSet[O] = { + val intermediateResult = transformOpLeft.transform(chain.left, transformParameters, input) + transformOpRight.transform(chain.right, transformParameters, intermediateResult) + } + } + } + + /** [[FitOperation]] implementation for [[ChainedTransformer]]. + * + * First the fit operation of the left [[Transformer]] is called with the input data. 
Then + * the data is transformed by this [[Transformer]] and then given to the fit operation of the + * right [[Transformer]]. + * + * @param leftFitOperation [[FitOperation]] for the left [[Transformer]] + * @param leftTransformOperation [[TransformOperation]] for the left [[Transformer]] + * @param rightFitOperation [[FitOperation]] for the right [[Transformer]] + * @tparam L Type of the left [[Transformer]] + * @tparam R Type of the right [[Transformer]] + * @tparam I Type of the input data + * @tparam T Type of the intermediate output data + * @return [[FitOperation]] for the [[ChainedTransformer]] + */ + implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit + leftFitOperation: FitOperation[L, I], + leftTransformOperation: TransformOperation[L, I, T], + rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = { + new FitOperation[ChainedTransformer[L, R], I] { + override def fit( + instance: ChainedTransformer[L, R], + fitParameters: ParameterMap, + input: DataSet[I]): Unit = { + instance.left.fit(input, fitParameters) + val intermediateResult = instance.left.transform(input, fitParameters) + instance.right.fit(intermediateResult, fitParameters) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala new file mode 100644 index 0000000..6acac8f --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.pipeline + +import scala.reflect.ClassTag + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.{ParameterMap, WithParameters} + +/** Base trait for Flink's pipeline operators. + * + * An estimator can be fitted to input data. In order to do that the implementing class has + * to provide an implementation of a [[FitOperation]] with the correct input type. In order to make + * the [[FitOperation]] retrievable by the Scala compiler, the implementation should be placed + * in the companion object of the implementing class. + * + * The pipeline mechanism has been inspired by scikit-learn + * + * @tparam Self + */ +trait Estimator[Self] extends WithParameters with Serializable { + that: Self => + + /** Fits the estimator to the given input data. The fitting logic is contained in the + * [[FitOperation]]. The computed state will be stored in the implementing class. 
+ * + * @param training Training data + * @param fitParameters Additional parameters for the [[FitOperation]] + * @param fitOperation [[FitOperation]] which encapsulates the algorithm logic + * @tparam Training Type of the training data + * @return + */ + def fit[Training]( + training: DataSet[Training], + fitParameters: ParameterMap = ParameterMap.Empty)(implicit + fitOperation: FitOperation[Self, Training]): Unit = { + fitOperation.fit(this, fitParameters, training) + } +} + +object Estimator{ + + /** Fallback [[FitOperation]] type class implementation which is used if no other + * [[FitOperation]] with the right input types could be found in the scope of the implementing + * class. The fallback [[FitOperation]] makes the system fail in the pre-flight phase by + * throwing a [[RuntimeException]] which states the reason for the failure. Usually the error + * is a missing [[FitOperation]] implementation for the input types or the wrong chaining + * of pipeline operators which have incompatible input/output types. + * + * @tparam Self Type of the pipeline operator + * @tparam Training Type of training data + * @return + */ + implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag] + : FitOperation[Self, Training] = { + new FitOperation[Self, Training]{ + override def fit( + instance: Self, + fitParameters: ParameterMap, + input: DataSet[Training]) + : Unit = { + + val self = implicitly[ClassTag[Self]] + val training = implicitly[ClassTag[Training]] + + throw new RuntimeException("There is no FitOperation defined for " + self.runtimeClass + + " which trains on a DataSet[" + training.runtimeClass + "]") + } + } + } + + /** Fallback [[FitOperation]] type class implementation for [[ChainedTransformer]]. The fallback + * implementation is used if the Scala compiler could not instantiate the chained fit operation + * defined in the companion object of [[ChainedTransformer]]. 
This is usually the case if either + * a [[FitOperation]] or a [[TransformOperation]] could not be instantiated for one of the + * leaves of the chained transformer. The fallback [[FitOperation]] calls first the + * fit operation of the left transformer, then the transform operation of the left transformer + * and last the fit operation of the right transformer. + * + * @param leftFitOperation [[FitOperation]] of the left transformer + * @param leftTransformOperation [[TransformOperation]] of the left transformer + * @param rightFitOperaiton [[FitOperation]] of the right transformer + * @tparam L Type of left transformer + * @tparam R Type of right transformer + * @tparam LI Input type of left transformer's [[FitOperation]] + * @tparam LO Output type of left transformer's [[TransformOperation]] + * @return + */ + implicit def fallbackChainedFitOperationTransformer[ + L <: Transformer[L], + R <: Transformer[R], + LI, + LO](implicit + leftFitOperation: FitOperation[L, LI], + leftTransformOperation: TransformOperation[L, LI, LO], + rightFitOperaiton: FitOperation[R, LO]) + : FitOperation[ChainedTransformer[L, R], LI] = { + new FitOperation[ChainedTransformer[L, R], LI] { + override def fit( + instance: ChainedTransformer[L, R], + fitParameters: ParameterMap, + input: DataSet[LI]): Unit = { + instance.left.fit(input, fitParameters) + val intermediate = instance.left.transform(input, fitParameters) + instance.right.fit(intermediate, fitParameters) + } + } + } + + /** Fallback [[FitOperation]] type class implementation for [[ChainedPredictor]]. The fallback + * implementation is used if the Scala compiler could not instantiate the chained fit operation + * defined in the companion object of [[ChainedPredictor]]. This is usually the case if either + * a [[FitOperation]] or a [[TransformOperation]] could not be instantiated for one of the + * leaves of the chained transformer. 
The fallback [[FitOperation]] calls first the + * fit operation of the left transformer, then the transform operation of the left transformer + * and last the fit operation of the right transformer. + * + * @param leftFitOperation [[FitOperation]] of the left transformer + * @param leftTransformOperation [[TransformOperation]] of the left transformer + * @param rightFitOperaiton [[FitOperation]] of the right transformer + * @tparam L Type of left transformer + * @tparam R Type of right transformer + * @tparam LI Input type of left transformer's [[FitOperation]] + * @tparam LO Output type of left transformer's [[TransformOperation]] + * @return + */ + implicit def fallbackChainedFitOperationPredictor[ + L <: Transformer[L], + R <: Predictor[R], + LI, + LO](implicit + leftFitOperation: FitOperation[L, LI], + leftTransformOperation: TransformOperation[L, LI, LO], + rightFitOperaiton: FitOperation[R, LO]) + : FitOperation[ChainedPredictor[L, R], LI] = { + new FitOperation[ChainedPredictor[L, R], LI] { + override def fit( + instance: ChainedPredictor[L, R], + fitParameters: ParameterMap, + input: DataSet[LI]): Unit = { + instance.transformer.fit(input, fitParameters) + val intermediate = instance.transformer.transform(input, fitParameters) + instance.predictor.fit(intermediate, fitParameters) + } + } + } +} + +/** Type class for the fit operation of an [[Estimator]]. + * + * The [[FitOperation]] contains a self type parameter so that the Scala compiler looks into + * the companion object of this class to find implicit values. 
+ * + * @tparam Self Type of the [[Estimator]] subclass for which the [[FitOperation]] is defined + * @tparam Training Type of the training data + */ +trait FitOperation[Self, Training]{ + def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala new file mode 100644 index 0000000..ebfa787 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.pipeline + +import scala.reflect.ClassTag + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.{ParameterMap, WithParameters} + +/** Predictor trait for Flink's pipeline operators. 
+ * + * A [[Predictor]] calculates predictions for testing data based on the model it learned during + * the fit operation (training phase). In order to do that, the implementing class has to provide + * a [[FitOperation]] and a [[PredictOperation]] implementation for the correct types. The implicit + * values should be put into the scope of the companion object of the implementing class to make + * them retrievable for the Scala compiler. + * + * The pipeline mechanism has been inspired by scikit-learn + * + * @tparam Self Type of the implementing class + */ +trait Predictor[Self] extends Estimator[Self] with WithParameters with Serializable { + that: Self => + + /** Predict testing data according to the learned model. The implementing class has to provide + * a corresponding implementation of [[PredictOperation]] which contains the prediction logic. + * + * @param testing Testing data which shall be predicted + * @param predictParameters Additional parameters for the prediction + * @param predictor [[PredictOperation]] which encapsulates the prediction logic + * @tparam Testing Type of the testing data + * @tparam Prediction Type of the prediction data + * @return + */ + def predict[Testing, Prediction]( + testing: DataSet[Testing], + predictParameters: ParameterMap = ParameterMap.Empty)(implicit + predictor: PredictOperation[Self, Testing, Prediction]) + : DataSet[Prediction] = { + predictor.predict(this, predictParameters, testing) + } +} + +object Predictor{ + + /** Fallback [[PredictOperation]] if a [[Predictor]] is called with a not supported input data + * type. The fallback [[PredictOperation]] lets the system fail with a [[RuntimeException]] + * stating which input and output data types were inferred but for which no [[PredictOperation]] + * could be found. 
+ * + * @tparam Self Type of the [[Predictor]] + * @tparam Testing Type of the testing data + * @tparam Prediction Type of the predicted data + * @return + */ + implicit def fallbackPredictOperation[Self: ClassTag, Testing: ClassTag, Prediction: ClassTag] + : PredictOperation[Self, Testing, Prediction] = { + new PredictOperation[Self, Testing, Prediction] { + override def predict( + instance: Self, + predictParameters: ParameterMap, + input: DataSet[Testing]) + : DataSet[Prediction] = { + val self = implicitly[ClassTag[Self]] + val testing = implicitly[ClassTag[Testing]] + val prediction = implicitly[ClassTag[Prediction]] + + throw new RuntimeException("There is no PredictOperation defined for " + self.runtimeClass + + " which takes a DataSet[" + testing.runtimeClass + "] as input and returns a DataSet[" + + prediction.runtimeClass + "]") + } + } + } + + /** Fallback [[PredictOperation]] for a [[ChainedPredictor]] if a [[TransformOperation]] for + * one of the [[Transformer]] and its respective types or the [[PredictOperation]] for the + * [[Predictor]] and its respective type could not be found. This is usually the case, if the + * pipeline contains pipeline operators which work on incompatible types. + * + * The fallback [[PredictOperation]] first transforms the input data by calling the transform + * method of the [[Transformer]] and then the predict method of the [[Predictor]]. 
+ * + * @param leftTransformOperation [[TransformOperation]] of the [[Transformer]] + * @param rightPredictOperation [[PredictOperation]] of the [[Predictor]] + * @tparam L Type of the [[Transformer]] + * @tparam R Type of the [[Predictor]] + * @tparam LI Input type of the [[Transformer]] + * @tparam LO Output type of the [[Transformer]] + * @tparam RO Prediction type of the [[Predictor]] + * @return + */ + implicit def fallbackChainedPredictOperation[ + L <: Transformer[L], + R <: Predictor[R], + LI, + LO, + RO](implicit + leftTransformOperation: TransformOperation[L, LI, LO], + rightPredictOperation: PredictOperation[R, LO, RO] + ) + : PredictOperation[ChainedPredictor[L, R], LI, RO] = { + new PredictOperation[ChainedPredictor[L, R], LI, RO] { + override def predict( + instance: ChainedPredictor[L, R], + predictParameters: ParameterMap, + input: DataSet[LI]): DataSet[RO] = { + val intermediate = instance.transformer.transform(input, predictParameters) + instance.predictor.predict(intermediate, predictParameters) + } + } + } +} + +/** Type class for the predict operation of [[Predictor]]. + * + * Predictors have to implement this trait and make the result available as an implicit value or + * function in the scope of their companion objects. + * + * The first type parameter is the type of the implementing [[Predictor]] class so that the Scala + * compiler includes the companion object of this class in the search scope for the implicit + * values. 
+ * + * @tparam Self Type of [[Predictor]] implementing class + * @tparam Testing Type of testing data + * @tparam Prediction Type of predicted data + */ +trait PredictOperation[Self, Testing, Prediction]{ + def predict( + instance: Self, + predictParameters: ParameterMap, + input: DataSet[Testing]) + : DataSet[Prediction] +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala new file mode 100644 index 0000000..52e3f7f --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.pipeline + +import scala.reflect.ClassTag + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.{ParameterMap, WithParameters} + +/** Transformer trait for Flink's pipeline operators. 
+ * + * A Transformer transforms a [[DataSet]] of an input type into a [[DataSet]] of an output type. + * Furthermore, a [[Transformer]] is also an [[Estimator]], because some transformations depend + * on the training data. In order to do that the implementing class has to provide a + * [[TransformOperation]] and [[FitOperation]] implementation. The Scala compiler finds these + * implicit values if it is put in the scope of the companion object of the implementing class. + * + * [[Transformer]] can be chained with other [[Transformer]] and [[Predictor]] to create + * pipelines. These pipelines can consist of an arbitrary number of [[Transformer]] and at most + * one trailing [[Predictor]]. + * + * The pipeline mechanism has been inspired by scikit-learn + * + * @tparam Self + */ +trait Transformer[Self <: Transformer[Self]] + extends Estimator[Self] + with WithParameters + with Serializable { + that: Self => + + /** Transform operation which transforms an input [[DataSet]] of type I into an output [[DataSet]] + * of type O. The actual transform operation is implemented within the [[TransformOperation]]. + * + * @param input Input [[DataSet]] of type I + * @param transformParameters Additional parameters for the [[TransformOperation]] + * @param transformOperation [[TransformOperation]] which encapsulates the algorithm's logic + * @tparam Input Input data type + * @tparam Output Output data type + * @return + */ + def transform[Input, Output]( + input: DataSet[Input], + transformParameters: ParameterMap = ParameterMap.Empty) + (implicit transformOperation: TransformOperation[Self, Input, Output]) + : DataSet[Output] = { + transformOperation.transform(that, transformParameters, input) + } + + /** Chains two [[Transformer]] to form a [[ChainedTransformer]]. 
+ * + * @param transformer Right side transformer of the resulting pipeline + * @tparam T Type of the [[Transformer]] + * @return + */ + def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T] = { + ChainedTransformer(this, transformer) + } + + /** Chains a [[Transformer]] with a [[Predictor]] to form a [[ChainedPredictor]]. + * + * @param predictor Trailing [[Predictor]] of the resulting pipeline + * @tparam P Type of the [[Predictor]] + * @return + */ + def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P] = { + ChainedPredictor(this, predictor) + } +} + +object Transformer{ + + /** Fallback [[TransformOperation]] for [[ChainedTransformer]] which is used if no suitable + * [[TransformOperation]] implementation can be found. This implementation is used if there is no + * [[TransformOperation]] for one of the leaves of the [[ChainedTransformer]] for the given + * input types. This is usually the case if one [[Transformer]] does not support the transform + * operation for the input type. + * + * The fallback [[TransformOperation]] for [[ChainedTransformer]] calls first the transform + * operation of the left transformer and then the transform operation of the right transformer. + * That way the fallback [[TransformOperation]] for a [[Transformer]] will be called which + * will fail the job in the pre-flight phase by throwing an exception. 
+ * + * @param transformLeft Left [[Transformer]] of the pipeline + * @param transformRight Right [[Transformer]] of the pipeline + * @tparam L Type of the left [[Transformer]] + * @tparam R Type of the right [[Transformer]] + * @tparam LI Input type of left transformer's [[TransformOperation]] + * @tparam LO Output type of left transformer's [[TransformOperation]] + * @tparam RO Output type of right transformer's [[TransformOperation]] + * @return + */ + implicit def fallbackChainedTransformOperation[ + L <: Transformer[L], + R <: Transformer[R], + LI, + LO, + RO] + (implicit transformLeft: TransformOperation[L, LI, LO], + transformRight: TransformOperation[R, LO, RO]) + : TransformOperation[ChainedTransformer[L,R], LI, RO] = { + + new TransformOperation[ChainedTransformer[L, R], LI, RO] { + override def transform( + chain: ChainedTransformer[L, R], + transformParameters: ParameterMap, + input: DataSet[LI]): DataSet[RO] = { + val intermediate = transformLeft.transform(chain.left, transformParameters, input) + transformRight.transform(chain.right, transformParameters, intermediate) + } + } + } + + /** Fallback [[TransformOperation]] for [[Transformer]] which do not support the input or output + * type with which they are called. This is usually the case if pipeline operators are chained + * which have incompatible input/output types. In order to detect these failures, the fallback + * [[TransformOperation]] throws a [[RuntimeException]] with the corresponding input/output + * types. Consequently, a wrong pipeline will be detected at pre-flight phase of Flink and + * thus prior to execution time. 
+ * + * @tparam Self Type of the [[Transformer]] for which the [[TransformOperation]] is defined + * @tparam IN Input data type of the [[TransformOperation]] + * @tparam OUT Output data type of the [[TransformOperation]] + * @return + */ + implicit def fallbackTransformOperation[ + Self: ClassTag, + IN: ClassTag, + OUT: ClassTag] + : TransformOperation[Self, IN, OUT] = { + new TransformOperation[Self, IN, OUT] { + override def transform( + instance: Self, + transformParameters: ParameterMap, + input: DataSet[IN]) + : DataSet[OUT] = { + val self = implicitly[ClassTag[Self]] + val in = implicitly[ClassTag[IN]] + val out = implicitly[ClassTag[OUT]] + + throw new RuntimeException("There is no TransformOperation defined for " + + self.runtimeClass + " which takes a DataSet[" + in.runtimeClass + + "] as input and transforms it into a DataSet[" + out.runtimeClass + "]") + } + } + } +} + +/** Type class for a transform operation of [[Transformer]]. + * + * The [[TransformOperation]] contains a self type parameter so that the Scala compiler looks into + * the companion object of this class to find implicit values. 
+ * + * @tparam Self Type of the [[Transformer]] for which the [[TransformOperation]] is defined + * @tparam Input Input data type + * @tparam Output Output data type + */ +abstract class TransformOperation[Self, Input, Output] extends Serializable{ + def transform( + instance: Self, + transformParameters: ParameterMap, + input: DataSet[Input]) + : DataSet[Output] +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala new file mode 100644 index 0000000..8c7daad --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} +import org.apache.flink.ml.math.{Vector, VectorBuilder} +import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer} +import org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree + +import scala.reflect.ClassTag + +/** Maps a vector into the polynomial feature space. + * + * This transformer takes a vector of values `(x, y, z, ...)` and maps it into the + * polynomial feature space of degree `d`. That is to say, it calculates the following + * representation: + * + * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T` + * + * This transformer can be prepended to all [[org.apache.flink.ml.pipeline.Transformer]] and + * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect an input of + * [[LabeledVector]]. + * + * @example + * {{{ + * val trainingDS: DataSet[LabeledVector] = ... 
+ * + * val polyFeatures = PolynomialFeatures() + * .setDegree(3) + * + * val mlr = MultipleLinearRegression() + * + * val pipeline = polyFeatures.chainPredictor(mlr) + * + * pipeline.fit(trainingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree]]: Maximum polynomial degree + */ +class PolynomialFeatures extends Transformer[PolynomialFeatures] { + + def setDegree(degree: Int): PolynomialFeatures = { + parameters.add(Degree, degree) + this + } +} + +object PolynomialFeatures{ + + // ====================================== Parameters ============================================= + + case object Degree extends Parameter[Int] { + override val defaultValue: Option[Int] = Some(1) + } + + // =================================== Factory methods =========================================== + + def apply(): PolynomialFeatures = { + new PolynomialFeatures() + } + + // ====================================== Operations ============================================= + + /** The [[PolynomialFeatures]] transformer does not need a fitting phase. + * + * @tparam T The fitting works with arbitrary input types + * @return + */ + implicit def fitNoOp[T] = { + new FitOperation[PolynomialFeatures, T]{ + override def fit( + instance: PolynomialFeatures, + fitParameters: ParameterMap, + input: DataSet[T]) + : Unit = {} + } + } + + /** [[org.apache.flink.ml.pipeline.TransformOperation]] to map a [[Vector]] into the polynomial + * feature space. 
+ * + * @tparam T Subclass of [[Vector]] + * @return + */ + implicit def transformVectorIntoPolynomialBase[ + T <: Vector : VectorBuilder: TypeInformation: ClassTag + ] = { + new TransformOperation[PolynomialFeatures, T, T] { + override def transform( + instance: PolynomialFeatures, + transformParameters: ParameterMap, + input: DataSet[T]) + : DataSet[T] = { + val resultingParameters = instance.parameters ++ transformParameters + + val degree = resultingParameters(Degree) + + input.map { + vector => { + calculatePolynomial(degree, vector) + } + } + } + } + } + + /** [[org.apache.flink.ml.pipeline.TransformOperation]] to map a [[LabeledVector]] into the + * polynomial feature space + */ + implicit val transformLabeledVectorIntoPolynomialBase = + new TransformOperation[PolynomialFeatures, LabeledVector, LabeledVector] { + + override def transform( + instance: PolynomialFeatures, + transformParameters: ParameterMap, + input: DataSet[LabeledVector]) + : DataSet[LabeledVector] = { + val resultingParameters = instance.parameters ++ transformParameters + + val degree = resultingParameters(Degree) + + input.map { + labeledVector => { + val vector = labeledVector.vector + val label = labeledVector.label + + val transformedVector = calculatePolynomial(degree, vector) + + LabeledVector(label, transformedVector) + } + } + } + } + + + private def calculatePolynomial[T <: Vector: VectorBuilder](degree: Int, vector: T): T = { + val builder = implicitly[VectorBuilder[T]] + builder.build(calculateCombinedCombinations(degree, vector)) + } + + /** Calculates for a given vector its representation in the polynomial feature space. 
+ * + * @param degree Maximum degree of polynomial + * @param vector Values of the polynomial variables + * @return List of polynomial values + */ + private def calculateCombinedCombinations(degree: Int, vector: Vector): List[Double] = { + if(degree == 0) { + List() + } else { + val partialResult = calculateCombinedCombinations(degree - 1, vector) + + val combinations = calculateCombinations(vector.size, degree) + + val result = combinations map { + combination => + combination.zipWithIndex.map{ + case (exp, idx) => math.pow(vector(idx), exp) + }.fold(1.0)(_ * _) + } + + result ::: partialResult + } + + } + + /** Calculates all possible combinations of a polynomial of degree `value`, where the + * polynomial can consist of up to `length` factors. The return value is the list of the + * exponents of the individual factors. + * + * @param length maximum number of factors + * @param value degree of polynomial + * @return List of lists which contain the exponents of the individual factors + */ + private def calculateCombinations(length: Int, value: Int): List[List[Int]] = { + if(length == 0) { + List() + } else if (length == 1) { + List(List(value)) + } else { + value to 0 by -1 flatMap { + v => + calculateCombinations(length - 1, value - v) map { + v::_ + } + } toList + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala index 9224e7c..bd952c3 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala @@ -22,38 +22,47 @@ import breeze.linalg import 
breeze.numerics.sqrt import breeze.numerics.sqrt._ import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration -import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} import org.apache.flink.ml.math.Breeze._ -import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, Transformer} import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import scala.reflect.ClassTag + /** Scales observations, so that all features have a user-specified mean and standard deviation. * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0. * - * This transformer takes a [[Vector]] of values and maps it to a - * scaled [[Vector]] such that each feature has a user-specified mean and standard deviation. + * This transformer takes a subtype of [[Vector]] of values and maps it to a + * scaled subtype of [[Vector]] such that each feature has a user-specified mean and standard + * deviation. * * This transformer can be prepended to all [[Transformer]] and - * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of - * [[Vector]]. + * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype + * of [[Vector]]. 
* * @example * {{{ * val trainingDS: DataSet[Vector] = env.fromCollection(data) * val transformer = StandardScaler().setMean(10.0).setStd(2.0) * - * transformer.transform(trainingDS) + * transformer.fit(trainingDS) + * val transformedDS = transformer.transform(trainingDS) * }}} * * =Parameters= * - * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0 - * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default + * - [[Mean]]: The mean value of transformed data set; by default equal to 0 + * - [[Std]]: The standard deviation of the transformed data set; by default * equal to 1 */ -class StandardScaler extends Transformer[Vector, Vector] with Serializable { +class StandardScaler extends Transformer[StandardScaler] { + + + var metricsOption: Option[DataSet[(linalg.Vector[Double], linalg.Vector[Double])]] = None /** Sets the target mean of the transformed data * @@ -78,36 +87,62 @@ class StandardScaler extends Transformer[Vector, Vector] with Serializable { parameters.add(Std, std) this } +} - override def transform(input: DataSet[Vector], parameters: ParameterMap): - DataSet[Vector] = { - val resultingParameters = this.parameters ++ parameters - val mean = resultingParameters(Mean) - val std = resultingParameters(Std) +object StandardScaler { + + // ====================================== Parameters ============================================= - val featureMetrics = extractFeatureMetrics(input) + case object Mean extends Parameter[Double] { + override val defaultValue: Option[Double] = Some(0.0) + } - input.map(new RichMapFunction[Vector, Vector]() { + case object Std extends Parameter[Double] { + override val defaultValue: Option[Double] = Some(1.0) + } - var broadcastMean: linalg.Vector[Double] = null - var broadcastStd: linalg.Vector[Double] = null + // ==================================== Factory methods ========================================== - override def open(parameters: Configuration): 
Unit = { - val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double], - linalg.Vector[Double])]("broadcastedMetrics").get(0) - broadcastMean = broadcastedMetrics._1 - broadcastStd = broadcastedMetrics._2 - } + def apply(): StandardScaler = { + new StandardScaler() + } + + // ====================================== Operations ============================================= + + /** Trains the [[org.apache.flink.ml.preprocessing.StandardScaler]] by learning the mean and + * standard deviation of the training data. These values are used in the transform step + * to transform the given input data. + * + * @tparam T Input data type which is a subtype of [[Vector]] + * @return + */ + implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] { + override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T]) + : Unit = { + val metrics = extractFeatureMetrics(input) + + instance.metricsOption = Some(metrics) + } + } - override def map(vector: Vector): Vector = { - var myVector = vector.asBreeze + /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training + * data which is of type [[LabeledVector]]. The mean and standard deviation are used to + * transform the given input data. + * + */ + implicit val fitLabeledVectorStandardScaler = { + new FitOperation[StandardScaler, LabeledVector] { + override def fit( + instance: StandardScaler, + fitParameters: ParameterMap, + input: DataSet[LabeledVector]) + : Unit = { + val vectorDS = input.map(_.vector) + val metrics = extractFeatureMetrics(vectorDS) - myVector -= broadcastMean - myVector :/= broadcastStd - myVector = (myVector :* std) + mean - return myVector.fromBreeze + instance.metricsOption = Some(metrics) } - }).withBroadcastSet(featureMetrics, "broadcastedMetrics") + } } /** Calculates in one pass over the data the features' mean and standard deviation. 
@@ -121,7 +156,7 @@ class StandardScaler extends Transformer[Vector, Vector] with Serializable { * The first vector represents the mean vector and the second is the standard * deviation vector. */ - private def extractFeatureMetrics(dataSet: DataSet[Vector]) + private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T]) : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { val metrics = dataSet.map{ v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size)) @@ -154,19 +189,100 @@ class StandardScaler extends Transformer[Vector, Vector] with Serializable { } metrics } -} -object StandardScaler { + /** [[TransformOperation]] which scales input data of subtype of [[Vector]] with respect to + * the calculated mean and standard deviation of the training data. The mean and standard + * deviation of the resulting data is configurable. + * + * @tparam T Type of the input and output data which has to be a subtype of [[Vector]] + * @return + */ + implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = { + new TransformOperation[StandardScaler, T, T] { + override def transform( + instance: StandardScaler, + transformParameters: ParameterMap, + input: DataSet[T]) + : DataSet[T] = { - case object Mean extends Parameter[Double] { - override val defaultValue: Option[Double] = Some(0.0) - } + val resultingParameters = instance.parameters ++ transformParameters + val mean = resultingParameters(Mean) + val std = resultingParameters(Std) - case object Std extends Parameter[Double] { - override val defaultValue: Option[Double] = Some(1.0) + instance.metricsOption match { + case Some(metrics) => { + input.map(new RichMapFunction[T, T]() { + + var broadcastMean: linalg.Vector[Double] = null + var broadcastStd: linalg.Vector[Double] = null + + override def open(parameters: Configuration): Unit = { + val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[ + (linalg.Vector[Double], linalg.Vector[Double]) + 
]("broadcastedMetrics").get(0) + broadcastMean = broadcastedMetrics._1 + broadcastStd = broadcastedMetrics._2 + } + + override def map(vector: T): T = { + var myVector = vector.asBreeze + + myVector -= broadcastMean + myVector :/= broadcastStd + myVector = (myVector :* std) + mean + myVector.fromBreeze + } + }).withBroadcastSet(metrics, "broadcastedMetrics") + } + + case None => + throw new RuntimeException("The StandardScaler has not been fitted to the data. " + + "This is necessary to estimate the mean and standard deviation of the data.") + } + } + } } - def apply(): StandardScaler = { - new StandardScaler() + implicit val transformLabeledVectors = { + new TransformOperation[StandardScaler, LabeledVector, LabeledVector] { + override def transform(instance: StandardScaler, transformParameters: ParameterMap, input: + DataSet[LabeledVector]): DataSet[LabeledVector] = { + val resultingParameters = instance.parameters ++ transformParameters + val mean = resultingParameters(Mean) + val std = resultingParameters(Std) + + instance.metricsOption match { + case Some(metrics) => { + input.map(new RichMapFunction[LabeledVector, LabeledVector]() { + + var broadcastMean: linalg.Vector[Double] = null + var broadcastStd: linalg.Vector[Double] = null + + override def open(parameters: Configuration): Unit = { + val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[ + (linalg.Vector[Double], linalg.Vector[Double]) + ]("broadcastedMetrics").get(0) + broadcastMean = broadcastedMetrics._1 + broadcastStd = broadcastedMetrics._2 + } + + override def map(labeledVector: LabeledVector): LabeledVector = { + val LabeledVector(label, vector) = labeledVector + var breezeVector = vector.asBreeze + + breezeVector -= broadcastMean + breezeVector :/= broadcastStd + breezeVector = (breezeVector :* std) + mean + LabeledVector(label, breezeVector.fromBreeze[Vector]) + } + }).withBroadcastSet(metrics, "broadcastedMetrics") + } + + case None => + throw new RuntimeException("The 
StandardScaler has not been fitted to the data. " + + "This is necessary to estimate the mean and standard deviation of the data.") + } + } + } } }