http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index 5c6de55..d8efdaf 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.common.operators.Order import org.apache.flink.core.memory.{DataOutputView, DataInputView} import org.apache.flink.ml.common._ -import org.apache.flink.ml.recommendation.ALS.Factors +import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor} import org.apache.flink.types.Value import org.apache.flink.util.Collector import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction} @@ -73,55 +73,58 @@ import scala.util.Random * .setIterations(10) * .setNumFactors(10) * - * val model = als.fit(inputDS) + * als.fit(inputDS) * * val data2Predict: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData) * - * model.transform(data2Predict) + * als.predict(data2Predict) * }}} * * =Parameters= * - * - [[ALS.NumFactors]]: + * - [[org.apache.flink.ml.recommendation.ALS.NumFactors]]: * The number of latent factors. It is the dimension of the calculated user and item vectors. * (Default value: '''10''') * - * - [[ALS.Lambda]]: + * - [[org.apache.flink.ml.recommendation.ALS.Lambda]]: * Regularization factor. Tune this value in order to avoid overfitting/poor generalization. * (Default value: '''1''') * - * - [[ALS.Iterations]]: The number of iterations to perform. (Default value: '''10''') + * - [[org.apache.flink.ml.recommendation.ALS.Iterations]]: + * The number of iterations to perform. (Default value: '''10''') * - * - [[ALS.Blocks]]: + * - [[org.apache.flink.ml.recommendation.ALS.Blocks]]: * The number of blocks into which the user and item matrix are grouped. The fewer * blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger * update messages which have to be stored on the Heap. If the algorithm fails because of * an OutOfMemoryException, then try to increase the number of blocks. (Default value: '''None''') * - * - [[ALS.Seed]]: + * - [[org.apache.flink.ml.recommendation.ALS.Seed]]: * Random seed used to generate the initial item matrix for the algorithm. * (Default value: '''0''') * - * - [[ALS.TemporaryPath]]: + * - [[org.apache.flink.ml.recommendation.ALS.TemporaryPath]]: * Path to a temporary directory into which intermediate results are stored. If * this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration * and a post-processing step which calculates a last ALS half-step. The preprocessing steps - * calculate the [[org.apache.flink.ml.recommendation.ALS.OutBlockInformation]] and [[org.apache - * .flink.ml.recommendation.ALS.InBlockInformation]] for the given rating matrix. The result of - * the individual steps are stored in the specified directory. By splitting the algorithm - * into multiple smaller steps, Flink does not have to split the available memory amongst too many - * operators. 
This allows the system to process bigger individual messasges and improves the - * overall performance. (Default value: '''None''') + * calculate the [[org.apache.flink.ml.recommendation.ALS.OutBlockInformation]] and + * [[org.apache.flink.ml.recommendation.ALS.InBlockInformation]] for the given rating matrix. + * The results of the individual steps are stored in the specified directory. By splitting the + * algorithm into multiple smaller steps, Flink does not have to split the available memory + * amongst too many operators. This allows the system to process bigger individual messages and + * improves the overall performance. (Default value: '''None''') * * The ALS implementation is based on Spark's MLlib implementation of ALS which you can find * [[https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/ * recommendation/ALS.scala here]]. */ -class -ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { +class ALS extends Predictor[ALS] { import ALS._ + // Stores the matrix factorization after the fitting phase + var factorsOption: Option[(DataSet[Factors], DataSet[Factors])] = None + /** Sets the number of latent factors/row dimension of the latent model * * @param numFactors @@ -183,91 +186,334 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { this } - /** Calculates the matrix factorization for the given ratings. A rating is defined as - * a tuple of user ID, item ID and the corresponding rating. + /** Empirical risk of the trained model (matrix factorization). * - * @param input Set of user/item ratings for which the factorization has to be calculated - * @return Factorization containing the user and item matrix + * @param labeledData Reference data + * @param riskParameters Additional parameters for the empirical risk calculation + * @return The regularized empirical risk of the trained model on the reference data */ - def fit(input: DataSet[(Int, Int, Double)], fitParameters: ParameterMap): ALSModel = { - val resultParameters = this.parameters ++ fitParameters - - val userBlocks = resultParameters.get(Blocks).getOrElse(input.count.toInt) - val itemBlocks = userBlocks - val persistencePath = resultParameters.get(TemporaryPath) - val seed = resultParameters(Seed) - val factors = resultParameters(NumFactors) - val iterations = resultParameters(Iterations) - val lambda = resultParameters(Lambda) - - val ratings = input.map { - entry => { - val (userID, itemID, rating) = entry - Rating(userID, itemID, rating) + def empiricalRisk( + labeledData: DataSet[(Int, Int, Double)], + riskParameters: ParameterMap = ParameterMap.Empty) : DataSet[Double] = { + val resultingParameters = parameters ++ riskParameters + + val lambda = resultingParameters(Lambda) + + val data = labeledData map { + x => (x._1, x._2) + } + + factorsOption match { + case Some((userFactors, itemFactors)) => { + val predictions = data.join(userFactors).where(0).equalTo(0) + .join(itemFactors).where("_1._2").equalTo(0).map { + triple => { + val (((uID, iID), uFactors), iFactors) = triple + + val uFactorsVector = uFactors.factors + val iFactorsVector = iFactors.factors + + val squaredUNorm2 = blas.ddot( + uFactorsVector.length, + uFactorsVector, + 1, + uFactorsVector, + 1) + val squaredINorm2 = blas.ddot( + iFactorsVector.length, + iFactorsVector, + 1, + iFactorsVector, + 1) + + val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1) + + (uID, iID, prediction, squaredUNorm2, squaredINorm2) + } + } + + labeledData.join(predictions).where(0,1).equalTo(0,1) { + (left, right) => { + val (_, _, 
expected) = left + val (_, _, predicted, squaredUNorm2, squaredINorm2) = right + + val residual = expected - predicted + + residual * residual + lambda * (squaredUNorm2 + squaredINorm2) + } + } reduce { + _ + _ + } } + + case None => throw new RuntimeException("The ALS model has not been fitted to data. " + + "Prior to predicting values, it has to be trained on data.") } + } +} - val blockIDPartitioner = new BlockIDPartitioner() +object ALS { + val USER_FACTORS_FILE = "userFactorsFile" + val ITEM_FACTORS_FILE = "itemFactorsFile" - val ratingsByUserBlock = ratings.map{ - rating => - val blockID = rating.user % userBlocks - (blockID, rating) - } partitionCustom(blockIDPartitioner, 0) + // ========================================= Parameters ========================================== - val ratingsByItemBlock = ratings map { - rating => - val blockID = rating.item % itemBlocks - (blockID, new Rating(rating.item, rating.user, rating.rating)) - } partitionCustom(blockIDPartitioner, 0) + case object NumFactors extends Parameter[Int] { + val defaultValue: Option[Int] = Some(10) + } - val (uIn, uOut) = createBlockInformation(userBlocks, itemBlocks, ratingsByUserBlock, - blockIDPartitioner) - val (iIn, iOut) = createBlockInformation(itemBlocks, userBlocks, ratingsByItemBlock, - blockIDPartitioner) + case object Lambda extends Parameter[Double] { + val defaultValue: Option[Double] = Some(1.0) + } + + case object Iterations extends Parameter[Int] { + val defaultValue: Option[Int] = Some(10) + } + + case object Blocks extends Parameter[Int] { + val defaultValue: Option[Int] = None + } + + case object Seed extends Parameter[Long] { + val defaultValue: Option[Long] = Some(0L) + } - val (userIn, userOut) = persistencePath match { - case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut") - case None => (uIn, uOut) + case object TemporaryPath extends Parameter[String] { + val defaultValue: Option[String] = None + } - val (itemIn, itemOut) = persistencePath match { - case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + "itemOut") - case None => (iIn, iOut) + + // ==================================== ALS type definitions ===================================== + + /** Representation of a user-item rating + * + * @param user User ID of the rating user + * @param item Item ID of the rated item + * @param rating Rating value + */ + case class Rating(user: Int, item: Int, rating: Double) + + /** Latent factor model vector + * + * @param id ID of the user or item the factors belong to + * @param factors The latent factors of the model vector + */ + case class Factors(id: Int, factors: Array[Double]) { + override def toString = s"($id, ${factors.mkString(",")})" + } + + case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors]) + + case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) { + override def toString: String = { + s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))" } + } + + class OutLinks(var links: Array[scala.collection.mutable.BitSet]) extends Value { + def this() = this(null) - override def toString: String = { - s"${links.mkString("\n")}" + override def toString: String = { + s"${links.mkString("\n")}" } - val initialItems = itemOut.partitionCustom(blockIDPartitioner, 0).map{ - outInfos => - val blockID = outInfos._1 - val infos = outInfos._2 + override def write(out: DataOutputView): Unit = { + out.writeInt(links.length) + links foreach { + link => { + val bitMask = link.toBitMask + out.writeInt(bitMask.length) + for (element <- bitMask) { + out.writeLong(element) + } + } + } + } - (blockID, infos.elementIDs.map{ - id => - val random = 
new Random(id ^ seed) - randomFactors(factors, random) - }) - }.withForwardedFields("0") + override def read(in: DataInputView): Unit = { + val length = in.readInt() + links = new Array[scala.collection.mutable.BitSet](length) - // iteration to calculate the item matrix - val items = initialItems.iterate(iterations) { - items => { - val users = updateFactors(userBlocks, items, itemOut, userIn, factors, lambda, - blockIDPartitioner) - updateFactors(itemBlocks, users, userOut, itemIn, factors, lambda, blockIDPartitioner) + for (i <- 0 until length) { + val bitMaskLength = in.readInt() + val bitMask = new Array[Long](bitMaskLength) + for (j <- 0 until bitMaskLength) { + bitMask(j) = in.readLong() + } + links(i) = mutable.BitSet.fromBitMask(bitMask) } } - val pItems = persistencePath match { - case Some(path) => FlinkTools.persist(items, path + "items") - case None => items + def apply(idx: Int) = links(idx) + } + + case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: Array[BlockRating]) { + + override def toString: String = { + s"InBlockInformation:((${elementIDs.mkString(",")}), (${ratingsForBlock.mkString("\n")}))" + } + } + + case class BlockRating(var ratings: Array[(Array[Int], Array[Double])]) { + def apply(idx: Int) = ratings(idx) + + override def toString: String = { + ratings.map { + case (left, right) => s"((${left.mkString(",")}),(${right.mkString(",")}))" + }.mkString(",") + } + } + + case class BlockedFactorization(userFactors: DataSet[(Int, Array[Array[Double]])], + itemFactors: DataSet[(Int, Array[Array[Double]])]) + + class BlockIDPartitioner extends FlinkPartitioner[Int] { + override def partition(blockID: Int, numberOfPartitions: Int): Int = { + blockID % numberOfPartitions } + } + + class BlockIDGenerator(blocks: Int) extends Serializable { + def apply(id: Int): Int = { + id % blocks + } + } + + // ================================= Factory methods ============================================= + + def apply(): ALS = { + new ALS() + } + + // ===================================== Operations ============================================== + + /** Predict operation which calculates the matrix entry for the given indices */ + implicit val predictRating = new PredictOperation[ALS, (Int, Int), (Int ,Int, Double)] { + override def predict( + instance: ALS, + predictParameters: ParameterMap, + input: DataSet[(Int, Int)]) + : DataSet[(Int, Int, Double)] = { + + instance.factorsOption match { + case Some((userFactors, itemFactors)) => { + input.join(userFactors).where(0).equalTo(0) + .join(itemFactors).where("_1._2").equalTo(0).map { + triple => { + val (((uID, iID), uFactors), iFactors) = triple + + val uFactorsVector = uFactors.factors + val iFactorsVector = iFactors.factors + + val prediction = blas.ddot( + uFactorsVector.length, + uFactorsVector, + 1, + iFactorsVector, + 1) + + (uID, iID, prediction) + } + } + } + + case None => throw new RuntimeException("The ALS model has not been fitted to data. " + + "Prior to predicting values, it has to be trained on data.") + } + } + } + + /** Calculates the matrix factorization for the given ratings. A rating is defined as + * a tuple of user ID, item ID and the corresponding rating. 
+ * + * The calculated factorization (user and item matrix) is stored in the given + * [[ALS]] instance's factorsOption field. + */ + implicit val fitALS = new FitOperation[ALS, (Int, Int, Double)] { + override def fit( + instance: ALS, + fitParameters: ParameterMap, + input: DataSet[(Int, Int, Double)]) + : Unit = { + val resultParameters = instance.parameters ++ fitParameters + + val userBlocks = resultParameters.get(Blocks).getOrElse(input.count.toInt) + val itemBlocks = userBlocks + val persistencePath = resultParameters.get(TemporaryPath) + val seed = resultParameters(Seed) + val factors = resultParameters(NumFactors) + val iterations = resultParameters(Iterations) + val lambda = resultParameters(Lambda) + + val ratings = input.map { + entry => { + val (userID, itemID, rating) = entry + Rating(userID, itemID, rating) + } + } + + val blockIDPartitioner = new BlockIDPartitioner() + + val ratingsByUserBlock = ratings.map{ + rating => + val blockID = rating.user % userBlocks + (blockID, rating) + } partitionCustom(blockIDPartitioner, 0) + + val ratingsByItemBlock = ratings map { + rating => + val blockID = rating.item % itemBlocks + (blockID, new Rating(rating.item, rating.user, rating.rating)) + } partitionCustom(blockIDPartitioner, 0) + + val (uIn, uOut) = createBlockInformation(userBlocks, itemBlocks, ratingsByUserBlock, + blockIDPartitioner) + val (iIn, iOut) = createBlockInformation(itemBlocks, userBlocks, ratingsByItemBlock, + blockIDPartitioner) + + val (userIn, userOut) = persistencePath match { + case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut") + case None => (uIn, uOut) + } + + val (itemIn, itemOut) = persistencePath match { + case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + "itemOut") + case None => (iIn, iOut) + } - val initialItems = itemOut.partitionCustom(blockIDPartitioner, 0).map{ + outInfos => + val blockID = outInfos._1 + val infos = outInfos._2 + + (blockID, infos.elementIDs.map{ + id => + val random = new Random(id ^ seed) + randomFactors(factors, random) + }) + }.withForwardedFields("0") + + // iteration to calculate the item matrix + val items = initialItems.iterate(iterations) { + items => { + val users = updateFactors(userBlocks, items, itemOut, userIn, factors, lambda, + blockIDPartitioner) + updateFactors(itemBlocks, users, userOut, itemIn, factors, lambda, blockIDPartitioner) + } + } + + val pItems = persistencePath match { + case Some(path) => FlinkTools.persist(items, path + "items") + case None => items + } + + // perform last half-step to calculate the user matrix + val users = updateFactors(userBlocks, pItems, itemOut, userIn, factors, lambda, + blockIDPartitioner) - new ALSModel(unblock(users, userOut, blockIDPartitioner), unblock(pItems, itemOut, - blockIDPartitioner), lambda) + instance.factorsOption = Some(( + unblock(users, userOut, blockIDPartitioner), + unblock(pItems, itemOut, blockIDPartitioner))) + } } /** Calculates a single half step of the ALS optimization. 
The result is the new value for @@ -283,11 +529,11 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { * @return New value for the optimized matrix (either user or item) */ def updateFactors(numUserBlocks: Int, - items: DataSet[(Int, Array[Array[Double]])], - itemOut: DataSet[(Int, OutBlockInformation)], - userIn: DataSet[(Int, InBlockInformation)], - factors: Int, - lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]): + items: DataSet[(Int, Array[Array[Double]])], + itemOut: DataSet[(Int, OutBlockInformation)], + userIn: DataSet[(Int, InBlockInformation)], + factors: Int, + lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]): DataSet[(Int, Array[Array[Double]])] = { // send the item vectors to the blocks whose users have rated the items val partialBlockMsgs = itemOut.join(items).where(0).equalTo(0). @@ -334,8 +580,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { val numRatings = new ArrayBuffer[Int]() override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])], - right: lang.Iterable[(Int, InBlockInformation)], - collector: Collector[(Int, Array[Array[Double]])]): Unit = { + right: lang.Iterable[(Int, InBlockInformation)], + collector: Collector[(Int, Array[Array[Double]])]): Unit = { // there is only one InBlockInformation per user block val inInfo = right.iterator().next()._2 val updates = left.iterator() @@ -439,7 +685,7 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { * @return */ def createBlockInformation(userBlocks: Int, itemBlocks: Int, ratings: DataSet[(Int, Rating)], - blockIDPartitioner: BlockIDPartitioner): + blockIDPartitioner: BlockIDPartitioner): (DataSet[(Int, InBlockInformation)], DataSet[(Int, OutBlockInformation)]) = { val blockIDGenerator = new BlockIDGenerator(itemBlocks) @@ -498,8 +744,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { * @return */ def createOutBlockInformation(ratings: DataSet[(Int, Rating)], - usersPerBlock: DataSet[(Int, Array[Int])], - itemBlocks: Int, blockIDGenerator: BlockIDGenerator): + usersPerBlock: DataSet[(Int, Array[Int])], + itemBlocks: Int, blockIDGenerator: BlockIDGenerator): DataSet[(Int, OutBlockInformation)] = { ratings.coGroup(usersPerBlock).where(0).equalTo(0).apply { (ratings, users) => @@ -547,8 +793,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { * @return */ def createInBlockInformation(ratings: DataSet[(Int, Rating)], - usersPerBlock: DataSet[(Int, Array[Int])], - blockIDGenerator: BlockIDGenerator): + usersPerBlock: DataSet[(Int, Array[Int])], + blockIDGenerator: BlockIDGenerator): DataSet[(Int, InBlockInformation)] = { // Group for every user block the users which have rated the same item and collect their ratings val partialInInfos = ratings.map { x => (x._1, x._2.item, x._2.user, x._2.rating)} @@ -628,9 +874,9 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { val buffer = ArrayBuffer[BlockRating]() override def coGroup(partialInfosIterable: - lang.Iterable[(Int, Int, Array[(Array[Int], Array[Double])])], - userIterable: lang.Iterable[(Int, Array[Int])], - collector: Collector[(Int, InBlockInformation)]): Unit = { + lang.Iterable[(Int, Int, Array[(Array[Int], Array[Double])])], + userIterable: lang.Iterable[(Int, Array[Int])], + collector: Collector[(Int, InBlockInformation)]): Unit = { val users = userIterable.iterator() val partialInfos = partialInfosIterable.iterator() @@ -691,8 +937,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with 
Serializable { * @return */ def unblock(users: DataSet[(Int, Array[Array[Double]])], - outInfo: DataSet[(Int, OutBlockInformation)], - blockIDPartitioner: BlockIDPartitioner): DataSet[Factors] = { + outInfo: DataSet[(Int, OutBlockInformation)], + blockIDPartitioner: BlockIDPartitioner): DataSet[Factors] = { users.join(outInfo).where(0).equalTo(0).withPartitioner(blockIDPartitioner).apply { (left, right, col: Collector[Factors]) => { val outInfo = right._2 @@ -725,7 +971,7 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { } def generateFullMatrix(triangularMatrix: Array[Double], fullMatrix: Array[Double], - factors: Int): Unit = { + factors: Int): Unit = { var row = 0 var pos = 0 @@ -759,206 +1005,3 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { Array.fill(factors)(random.nextDouble()) } } - -object ALS { - val USER_FACTORS_FILE = "userFactorsFile" - val ITEM_FACTORS_FILE = "itemFactorsFile" - - case object NumFactors extends Parameter[Int] { - val defaultValue: Option[Int] = Some(10) - } - - case object Lambda extends Parameter[Double] { - val defaultValue: Option[Double] = Some(1.0) - } - - case object Iterations extends Parameter[Int] { - val defaultValue: Option[Int] = Some(10) - } - - case object Blocks extends Parameter[Int] { - val defaultValue: Option[Int] = None - } - - case object Seed extends Parameter[Long] { - val defaultValue: Option[Long] = Some(0L) - } - - case object TemporaryPath extends Parameter[String] { - val defaultValue: Option[String] = None - } - - // ==================================== ALS type definitions ===================================== - - /** Representation of a user-item rating - * - * @param user User ID of the rating user - * @param item Item iD of the rated item - * @param rating Rating value - */ - case class Rating(user: Int, item: Int, rating: Double) - - /** Latent factor model vector - * - * @param id - * @param factors - */ - case class Factors(id: Int, factors: Array[Double]) { - override def toString = s"($id, ${factors.mkString(",")})" - } - - case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors]) - - case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) { - override def toString: String = { - s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))" - } - } - - class OutLinks(var links: Array[scala.collection.mutable.BitSet]) extends Value { - def this() = this(null) - - override def toString: String = { - s"${links.mkString("\n")}" - } - - override def write(out: DataOutputView): Unit = { - out.writeInt(links.length) - links foreach { - link => { - val bitMask = link.toBitMask - out.writeInt(bitMask.length) - for (element <- bitMask) { - out.writeLong(element) - } - } - } - } - - override def read(in: DataInputView): Unit = { - val length = in.readInt() - links = new Array[scala.collection.mutable.BitSet](length) - - for (i <- 0 until length) { - val bitMaskLength = in.readInt() - val bitMask = new Array[Long](bitMaskLength) - for (j <- 0 until bitMaskLength) { - bitMask(j) = in.readLong() - } - links(i) = mutable.BitSet.fromBitMask(bitMask) - } - } - - def apply(idx: Int) = links(idx) - } - - case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: Array[BlockRating]) { - - override def toString: String = { - s"InBlockInformation:((${elementIDs.mkString(",")}), (${ratingsForBlock.mkString("\n")}))" - } - } - - case class BlockRating(var ratings: Array[(Array[Int], Array[Double])]) { - def apply(idx: Int) = 
ratings(idx) - - override def toString: String = { - ratings.map { - case (left, right) => s"((${left.mkString(",")}),(${right.mkString(",")}))" - }.mkString(",") - } - } - - case class BlockedFactorization(userFactors: DataSet[(Int, Array[Array[Double]])], - itemFactors: DataSet[(Int, Array[Array[Double]])]) - - class BlockIDPartitioner extends FlinkPartitioner[Int] { - override def partition(blockID: Int, numberOfPartitions: Int): Int = { - blockID % numberOfPartitions - } - } - - class BlockIDGenerator(blocks: Int) extends Serializable { - def apply(id: Int): Int = { - id % blocks - } - } - - // ========================= Factory methods ===================================== - - def apply(): ALS = { - new ALS() - } -} - -/** Resulting model of the ALS algorithm. - * - * It contains the calculated factors, user and item matrix, of the given - * ratings matrix. Additionally it stores the used regularization value lambda in order to - * calculate the empirical risk of the model. - * - * @param userFactors Calculated user matrix - * @param itemFactors Calcualted item matrix - * @param lambda Regularization value used to calculate the model - */ -class ALSModel( - @transient val userFactors: DataSet[Factors], - @transient val itemFactors: DataSet[Factors], - val lambda: Double) - extends Transformer[(Int, Int), (Int, Int, Double)] - with Serializable { - - override def transform(input: DataSet[(Int, Int)], parameters: ParameterMap): DataSet[(Int, - Int, Double)] = { - - input.join(userFactors).where(0).equalTo(0) - .join(itemFactors).where("_1._2").equalTo(0).map { - triple => { - val (((uID, iID), uFactors), iFactors) = triple - - val uFactorsVector = uFactors.factors - val iFactorsVector = iFactors.factors - - val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1) - - (uID, iID, prediction) - } - } - } - - def empiricalRisk(labeledData: DataSet[(Int, Int, Double)]): DataSet[Double] = { - val data = labeledData map { - x => (x._1, x._2) - } - - val predictions = data.join(userFactors).where(0).equalTo(0) - .join(itemFactors).where("_1._2").equalTo(0).map { - triple => { - val (((uID, iID), uFactors), iFactors) = triple - - val uFactorsVector = uFactors.factors - val iFactorsVector = iFactors.factors - - val squaredUNorm2 = blas.ddot(uFactorsVector.length, uFactorsVector, 1, uFactorsVector, 1) - val squaredINorm2 = blas.ddot(iFactorsVector.length, iFactorsVector, 1, iFactorsVector, 1) - - val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1) - - (uID, iID, prediction, squaredUNorm2, squaredINorm2) - } - } - - labeledData.join(predictions).where(0,1).equalTo(0,1) { - (left, right) => { - val (_, _, expected) = left - val (_, _, predicted, squaredUNorm2, squaredINorm2) = right - - val residual = expected - predicted - - residual * residual + lambda * (squaredUNorm2 + squaredINorm2) - } - } reduce { - _ + _ - } - } -}
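The ALS rework above replaces the former Learner/ALSModel pair with a single stateful Predictor: fit stores the factorization in factorsOption, while predict and empiricalRisk read it back and throw a RuntimeException if fit has not been called yet. A minimal usage sketch of the reworked API, assuming the inputs are read from CSV as in the scaladoc example above (the file paths are placeholders, not part of this commit):

import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// placeholder inputs: any CSV files with the expected schemas
val pathToRatings = "hdfs:///path/to/ratings.csv"
val pathToPairs = "hdfs:///path/to/pairs.csv"

// (user ID, item ID, rating) triples used for training
val ratings: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](pathToRatings)

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)

// stores the user and item matrix in als.factorsOption instead of returning a model
als.fit(ratings)

// (user ID, item ID) pairs whose ratings shall be predicted
val pairs: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToPairs)
val predictions: DataSet[(Int, Int, Double)] = als.predict(pairs)

// sum over all ratings of (rating - prediction)^2 + lambda * (||u||^2 + ||i||^2),
// matching the empiricalRisk implementation above
val risk: DataSet[Double] = als.empiricalRisk(ratings)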
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala index 87352fa..64b24dc 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala @@ -29,6 +29,7 @@ import org.apache.flink.ml.math.vector2Array import org.apache.flink.api.scala._ import com.github.fommil.netlib.BLAS.{ getInstance => blas } +import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor} /** Multiple linear regression using the ordinary least squares (OLS) estimator. * @@ -67,30 +68,33 @@ import com.github.fommil.netlib.BLAS.{ getInstance => blas } * val trainingDS: DataSet[LabeledVector] = ... * val testingDS: DataSet[Vector] = ... * - * val model = mlr.fit(trainingDS) + * mlr.fit(trainingDS) * - * val predictions = model.transform(testingDS) + * val predictions = mlr.predict(testingDS) * }}} * * =Parameters= * - * - [[MultipleLinearRegression.Iterations]]: Maximum number of iterations. + * - [[org.apache.flink.ml.regression.MultipleLinearRegression.Iterations]]: + * Maximum number of iterations. * - * - [[MultipleLinearRegression.Stepsize]]: + * - [[org.apache.flink.ml.regression.MultipleLinearRegression.Stepsize]]: * Initial step size for the gradient descent method. * This value controls how far the gradient descent method moves in the opposite direction of the * gradient. Tuning this parameter might be crucial to make it stable and to obtain a better * performance. * - * - [[MultipleLinearRegression.ConvergenceThreshold]]: + * - [[org.apache.flink.ml.regression.MultipleLinearRegression.ConvergenceThreshold]]: * Threshold for relative change of sum of squared residuals until convergence. 
* */ -class -MultipleLinearRegression extends Learner[LabeledVector, MultipleLinearRegressionModel] -with Serializable { +class MultipleLinearRegression extends Predictor[MultipleLinearRegression] { + import MultipleLinearRegression._ + // Stores the weights of the linear model after the fitting phase + var weightsOption: Option[DataSet[(Array[Double], Double)]] = None + def setIterations(iterations: Int): MultipleLinearRegression = { parameters.add(Iterations, iterations) this @@ -106,116 +110,174 @@ with Serializable { this } - override def fit(input: DataSet[LabeledVector], fitParameters: ParameterMap): - MultipleLinearRegressionModel = { - val map = this.parameters ++ fitParameters - - // retrieve parameters of the algorithm - val numberOfIterations = map(Iterations) - val stepsize = map(Stepsize) - val convergenceThreshold = map.get(ConvergenceThreshold) + def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = { + weightsOption match { + case Some(weights) => { + input.map { + new SquaredResiduals + }.withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST).reduce { + _ + _ + } + } - // calculate dimension of the feature vectors - val dimension = input.map{_.vector.size}.reduce { - (a, b) => - require(a == b, "All input vector must have the same dimension.") - a + case None => { + throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " + + "data. This is necessary to learn the weight vector of the linear function.") + } } - // initial weight vector is set to 0 - val initialWeightVector = createInitialWeightVector(dimension) + } +} + +object MultipleLinearRegression { + val WEIGHTVECTOR_BROADCAST = "weights_broadcast" - // check if a convergence threshold has been set - val resultingWeightVector = convergenceThreshold match { - case Some(convergence) => + // ====================================== Parameters ============================================= - // we have to calculate for each weight vector the sum of squared residuals - val initialSquaredResidualSum = input.map { - new SquaredResiduals - }.withBroadcastSet(initialWeightVector, WEIGHTVECTOR_BROADCAST).reduce { - _ + _ - } + case object Stepsize extends Parameter[Double] { + val defaultValue = Some(0.1) + } + + case object Iterations extends Parameter[Int] { + val defaultValue = Some(10) + } + + case object ConvergenceThreshold extends Parameter[Double] { + val defaultValue = None + } + + // ======================================== Factory methods ====================================== + + def apply(): MultipleLinearRegression = { + new MultipleLinearRegression() + } + + // ====================================== Operations ============================================= - // combine weight vector with current sum of squared residuals - val initialWeightVectorWithSquaredResidualSum = initialWeightVector. - crossWithTiny(initialSquaredResidualSum).setParallelism(1) + /** Trains the linear model to fit the training data. The resulting weight vector is stored in + * the [[MultipleLinearRegression]] instance. 
+ * + */ + implicit val fitMLR = new FitOperation[MultipleLinearRegression, LabeledVector] { + override def fit( + instance: MultipleLinearRegression, + fitParameters: ParameterMap, + input: DataSet[LabeledVector]) + : Unit = { + val map = instance.parameters ++ fitParameters + + // retrieve parameters of the algorithm + val numberOfIterations = map(Iterations) + val stepsize = map(Stepsize) + val convergenceThreshold = map.get(ConvergenceThreshold) + + // calculate dimension of the feature vectors + val dimension = input.map{_.vector.size}.reduce { + (a, b) => + require(a == b, "All input vectors must have the same dimension.") + a + } + + // initial weight vector is set to 0 + val initialWeightVector = createInitialWeightVector(dimension) + + // check if a convergence threshold has been set + val resultingWeightVector = convergenceThreshold match { + case Some(convergence) => + + // we have to calculate for each weight vector the sum of squared residuals + val initialSquaredResidualSum = input.map { + new SquaredResiduals + }.withBroadcastSet(initialWeightVector, WEIGHTVECTOR_BROADCAST).reduce { + _ + _ + } + + // combine weight vector with current sum of squared residuals + val initialWeightVectorWithSquaredResidualSum = initialWeightVector. + crossWithTiny(initialSquaredResidualSum).setParallelism(1) - // start SGD iteration - val resultWithResidual = initialWeightVectorWithSquaredResidualSum. - iterateWithTermination(numberOfIterations) { - weightVectorSquaredResidualDS => + // start SGD iteration + val resultWithResidual = initialWeightVectorWithSquaredResidualSum. + iterateWithTermination(numberOfIterations) { + weightVectorSquaredResidualDS => - // extract weight vector and squared residual sum - val weightVector = weightVectorSquaredResidualDS.map{_._1} - val squaredResidualSum = weightVectorSquaredResidualDS.map{_._2} + // extract weight vector and squared residual sum + val weightVector = weightVectorSquaredResidualDS.map{_._1} + val squaredResidualSum = weightVectorSquaredResidualDS.map{_._2} - // TODO: Sample from input to realize proper SGD - val newWeightVector = input.map { - new LinearRegressionGradientDescent - }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce { - (left, right) => + // TODO: Sample from input to realize proper SGD + val newWeightVector = input.map { + new LinearRegressionGradientDescent + }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce { + (left, right) => val (leftBetas, leftBeta0, leftCount) = left val (rightBetas, rightBeta0, rightCount) = right blas.daxpy(leftBetas.length, 1.0, rightBetas, 1, leftBetas, 1) (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount) - }.map { - new LinearRegressionWeightsUpdate(stepsize) - }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST) + }.map { + new LinearRegressionWeightsUpdate(stepsize) + }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST) + + // calculate the sum of squared residuals for the new weight vector + val newResidual = input.map { + new SquaredResiduals + }.withBroadcastSet(newWeightVector, WEIGHTVECTOR_BROADCAST).reduce { + _ + _ + } - // calculate the sum of squared residuals for the new weight vector - val newResidual = input.map { - new SquaredResiduals - }.withBroadcastSet(newWeightVector, WEIGHTVECTOR_BROADCAST).reduce { - _ + _ - } - // check if the relative change in the squared residual sum is smaller than the - // convergence threshold. 
If yes, then terminate => return empty termination data set - val termination = squaredResidualSum.crossWithTiny(newResidual).setParallelism(1). - filter{ - pair => { - val (residual, newResidual) = pair - - if (residual <= 0) { - false - } else { - math.abs((residual - newResidual)/residual) >= convergence + // check if the relative change in the squared residual sum is smaller than the + // convergence threshold. If yes, then terminate => return empty termination data set + val termination = squaredResidualSum.crossWithTiny(newResidual).setParallelism(1). + filter{ + pair => { + val (residual, newResidual) = pair + + if (residual <= 0) { + false + } else { + math.abs((residual - newResidual)/residual) >= convergence + } } } - } - // result for new iteration - (newWeightVector cross newResidual, termination) - } + // result for new iteration + (newWeightVector cross newResidual, termination) + } + + // remove squared residual sum to only return the weight vector + resultWithResidual.map{_._1} + + case None => + // No convergence criterion + initialWeightVector.iterate(numberOfIterations) { + weightVector => { + + // TODO: Sample from input to realize proper SGD + input.map { + new LinearRegressionGradientDescent + }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce { + (left, right) => + val (leftBetas, leftBeta0, leftCount) = left + val (rightBetas, rightBeta0, rightCount) = right - // remove squared residual sum to only return the weight vector - resultWithResidual.map{_._1} - - case None => - // No convergence criterion - initialWeightVector.iterate(numberOfIterations) { - weightVector => { - - // TODO: Sample from input to realize proper SGD - input.map { - new LinearRegressionGradientDescent - }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce { - (left, right) => - val (leftBetas, leftBeta0, leftCount) = left - val (rightBetas, rightBeta0, rightCount) = right - - blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1) - (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount) - }.map { - new LinearRegressionWeightsUpdate(stepsize) - }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST) + blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1) + (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount) + }.map { + new LinearRegressionWeightsUpdate(stepsize) + }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST) + } } - } - } + } - new MultipleLinearRegressionModel(resultingWeightVector) + instance.weightsOption = Some(resultingWeightVector) + } } /** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given @@ -233,28 +295,58 @@ with Serializable { (values, 0.0) } } -} -object MultipleLinearRegression { - val WEIGHTVECTOR_BROADCAST = "weights_broadcast" + /** Calculates the predictions for new data with respect to the learned linear model. + * + * @tparam T Testing data type for which the prediction is calculated. 
Has to be a subtype of + * [[Vector]] + * @return + */ + implicit def predictVectors[T <: Vector] = { + new PredictOperation[MultipleLinearRegression, T, LabeledVector] { + override def predict( + instance: MultipleLinearRegression, + predictParameters: ParameterMap, + input: DataSet[T]) + : DataSet[LabeledVector] = { + instance.weightsOption match { + case Some(weights) => { + input.map(new LinearRegressionPrediction[T]) + .withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST) + } - // Define parameters for MultipleLinearRegression - case object Stepsize extends Parameter[Double] { - val defaultValue = Some(0.1) + case None => { + throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " + + "data. This is necessary to learn the weight vector of the linear function.") + } + } + } + } } - case object Iterations extends Parameter[Int] { - val defaultValue = Some(10) - } + private class LinearRegressionPrediction[T <: Vector] extends RichMapFunction[T, LabeledVector] { + private var weights: Array[Double] = null + private var weight0: Double = 0 - case object ConvergenceThreshold extends Parameter[Double] { - val defaultValue = None - } - // ====================== Facotry methods ========================== + @throws(classOf[Exception]) + override def open(configuration: Configuration): Unit = { + val t = getRuntimeContext + .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST) + + val weightsPair = t.get(0) - def apply(): MultipleLinearRegression = { - new MultipleLinearRegression() + weights = weightsPair._1 + weight0 = weightsPair._2 + } + + override def map(value: T): LabeledVector = { + val dotProduct = blas.ddot(weights.length, weights, 1, vector2Array(value), 1) + + val prediction = dotProduct + weight0 + + LabeledVector(prediction, value) + } } } @@ -387,63 +479,3 @@ RichMapFunction[(Array[Double], Double, Int), (Array[Double], Double)] { (newWeights, newWeight0) } } - -//-------------------------------------------------------------------------------------------------- -// Model definition -//-------------------------------------------------------------------------------------------------- - -/** Multiple linear regression model returned by [[MultipleLinearRegression]]. The model stores the - * calculated weight vector and applies the linear model to given vectors v: - * - * `hat y = w^T*v + w_0` - * - * with `hat y` being the predicted regression value. 
- * - * @param weights DataSet containing the calculated weight vector - */ -class MultipleLinearRegressionModel private[regression]( - val weights: DataSet[(Array[Double], Double)]) - extends Transformer[ Vector, LabeledVector ] - with Serializable { - - import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST - - // predict regression value for input - override def transform(input: DataSet[Vector], - parameters: ParameterMap): DataSet[LabeledVector] = { - input.map(new LinearRegressionPrediction).withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST) - } - - def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = { - input.map{ - new SquaredResiduals() - }.withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST).reduce{ - _ + _ - } - } - - private class LinearRegressionPrediction extends RichMapFunction[Vector, LabeledVector] { - private var weights: Array[Double] = null - private var weight0: Double = 0 - - - @throws(classOf[Exception]) - override def open(configuration: Configuration): Unit = { - val t = getRuntimeContext - .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST) - - val weightsPair = t.get(0) - - weights = weightsPair._1 - weight0 = weightsPair._2 - } - - override def map(value: Vector): LabeledVector = { - val dotProduct = blas.ddot(weights.length, weights, 1, vector2Array(value), 1) - - val prediction = dotProduct + weight0 - - LabeledVector(prediction, value) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/resources/log4j-test.properties b/flink-staging/flink-ml/src/test/resources/log4j-test.properties index f3c51b8..76b237e 100644 --- a/flink-staging/flink-ml/src/test/resources/log4j-test.properties +++ b/flink-staging/flink-ml/src/test/resources/log4j-test.properties @@ -1,20 +1,20 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and # limitations under the License. -# +################################################################################ log4j.rootLogger=OFF, console @@ -32,4 +32,7 @@ log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.file=${log.dir}/{$mvn.forkNumber}.log log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file +log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala new file mode 100644 index 0000000..a5e7496 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.classification + +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase + +class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase { + + behavior of "The CoCoA implementation" + + it should "train a SVM" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val cocoa = CoCoA(). + setBlocks(env.getParallelism). + setIterations(100). + setLocalIterations(100). + setRegularization(0.002). + setStepsize(0.1). 
+ setSeed(0) + + val trainingDS = env.fromCollection(Classification.trainingData) + + cocoa.fit(trainingDS) + + val weightVector = cocoa.weightsOption.get.collect().apply(0) + + weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach { + case (weight, expectedWeight) => + weight should be(expectedWeight +- 0.1) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala deleted file mode 100644 index 0f000a3..0000000 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.classification - -import org.scalatest.{FlatSpec, Matchers} - -import org.apache.flink.api.scala._ -import org.apache.flink.test.util.FlinkTestBase - -class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase { - - behavior of "The CoCoA implementation" - - it should "train a SVM" in { - val env = ExecutionEnvironment.getExecutionEnvironment - - val learner = CoCoA(). - setBlocks(env.getParallelism). - setIterations(100). - setLocalIterations(100). - setRegularization(0.002). - setStepsize(0.1). - setSeed(0) - - val trainingDS = env.fromCollection(Classification.trainingData) - - val model = learner.fit(trainingDS) - - val weightVector = model.weights.collect().apply(0) - - weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach { - case (weight, expectedWeight) => - weight should be(expectedWeight +- 0.1) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala deleted file mode 100644 index a185282..0000000 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.experimental - -import org.scalatest.FlatSpec - -import org.apache.flink.api.scala._ -import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.math.{SparseVector, DenseVector, Vector} -import org.apache.flink.test.util.FlinkTestBase - -class SciKitPipelineSuite extends FlatSpec with FlinkTestBase { - behavior of "Pipeline" - - it should "work" in { - val env = ExecutionEnvironment.getExecutionEnvironment - - val scaler = new Scaler - val offset = new Offset - - val input: DataSet[Vector] = env.fromCollection(List(DenseVector(2,1,3), SparseVector.fromCOO(3, (1,1), (2,2)))) - val training = env.fromCollection(List(LabeledVector(1.0, DenseVector(2,3,1)), LabeledVector(2.0, SparseVector.fromCOO(3, (1,1), (2,2))))) - val intData = env.fromCollection(List(1,2,3,4)) - - val result = scaler.transform(input) - - result.print() - - val result2 = offset.transform(input) - result2.print() - - val chain = scaler.chainTransformer(offset) - - val result3 = chain.transform(input)(ChainedTransformer.chainedTransformOperation(Scaler.vTransform, Offset.offsetTransform)) - - result3.print() - - val chain2 = chain.chainTransformer(scaler) - val result4 = chain2.transform(input) - - result4.print() - - val kmeans = new KMeans() - - val chainedPredictor = chain.chainPredictor(kmeans) - - val prediction = chainedPredictor.predict(result) - - prediction.print() - - env.execute() - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala index b930ceb..0f045ab 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala @@ -21,6 +21,7 @@ package org.apache.flink.ml.feature import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.ml.common.LabeledVector import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.preprocessing.PolynomialFeatures import org.scalatest.{Matchers, FlatSpec} import org.apache.flink.api.scala._ @@ -45,10 +46,10 @@ class PolynomialBaseITSuite val inputDS = env.fromCollection (input) - val transformer = PolynomialBase () + val transformer = PolynomialFeatures() .setDegree (3) - val transformedDS = transformer.transform (inputDS) + val transformedDS = transformer.transform(inputDS) val expectedMap = List ( (1.0 -> DenseVector (1.0, 1.0, 1.0) ), @@ -81,7 +82,7 @@ class PolynomialBaseITSuite val inputDS = env.fromCollection(input) - val transformer = PolynomialBase() + val transformer = PolynomialFeatures() 
.setDegree(3) val transformedDS = transformer.transform(inputDS) @@ -106,7 +107,7 @@ class PolynomialBaseITSuite val inputDS = env.fromCollection(input) - val transformer = PolynomialBase() + val transformer = PolynomialFeatures() .setDegree(0) val transformedDS = transformer.transform(inputDS) http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala new file mode 100644 index 0000000..8803195 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.pipeline + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler} +import org.apache.flink.ml.regression.MultipleLinearRegression +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} + +class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase { + behavior of "Flink's pipelines" + + it should "support chaining of compatible transformer" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0)) + val lvData = List(LabeledVector(1.0, DenseVector(1.0, 1.0, 1.0)), + LabeledVector(2.0, DenseVector(2.0, 2.0, 2.0))) + + val vectorData = env.fromCollection(vData) + val labeledVectorData = env.fromCollection(lvData) + + val expectedScaledVectorSet = Set( + DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0), + DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0) + ) + + val expectedScaledLabeledVectorSet = Set( + LabeledVector(1.0, DenseVector(1.0, 3.0, 5.0, 9.0, 15.0, 25.0, -1.0, -3.0, -5.0)), + LabeledVector(2.0, DenseVector(1.0, -1.0, -3.0, 1.0, 3.0, 9.0, 1.0, -1.0, -3.0)) + ) + + val scaler = StandardScaler() + val polyFeatures = PolynomialFeatures().setDegree(2) + + val pipeline = scaler.chainTransformer(polyFeatures) + + pipeline.fit(vectorData) + + val scaledVectorDataDS = pipeline.transform(vectorData) + val scaledLabeledVectorDataDS = pipeline.transform(labeledVectorData) + + val scaledVectorData = scaledVectorDataDS.collect() + val scaledLabeledVectorData = scaledLabeledVectorDataDS.collect() + + scaledVectorData.size should be(expectedScaledVectorSet.size) + + for(scaledVector <- scaledVectorData){ + 
expectedScaledVectorSet should contain(scaledVector) + } + + scaledLabeledVectorData.size should be(expectedScaledLabeledVectorSet.size) + + for(scaledLabeledVector <- scaledLabeledVectorData) { + expectedScaledLabeledVectorSet should contain(scaledLabeledVector) + } + } + + it should "throw an exception when the pipeline operators are not compatible" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val scaler = StandardScaler() + val mlr = MultipleLinearRegression() + + val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0)) + val vectorData = env.fromCollection(vData) + + val pipeline = scaler.chainPredictor(mlr) + + val exception = intercept[RuntimeException] { + pipeline.fit(vectorData) + } + + exception.getMessage should equal("There is no FitOperation defined for class org.apache." + + "flink.ml.regression.MultipleLinearRegression which trains on a " + + "DataSet[class org.apache.flink.ml.math.DenseVector]") + } + + it should "throw an exception when the input data is not supported" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val dData = List(1.0, 2.0, 3.0) + val doubleData = env.fromCollection(dData) + + val scaler = StandardScaler() + val polyFeatures = PolynomialFeatures() + + val pipeline = scaler.chainTransformer(polyFeatures) + + val exception = intercept[RuntimeException] { + pipeline.fit(doubleData) + } + + exception.getMessage should equal("There is no FitOperation defined for class org.apache." + + "flink.ml.preprocessing.StandardScaler which trains on a DataSet[double]") + } + + it should "support multiple transformers and a predictor" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val data = List(LabeledVector(1.0, DenseVector(1.0, 2.0)), + LabeledVector(2.0, DenseVector(2.0, 3.0)), + LabeledVector(3.0, DenseVector(3.0, 4.0))) + + val trainingData = env.fromCollection(data) + + val chainedScalers2 = StandardScaler().chainTransformer(StandardScaler()) + val chainedScalers3 = chainedScalers2.chainTransformer(StandardScaler()) + val chainedScalers4 = chainedScalers3.chainTransformer(StandardScaler()) + val chainedScalers5 = chainedScalers4.chainTransformer(StandardScaler()) + + val predictor = MultipleLinearRegression() + + + val pipeline = chainedScalers5.chainPredictor(predictor) + + pipeline.fit(trainingData) + + val weightVector = predictor.weightsOption.get.collect().head + + weightVector._1.foreach{ + _ should be (0.367282 +- 0.01) + } + + weightVector._2 should be (1.3131727 +- 0.01) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala index ac3cbb6..30875b3 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala @@ -41,8 +41,9 @@ class StandardScalerITSuite val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromCollection(data) - val transformer = StandardScaler() - val scaledVectors = transformer.transform(dataSet).collect + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = 
scaler.transform(dataSet).collect scaledVectors.length should equal(data.length) @@ -73,8 +74,9 @@ class StandardScalerITSuite val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromCollection(data) - val transformer = StandardScaler().setMean(10.0).setStd(2.0) - val scaledVectors = transformer.transform(dataSet).collect + val scaler = StandardScaler().setMean(10.0).setStd(2.0) + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).collect scaledVectors.length should equal(data.length) http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala index 245d7a8..2ad310d 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala @@ -48,13 +48,13 @@ class ALSITSuite val inputDS = env.fromCollection(data) - val model = als.fit(inputDS) + als.fit(inputDS) val testData = env.fromCollection(expectedResult.map{ case (userID, itemID, rating) => (userID, itemID) }) - val predictions = model.transform(testData).collect() + val predictions = als.predict(testData).collect() predictions.length should equal(expectedResult.length) @@ -70,7 +70,7 @@ class ALSITSuite } } - val risk = model.empiricalRisk(inputDS).collect().apply(0) + val risk = als.empiricalRisk(inputDS).collect().apply(0) risk should be(expectedEmpiricalRisk +- 1) } http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index 2d3f770..8be239a 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -20,7 +20,7 @@ package org.apache.flink.ml.regression import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.ml.common.ParameterMap -import org.apache.flink.ml.feature.PolynomialBase +import org.apache.flink.ml.preprocessing.PolynomialFeatures import org.scalatest.{Matchers, FlatSpec} import org.apache.flink.api.scala._ @@ -38,7 +38,7 @@ class MultipleLinearRegressionITSuite env.setParallelism(2) - val learner = MultipleLinearRegression() + val mlr = MultipleLinearRegression() import RegressionData._ @@ -49,9 +49,9 @@ class MultipleLinearRegressionITSuite parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) val inputDS = env.fromCollection(data) - val model = learner.fit(inputDS, parameters) + mlr.fit(inputDS, parameters) - val weightList = model.weights.collect() + val weightList = mlr.weightsOption.get.collect() weightList.size should equal(1) @@ -63,7 +63,7 @@ class MultipleLinearRegressionITSuite } weight0 should be (expectedWeight0 +- 0.4) - val srs = model.squaredResidualSum(inputDS).collect().apply(0) 
+ val srs = mlr.squaredResidualSum(inputDS).collect().apply(0) srs should be (expectedSquaredResidualSum +- 2) } @@ -73,21 +73,21 @@ class MultipleLinearRegressionITSuite env.setParallelism(2) - val polynomialBase = PolynomialBase() - val learner = MultipleLinearRegression() + val polynomialBase = PolynomialFeatures() + val mlr = MultipleLinearRegression() - val pipeline = polynomialBase.chain(learner) + val pipeline = polynomialBase.chainPredictor(mlr) val inputDS = env.fromCollection(RegressionData.polynomialData) val parameters = ParameterMap() - .add(PolynomialBase.Degree, 3) + .add(PolynomialFeatures.Degree, 3) .add(MultipleLinearRegression.Stepsize, 0.002) .add(MultipleLinearRegression.Iterations, 100) - val model = pipeline.fit(inputDS, parameters) + pipeline.fit(inputDS, parameters) - val weightList = model.weights.collect() + val weightList = mlr.weightsOption.get.collect() weightList.size should equal(1) @@ -102,7 +102,7 @@ class MultipleLinearRegressionITSuite val transformedInput = polynomialBase.transform(inputDS, parameters) - val srs = model.squaredResidualSum(transformedInput).collect().apply(0) + val srs = mlr.squaredResidualSum(transformedInput).collect().apply(0) srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5) } http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-streaming/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml index d039a8b..2ebd606 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml +++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml @@ -84,7 +84,7 @@ under the License. <build> <plugins> - <!-- get default data from flink-java-examples pipeline --> + <!-- get default data from flink-java-examples package --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java index 188ac4a..d7fbc8e 100644 --- a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java +++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java @@ -19,7 +19,7 @@ /** * <strong>Table API</strong><br> * - * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming + * This package contains the generic part of the Table API. It can be used with Flink Streaming * and Flink Batch. From Scala as well as from Java. 
* * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala index 37c5937..e74651b 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala @@ -26,7 +26,7 @@ import scala.language.implicitConversions /** * == Table API (Scala) == * - * Importing this pipeline with: + * Importing this package with: * * {{{ * import org.apache.flink.api.scala.table._ http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala index f50ca02..c5c8c94 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table /** - * This pipeline contains the base class of AST nodes and all the expression language AST classes. + * This package contains the base class of AST nodes and all the expression language AST classes. * Expression trees should not be manually constructed by users. They are implicitly constructed * from the implicit DSL conversions in * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala index a31ec61..bdcb22c 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala @@ -20,7 +20,7 @@ package org.apache.flink.api /** * == Table API == * - * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming + * This package contains the generic part of the Table API. It can be used with Flink Streaming * and Flink Batch. From Scala as well as from Java. 
* * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala index adb9890..a598483 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table /** - * The operations in this pipeline are created by calling methods on [[Table]] they + * The operations in this package are created by calling methods on [[Table]] they * should not be manually created by users of the API. */ package object plan http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala index 155a17e..a1bc4b7 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala @@ -18,6 +18,6 @@ package org.apache.flink.api.table /** - * The functions in this pipeline are used transforming Table API operations to Java API operations. + * The functions in this package are used transforming Table API operations to Java API operations. 
*/ package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java index fb80798..3b2fb7f 100644 --- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java +++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java @@ -150,7 +150,7 @@ public class TachyonFileSystemWrapperTest { } } - // pipeline visible + // package visible static final class DopOneTestEnvironment extends LocalEnvironment { static { initializeContextEnvironment(new ExecutionEnvironmentFactory() { http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java index 5c30785..d61f80e 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java @@ -86,7 +86,7 @@ public class TPCHQuery3 { } }); - // Join customers with orders and pipeline them into a ShippingPriorityItem + // Join customers with orders and package them into a ShippingPriorityItem DataSet<ShippingPriorityItem> customerWithOrders = customers.join(orders).where(0).equalTo(1) .with( http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-custominput-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml index 18adc47..e6f3568 100644 --- a/flink-tests/src/test/assembly/test-custominput-assembly.xml +++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml @@ -28,7 +28,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your pipeline(s) --> + <!--modify/add include to match your package(s) --> <includes> <include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram.class</include> <include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram$*.class</include> http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-kmeans-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml index 3c547fb..a8d34ab 100644 --- a/flink-tests/src/test/assembly/test-kmeans-assembly.xml +++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml @@ -28,7 +28,7 @@ under the License. 
<fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your pipeline(s) --> + <!--modify/add include to match your package(s) --> <includes> <include>org/apache/flink/test/classloading/jar/KMeansForTest.class</include> <include>org/apache/flink/test/classloading/jar/KMeansForTest$*.class</include> http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml index b311700..8321b21 100644 --- a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml +++ b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml @@ -28,7 +28,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your pipeline(s) --> + <!--modify/add include to match your package(s) --> <includes> <include>org/apache/flink/test/classloading/jar/StreamingProgram.class</include> <include>org/apache/flink/test/classloading/jar/StreamingProgram$*.class</include> http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java index b3734a8..349275c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java @@ -92,7 +92,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase { + "32|2743|7744|4|4|6582.96|0.09|0.03|R|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|85811|8320|5|44|79059.64|0.05|0.06|R|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|11615|4117|6|6|9159.66|0.04|0.03|R|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + + "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + "33|60519|5532|2|32|47344.32|0.02|0.05|R|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|137469|9983|3|5|7532.30|0.05|0.03|R|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. 
stealthily bold exc|\n" + "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n" http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java index 7dc76b5..a0236c2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java @@ -85,7 +85,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase { + "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n" + "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
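

A note on the API shape the test changes in this commit exercise: fit no longer returns a model object, transformers must be fitted before transform can be called, and stages are composed with chainTransformer/chainPredictor. The following is a minimal sketch of that usage, mirroring PipelineITSuite above; it assumes only the flink-ml classes exactly as they appear in the diffs (StandardScaler, PolynomialFeatures, MultipleLinearRegression).

{{{
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

val trainingData = env.fromCollection(List(
  LabeledVector(1.0, DenseVector(1.0, 2.0)),
  LabeledVector(2.0, DenseVector(2.0, 3.0)),
  LabeledVector(3.0, DenseVector(3.0, 4.0))))

// Chaining a Transformer to a Transformer yields a new Transformer; chaining
// a Predictor at the end yields a Predictor and closes the pipeline.
val pipeline = StandardScaler()
  .chainTransformer(PolynomialFeatures().setDegree(2))
  .chainPredictor(MultipleLinearRegression())

// fit() runs through the chain: every transformer is fitted on the incoming
// data and transforms it before passing it on to the next stage. If a stage
// has no FitOperation for the input type, a RuntimeException is thrown, as
// the incompatibility tests in PipelineITSuite demonstrate.
pipeline.fit(trainingData)
}}}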
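The ALSITSuite changes above follow the same pattern for recommendation: the factorization is kept on the ALS instance after fitting, and both predictions and the empirical risk are obtained from that same instance rather than from a separate model. A short sketch under the same assumptions; the rating triples here are made up purely for illustration:

{{{
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// (userID, itemID, rating) triples; the values are illustrative only
val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(
  List((1, 10, 3.0), (1, 11, 4.0), (2, 10, 1.0)))

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)

// fit() stores the factorization on the estimator (factorsOption) instead of
// returning an ALSModel
als.fit(ratings)

// Predictions for (user, item) pairs and the empirical risk both come from
// the fitted estimator, matching the assertions in ALSITSuite
val predictions = als.predict(env.fromCollection(List((1, 10), (2, 11)))).collect()
val risk = als.empiricalRisk(ratings).collect().head
}}}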
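Because fit no longer hands back a model, trained state lives in Option-typed fields on the estimators (factorsOption on ALS, weightsOption on MultipleLinearRegression) and is populated only once fit has executed, which is why the tests above always fit before reading it. A defensive-access sketch, continuing from the pipeline example above and assuming the element structure the MultipleLinearRegressionITSuite assertions imply (a single pair of weight vector and intercept):

{{{
val mlr = MultipleLinearRegression()
mlr.fit(trainingData)

// weightsOption is None until fit() has executed
mlr.weightsOption match {
  case Some(weights) =>
    // The tests above collect exactly one element: the weight vector
    // together with the intercept
    val (weightVector, intercept) = weights.collect().head
    println(s"weights: $weightVector, intercept: $intercept")
  case None =>
    throw new RuntimeException("MultipleLinearRegression has not been fitted")
}
}}}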