[GitHub] spark pull request: [SPARK-4789] [SPARK-4942] [SPARK-5031] [mllib]...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/3637#issuecomment-69275417

I'll plan to try using this new API for a couple of examples tomorrow and see if anything stands out.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/3637#issuecomment-69275360

I've left a set of comments on the changes as they are. At a high level, these APIs feel heavy, and there seem to be a lot of things you have to get just right to add a new algorithm. Some of these aren't enforced at compile time, which makes me wonder whether there's a better class hierarchy we could use.
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693524

--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala ---
@@ -17,6 +17,10 @@
 package org.apache.spark.ml.param
+/* NOTE TO DEVELOPERS:
+ * If you add these parameter traits into your algorithm, you need to add a setter method as well.
--- End diff --

Maybe we should update this comment to explain *why* the setter must be added. The code will still compile without it, right?
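To make the question concrete, here is a minimal, Spark-free sketch. `ToyParams`, `HasMaxIter`, and both estimator classes are hypothetical stand-ins, not Spark's `Params` API, and the sketch assumes `set` is protected. Mixing the param trait in without a setter compiles fine, but callers then have no public way to change the value, which is presumably what the NOTE is guarding against.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Params framework (not Spark's API): values live
// in a map, and `set`/`get` are protected, so only the class itself can call them.
trait ToyParams {
  private val values = mutable.Map.empty[String, Any]

  protected def set(name: String, value: Any): this.type = {
    values(name) = value
    this
  }

  protected def get[T](name: String, default: T): T =
    values.getOrElse(name, default).asInstanceOf[T]
}

// Shared param trait: declares a public getter for the param, but no setter.
trait HasMaxIter extends ToyParams {
  def getMaxIter: Int = get("maxIter", 100)
}

// Compiles fine, yet callers cannot change maxIter:
// `new EstimatorWithoutSetter().set("maxIter", 10)` would not compile.
class EstimatorWithoutSetter extends HasMaxIter

// Following the NOTE: a public setter returning this.type keeps calls chainable.
class EstimatorWithSetter extends HasMaxIter {
  def setMaxIter(value: Int): this.type = set("maxIter", value)
}
```

Nothing in the compiler ties `HasMaxIter` to `setMaxIter`, so forgetting the setter only shows up when a user tries to configure the estimator.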
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693403

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -80,69 +50,157 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti
   def setRegParam(value: Double): this.type = set(regParam, value)
   def setMaxIter(value: Int): this.type = set(maxIter, value)
-  def setLabelCol(value: String): this.type = set(labelCol, value)
   def setThreshold(value: Double): this.type = set(threshold, value)
-  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
-  def setScoreCol(value: String): this.type = set(scoreCol, value)
-  def setPredictionCol(value: String): this.type = set(predictionCol, value)
   override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
+    // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
-    import dataset.sqlContext._
+
+    // Extract columns from data. If dataset is persisted, do not persist oldDataset.
+    val oldDataset = extractLabeledPoints(dataset, paramMap)
     val map = this.paramMap ++ paramMap
-    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
-      .map { case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-      }.persist(StorageLevel.MEMORY_AND_DISK)
+    val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+    if (handlePersistence) {
+      oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
+    // Train model
     val lr = new LogisticRegressionWithLBFGS
     lr.optimizer
       .setRegParam(map(regParam))
       .setNumIterations(map(maxIter))
-    val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
-    instances.unpersist()
+    val oldModel = lr.run(oldDataset)
+    val lrm = new LogisticRegressionModel(this, map, oldModel.weights, oldModel.intercept)
+
+    if (handlePersistence) {
+      oldDataset.unpersist()
+    }
+
     // copy model params
     Params.inheritValues(map, this, lrm)
     lrm
   }
-  private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    validateAndTransformSchema(schema, paramMap, fitting = true)
-  }
+  override protected def featuresDataType: DataType = new VectorUDT
 }
+
 /**
  * :: AlphaComponent ::
+ *
  * Model produced by [[LogisticRegression]].
  */
 @AlphaComponent
 class LogisticRegressionModel private[ml] (
     override val parent: LogisticRegression,
     override val fittingParamMap: ParamMap,
-    weights: Vector)
-  extends Model[LogisticRegressionModel] with LogisticRegressionParams {
+    val weights: Vector,
+    val intercept: Double)
+  extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel]
+  with LogisticRegressionParams {
+
+  setThreshold(0.5)
   def setThreshold(value: Double): this.type = set(threshold, value)
-  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
-  def setScoreCol(value: String): this.type = set(scoreCol, value)
-  def setPredictionCol(value: String): this.type = set(predictionCol, value)
-  private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    validateAndTransformSchema(schema, paramMap, fitting = false)
+  private val margin: Vector => Double = (features) => {
+    BLAS.dot(features, weights) + intercept
+  }
+
+  private val score: Vector => Double = (features) => {
+    val m = margin(features)
+    1.0 / (1.0 + math.exp(-m))
   }
   override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+    // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
+
     import dataset.sqlContext._
     val map = this.paramMap ++ paramMap
-    val score: Vector => Double = (v) => {
-      val margin = BLAS.dot(v, weights)
-      1.0 / (1.0 + math.exp(-margin))
+
+    // Output selected columns only.
+    // This is a bit complicated since it tries to avoid repeated computation.
+    //   rawPrediction (-margin, margin)
+    //   probability (1.0-score, score)
+    //   prediction (max margin)
+    var tmpData = dataset
+    var numColsOutput = 0
+    if (map(rawPredictionCol) != "") {
+      val features2raw: Vector => Vector = predictRaw
+      tmpData = tmpData.select(Star(None),
+        features2raw.call(map(featuresCol).attr) as map(rawPredictionC
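The `handlePersistence` logic in `fit` caches the derived training data only when the caller has not already persisted the input, and unpersists it once training is done. A Spark-free sketch of that pattern follows; `ToyDataset`, `Level`, and `fitWithPersistence` are hypothetical stand-ins for `SchemaRDD`/`RDD`, `StorageLevel`, and the body of `fit`.

```scala
// Hypothetical stand-ins for Spark's StorageLevel and a persistable dataset.
sealed trait Level
case object NoneLevel extends Level
case object MemoryAndDisk extends Level

final class ToyDataset[T](val rows: Seq[T]) {
  var storageLevel: Level = NoneLevel
  def persist(level: Level): this.type = { storageLevel = level; this }
  def unpersist(): this.type = { storageLevel = NoneLevel; this }
  def map[U](f: T => U): ToyDataset[U] = new ToyDataset(rows.map(f))
}

object PersistenceSketch {
  // Derive training data from `input`, caching it only when the caller has not
  // already cached `input`; a cache created here is released afterwards.
  // Returns the training result and whether this method handled persistence.
  def fitWithPersistence[T, U, M](input: ToyDataset[T])(extract: T => U)(
      train: ToyDataset[U] => M): (M, Boolean) = {
    val derived = input.map(extract)
    val handlePersistence = input.storageLevel == NoneLevel
    if (handlePersistence) derived.persist(MemoryAndDisk)
    try {
      (train(derived), handlePersistence)
    } finally {
      if (handlePersistence) derived.unpersist()
    }
  }
}
```

Unlike the diff, which unpersists only on the normal path, the sketch releases the cache in a `finally` block so that a failed training run does not leave it behind.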
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693428

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.classification
+
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
+import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Star
+
+/**
+ * Params for probabilistic classification.
+ */
+private[classification] trait ProbabilisticClassifierParams
+  extends ClassifierParams with HasProbabilityCol {
+
+  override protected def validateAndTransformSchema(
+      schema: StructType,
+      paramMap: ParamMap,
+      fitting: Boolean,
+      featuresDataType: DataType): StructType = {
+    val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting, featuresDataType)
+    val map = this.paramMap ++ paramMap
+    addOutputColumn(parentSchema, map(probabilityCol), new VectorUDT)
+  }
+}
+
+
+/**
+ * :: AlphaComponent ::
+ *
+ * Single-label binary or multiclass classifier which can output class conditional probabilities.
+ *
+ * @tparam FeaturesType Type of input features. E.g., [[Vector]]
+ * @tparam Learner Concrete Estimator type
+ * @tparam M Concrete Model type
+ */
+@AlphaComponent
+abstract class ProbabilisticClassifier[
+    FeaturesType,
+    Learner <: ProbabilisticClassifier[FeaturesType, Learner, M],
+    M <: ProbabilisticClassificationModel[FeaturesType, M]]
+  extends Classifier[FeaturesType, Learner, M] with ProbabilisticClassifierParams {
+
+  def setProbabilityCol(value: String): Learner = set(probabilityCol, value).asInstanceOf[Learner]
+}
+
+
+/**
+ * :: AlphaComponent ::
+ *
+ * Model produced by a [[ProbabilisticClassifier]].
+ * Classes are indexed {0, 1, ..., numClasses - 1}.
+ *
+ * @tparam FeaturesType Type of input features. E.g., [[Vector]]
+ * @tparam M Concrete Model type
+ */
+@AlphaComponent
+abstract class ProbabilisticClassificationModel[
+    FeaturesType,
+    M <: ProbabilisticClassificationModel[FeaturesType, M]]
+  extends ClassificationModel[FeaturesType, M] with ProbabilisticClassifierParams {
+
+  def setProbabilityCol(value: String): M = set(probabilityCol, value).asInstanceOf[M]
+
+  /**
+   * Transforms dataset by reading from [[featuresCol]], and appending new columns as specified by
+   * parameters:
+   *  - predicted labels as [[predictionCol]] of type [[Double]]
+   *  - raw predictions (confidences) as [[rawPredictionCol]] of type [[Vector]]
+   *  - probability of each class as [[probabilityCol]] of type [[Vector]].
+   *
+   * @param dataset input dataset
+   * @param paramMap additional parameters, overwrite embedded params
+   * @return transformed dataset
+   */
+  override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+    // This default implementation should be overridden as needed.
+    import dataset.sqlContext._
+    import org.apache.spark.sql.catalyst.dsl._
+
+    // Check schema
+    transformSchema(dataset.schema, paramMap, logging = true)
+    val map = this.paramMap ++ paramMap
+
+    // Prepare model
+    val tmpModel = if (paramMap.size != 0) {
+      val tmpModel = this.copy()
+      Params.inheritValues(paramMap, parent, tmpModel)
+      tmpModel
+    } else {
+      this
+    }
+
+    val (numColsOutput, outputData) =
+      ClassificationModel.transformColumnsImpl[FeaturesType](dataset, tmpModel, map)
+
+    // Output selected columns only.
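Both `fit` and `transform` build their effective parameters with `this.paramMap ++ paramMap`, so values passed at call time override the ones embedded in the instance. A minimal illustration using plain Scala `Map`s; `ToyParamMap` and `effectiveParams` are hypothetical, and the assumption is that `ParamMap`'s `++` follows the same right-hand-wins rule as `Map`'s.

```scala
object ParamMergeSketch {
  // Hypothetical: a param map modeled as a plain Map from param name to value.
  type ToyParamMap = Map[String, Any]

  // Embedded params on the left, call-time params on the right: for a key present
  // in both, Map's `++` keeps the right-hand value, so call-time params win.
  def effectiveParams(embedded: ToyParamMap, callTime: ToyParamMap): ToyParamMap =
    embedded ++ callTime
}
```

Under this rule, swapping the operands would silently let the embedded values win, so the operand order in `this.paramMap ++ paramMap` carries meaning.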
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693364

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -80,69 +50,157 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti
   def setRegParam(value: Double): this.type = set(regParam, value)
   def setMaxIter(value: Int): this.type = set(maxIter, value)
-  def setLabelCol(value: String): this.type = set(labelCol, value)
   def setThreshold(value: Double): this.type = set(threshold, value)
-  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
-  def setScoreCol(value: String): this.type = set(scoreCol, value)
-  def setPredictionCol(value: String): this.type = set(predictionCol, value)
   override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
+    // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
-    import dataset.sqlContext._
+
+    // Extract columns from data. If dataset is persisted, do not persist oldDataset.
+    val oldDataset = extractLabeledPoints(dataset, paramMap)
     val map = this.paramMap ++ paramMap
-    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
-      .map { case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-      }.persist(StorageLevel.MEMORY_AND_DISK)
+    val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+    if (handlePersistence) {
+      oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
+    // Train model
     val lr = new LogisticRegressionWithLBFGS
     lr.optimizer
       .setRegParam(map(regParam))
       .setNumIterations(map(maxIter))
-    val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
-    instances.unpersist()
+    val oldModel = lr.run(oldDataset)
+    val lrm = new LogisticRegressionModel(this, map, oldModel.weights, oldModel.intercept)
+
+    if (handlePersistence) {
+      oldDataset.unpersist()
+    }
+
     // copy model params
     Params.inheritValues(map, this, lrm)
     lrm
   }
-  private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    validateAndTransformSchema(schema, paramMap, fitting = true)
-  }
+  override protected def featuresDataType: DataType = new VectorUDT
 }
+
 /**
  * :: AlphaComponent ::
+ *
  * Model produced by [[LogisticRegression]].
  */
 @AlphaComponent
 class LogisticRegressionModel private[ml] (
     override val parent: LogisticRegression,
     override val fittingParamMap: ParamMap,
-    weights: Vector)
-  extends Model[LogisticRegressionModel] with LogisticRegressionParams {
+    val weights: Vector,
+    val intercept: Double)
+  extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel]
+  with LogisticRegressionParams {
+
+  setThreshold(0.5)
   def setThreshold(value: Double): this.type = set(threshold, value)
-  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
-  def setScoreCol(value: String): this.type = set(scoreCol, value)
-  def setPredictionCol(value: String): this.type = set(predictionCol, value)
-  private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    validateAndTransformSchema(schema, paramMap, fitting = false)
+  private val margin: Vector => Double = (features) => {
+    BLAS.dot(features, weights) + intercept
+  }
+
+  private val score: Vector => Double = (features) => {
+    val m = margin(features)
+    1.0 / (1.0 + math.exp(-m))
   }
   override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+    // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
+
     import dataset.sqlContext._
     val map = this.paramMap ++ paramMap
-    val score: Vector => Double = (v) => {
-      val margin = BLAS.dot(v, weights)
-      1.0 / (1.0 + math.exp(-margin))
+
+    // Output selected columns only.
+    // This is a bit complicated since it tries to avoid repeated computation.
--- End diff --

What is duplicated here, and what repeated computation is being avoided? Could both be refactored to make them more modular?
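The comments in the diff list three outputs that can all be derived from a single margin m = w · x + b: rawPrediction (-m, m), probability (1 - σ(m), σ(m)), and the prediction. The sketch below computes the margin once per row and derives all three from it, which appears to be the repeated computation being avoided. Dense `Array[Double]` stands in for MLlib's `Vector`, `BinaryLogisticSketch` and `predictAll` are hypothetical names, and the rule "prediction is 1.0 if σ(m) > threshold" is an assumption of the sketch.

```scala
object BinaryLogisticSketch {
  final case class Outputs(
      rawPrediction: (Double, Double),
      probability: (Double, Double),
      prediction: Double)

  // margin = w . x + b; dense arrays stand in for MLlib vectors.
  def margin(weights: Array[Double], intercept: Double, features: Array[Double]): Double = {
    require(weights.length == features.length, "dimension mismatch")
    weights.indices.map(i => weights(i) * features(i)).sum + intercept
  }

  def sigmoid(m: Double): Double = 1.0 / (1.0 + math.exp(-m))

  // Compute the margin once per row, then derive all three outputs from it.
  def predictAll(weights: Array[Double], intercept: Double, threshold: Double)(
      features: Array[Double]): Outputs = {
    val m = margin(weights, intercept, features)
    val p = sigmoid(m)
    Outputs(
      rawPrediction = (-m, m),
      probability = (1.0 - p, p),
      prediction = if (p > threshold) 1.0 else 0.0)
  }
}
```

With the default threshold of 0.5 (the diff calls `setThreshold(0.5)`), σ(m) > 0.5 exactly when m > -m, so this rule agrees with the "prediction (max margin)" comment.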
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693295

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -80,69 +50,157 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti
   def setRegParam(value: Double): this.type = set(regParam, value)
   def setMaxIter(value: Int): this.type = set(maxIter, value)
-  def setLabelCol(value: String): this.type = set(labelCol, value)
   def setThreshold(value: Double): this.type = set(threshold, value)
-  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
-  def setScoreCol(value: String): this.type = set(scoreCol, value)
-  def setPredictionCol(value: String): this.type = set(predictionCol, value)
   override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
+    // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
-    import dataset.sqlContext._
+
+    // Extract columns from data. If dataset is persisted, do not persist oldDataset.
+    val oldDataset = extractLabeledPoints(dataset, paramMap)
     val map = this.paramMap ++ paramMap
-    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
-      .map { case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-      }.persist(StorageLevel.MEMORY_AND_DISK)
+    val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+    if (handlePersistence) {
+      oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
+    // Train model
     val lr = new LogisticRegressionWithLBFGS
     lr.optimizer
       .setRegParam(map(regParam))
       .setNumIterations(map(maxIter))
-    val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
-    instances.unpersist()
+    val oldModel = lr.run(oldDataset)
+    val lrm = new LogisticRegressionModel(this, map, oldModel.weights, oldModel.intercept)
+
+    if (handlePersistence) {
+      oldDataset.unpersist()
+    }
+
     // copy model params
     Params.inheritValues(map, this, lrm)
     lrm
   }
-  private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    validateAndTransformSchema(schema, paramMap, fitting = true)
-  }
+  override protected def featuresDataType: DataType = new VectorUDT
--- End diff --

Ehh... never mind, I think I get it. It feels very strange, though: if we must have this, can't we make VectorUDT the default?
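One reading of the suggestion: give the hook a default in the shared params trait, so only predictors whose features are not vectors need to override it. A hypothetical, Spark-free sketch; `ToyDataType` and its cases stand in for Spark SQL's `DataType`/`VectorUDT`, and `validateFeatures` is an illustrative schema check, not Spark's.

```scala
// Toy stand-ins for Spark SQL column types (hypothetical, not Spark's classes).
sealed trait ToyDataType
case object ToyVectorType extends ToyDataType
case object ToyStringType extends ToyDataType

trait ToyPredictorParams {
  // Default for the common case: features are vectors. Subclasses override only if needed.
  protected def featuresDataType: ToyDataType = ToyVectorType

  // Schema check that consumes the hook: the features column must have the expected type.
  def validateFeatures(schema: Map[String, ToyDataType], featuresCol: String): Unit = {
    val actual = schema.getOrElse(featuresCol,
      throw new IllegalArgumentException(s"Column $featuresCol not found"))
    require(actual == featuresDataType,
      s"Column $featuresCol must be of type $featuresDataType but was $actual")
  }
}

// Vector-based predictor: inherits the default, no override needed.
class ToyLogisticRegression extends ToyPredictorParams

// A hypothetical predictor over raw text overrides the default.
class ToyTextClassifier extends ToyPredictorParams {
  override protected def featuresDataType: ToyDataType = ToyStringType
}
```

The trade-off in this sketch is that a default hides the hook from authors of non-vector predictors, who may forget to override it; an abstract member would instead force every subclass to state its type.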
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693206

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -80,69 +50,157 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti
   def setRegParam(value: Double): this.type = set(regParam, value)
   def setMaxIter(value: Int): this.type = set(maxIter, value)
-  def setLabelCol(value: String): this.type = set(labelCol, value)
   def setThreshold(value: Double): this.type = set(threshold, value)
-  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
-  def setScoreCol(value: String): this.type = set(scoreCol, value)
-  def setPredictionCol(value: String): this.type = set(predictionCol, value)
   override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
+    // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
-    import dataset.sqlContext._
+
+    // Extract columns from data. If dataset is persisted, do not persist oldDataset.
+    val oldDataset = extractLabeledPoints(dataset, paramMap)
     val map = this.paramMap ++ paramMap
-    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
-      .map { case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-      }.persist(StorageLevel.MEMORY_AND_DISK)
+    val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+    if (handlePersistence) {
+      oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
+    // Train model
     val lr = new LogisticRegressionWithLBFGS
     lr.optimizer
       .setRegParam(map(regParam))
       .setNumIterations(map(maxIter))
-    val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
-    instances.unpersist()
+    val oldModel = lr.run(oldDataset)
+    val lrm = new LogisticRegressionModel(this, map, oldModel.weights, oldModel.intercept)
+
+    if (handlePersistence) {
+      oldDataset.unpersist()
+    }
+
     // copy model params
     Params.inheritValues(map, this, lrm)
     lrm
   }
-  private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    validateAndTransformSchema(schema, paramMap, fitting = true)
-  }
+  override protected def featuresDataType: DataType = new VectorUDT
--- End diff --

What is this for?
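Regarding what `featuresDataType` is for: in this PR it is passed into `validateAndTransformSchema`, which `ClassifierParams` and `ProbabilisticClassifierParams` override by calling `super` and then appending their own output column. Below is a Spark-free sketch of that stackable-trait chain, in which the base layer uses the expected type to check the features column. All names (`ToySchema`, `BasePredictorParams`, `SketchLRModel`, and so on) are hypothetical, and the schema is a plain list of (name, type) pairs.

```scala
// Hypothetical toy schema: ordered (column name, type name) pairs.
object ToySchema {
  def addOutputColumn(schema: Seq[(String, String)], name: String, tpe: String): Seq[(String, String)] = {
    require(!schema.exists(_._1 == name), s"Output column $name already exists")
    schema :+ (name -> tpe)
  }
}

trait BasePredictorParams {
  def featuresCol: String = "features"
  def predictionCol: String = "prediction"

  // Base layer: check the features column against the expected type, then add its output.
  protected def validateAndTransformSchema(
      schema: Seq[(String, String)],
      featuresDataType: String): Seq[(String, String)] = {
    val actual = schema.find(_._1 == featuresCol).map(_._2)
    require(actual.contains(featuresDataType),
      s"Column $featuresCol must be of type $featuresDataType but was $actual")
    ToySchema.addOutputColumn(schema, predictionCol, "double")
  }
}

// Each layer calls super first, then appends the column it is responsible for.
trait BaseClassifierParams extends BasePredictorParams {
  def rawPredictionCol: String = "rawPrediction"
  override protected def validateAndTransformSchema(
      schema: Seq[(String, String)],
      featuresDataType: String): Seq[(String, String)] = {
    val parentSchema = super.validateAndTransformSchema(schema, featuresDataType)
    ToySchema.addOutputColumn(parentSchema, rawPredictionCol, "vector")
  }
}

trait BaseProbabilisticParams extends BaseClassifierParams {
  def probabilityCol: String = "probability"
  override protected def validateAndTransformSchema(
      schema: Seq[(String, String)],
      featuresDataType: String): Seq[(String, String)] = {
    val parentSchema = super.validateAndTransformSchema(schema, featuresDataType)
    ToySchema.addOutputColumn(parentSchema, probabilityCol, "vector")
  }
}

// The concrete model states its features type once and gets every layer's check and column.
class SketchLRModel extends BaseProbabilisticParams {
  protected def featuresDataType: String = "vector"
  def transformSchema(schema: Seq[(String, String)]): Seq[(String, String)] =
    validateAndTransformSchema(schema, featuresDataType)
}
```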
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22693073

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.classification
+
+import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
+import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
+import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Star
+
+/**
+ * :: DeveloperApi ::
+ * Params for classification.
+ */
+@DeveloperApi
+trait ClassifierParams extends PredictorParams
+  with HasRawPredictionCol {
+
+  override protected def validateAndTransformSchema(
+      schema: StructType,
+      paramMap: ParamMap,
+      fitting: Boolean,
+      featuresDataType: DataType): StructType = {
+    val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting, featuresDataType)
+    val map = this.paramMap ++ paramMap
+    addOutputColumn(parentSchema, map(rawPredictionCol), new VectorUDT)
+  }
+}
+
+/**
+ * :: AlphaComponent ::
+ * Single-label binary or multiclass classification.
+ * Classes are indexed {0, 1, ..., numClasses - 1}.
+ *
+ * @tparam FeaturesType Type of input features. E.g., [[Vector]]
+ * @tparam Learner Concrete Estimator type
+ * @tparam M Concrete Model type
+ */
+@AlphaComponent
+abstract class Classifier[
+    FeaturesType,
+    Learner <: Classifier[FeaturesType, Learner, M],
+    M <: ClassificationModel[FeaturesType, M]]
+  extends Predictor[FeaturesType, Learner, M]
+  with ClassifierParams {
+
+  def setRawPredictionCol(value: String): Learner =
+    set(rawPredictionCol, value).asInstanceOf[Learner]
+
+  // TODO: defaultEvaluator (follow-up PR)
+}
+
+/**
+ * :: AlphaComponent ::
+ * Model produced by a [[Classifier]].
+ * Classes are indexed {0, 1, ..., numClasses - 1}.
+ *
+ * @tparam FeaturesType Type of input features. E.g., [[Vector]]
+ * @tparam M Concrete Model type
+ */
+@AlphaComponent
+abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]]
+  extends PredictionModel[FeaturesType, M] with ClassifierParams {
+
+  def setRawPredictionCol(value: String): M = set(rawPredictionCol, value).asInstanceOf[M]
+
+  /** Number of classes (values which the label can take). */
+  def numClasses: Int
--- End diff --

How hard (or weird) would it be to make labels an Enumeration? This class could be inferred from the training set at run time or supplied by the user. The user would then pass the model the actual set of labels rather than the number of classes.
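A sketch of the label-set idea, using a plain label-to-index table rather than `scala.Enumeration`: the distinct labels are inferred from the training data (or could be supplied by the user), sorted for a deterministic order, and mapped to {0, ..., numClasses - 1}. `LabelIndex` and `fromTrainingLabels` are hypothetical names, not part of this PR.

```scala
// Hypothetical sketch: the model keeps the actual label set, and the class
// indices {0, ..., numClasses - 1} are derived from it.
final case class LabelIndex[L](labels: IndexedSeq[L]) {
  private val toIndex: Map[L, Int] = labels.zipWithIndex.toMap

  def numClasses: Int = labels.size

  def indexOf(label: L): Int =
    toIndex.getOrElse(label, throw new NoSuchElementException(s"Unknown label: $label"))

  def labelOf(index: Int): L = labels(index)
}

object LabelIndex {
  // Infer the label set from training data; sorting keeps the indices deterministic.
  def fromTrainingLabels[L: Ordering](observed: Seq[L]): LabelIndex[L] =
    LabelIndex(observed.distinct.sorted.toIndexedSeq)
}
```

Because `numClasses` is computed from the stored labels, the user never passes the class count separately, and predicted indices can be mapped back to the original labels with `labelOf`.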
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22692953

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.classification
+
+import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
+import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
+import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Star
+
+/**
+ * :: DeveloperApi ::
+ * Params for classification.
+ */
+@DeveloperApi
+trait ClassifierParams extends PredictorParams
+  with HasRawPredictionCol {
+
+  override protected def validateAndTransformSchema(
+      schema: StructType,
+      paramMap: ParamMap,
+      fitting: Boolean,
+      featuresDataType: DataType): StructType = {
+    val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting, featuresDataType)
+    val map = this.paramMap ++ paramMap
+    addOutputColumn(parentSchema, map(rawPredictionCol), new VectorUDT)
+  }
+}
+
+/**
+ * :: AlphaComponent ::
+ * Single-label binary or multiclass classification.
+ * Classes are indexed {0, 1, ..., numClasses - 1}.
+ *
+ * @tparam FeaturesType Type of input features. E.g., [[Vector]]
+ * @tparam Learner Concrete Estimator type
+ * @tparam M Concrete Model type
+ */
+@AlphaComponent
+abstract class Classifier[
--- End diff --

I don't have a concrete suggestion here, but these abstract types are starting to look complicated and redundant. Is "Learner" only there to make subclassing cleaner?
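On the question of whether `Learner` exists mainly for subclassing: one thing it buys is that setters defined in the abstract base, such as `setRawPredictionCol`, return the concrete estimator type, so calls can be chained with subclass-specific setters. The hypothetical, Spark-free sketch below compares that F-bounded approach with Scala's `this.type`, which `LogisticRegression` in this PR already uses for its own setters.

```scala
// (a) F-bounded type parameter, as in Classifier[FeaturesType, Learner, M].
abstract class BoundedBase[Learner <: BoundedBase[Learner]] {
  protected var rawPredictionCol: String = "rawPrediction"
  def getRawPredictionCol: String = rawPredictionCol

  def setRawPredictionCol(value: String): Learner = {
    rawPredictionCol = value
    this.asInstanceOf[Learner] // unchecked: relies on subclasses passing themselves as Learner
  }
}

class BoundedLR extends BoundedBase[BoundedLR] {
  var maxIter: Int = 100
  def setMaxIter(value: Int): BoundedLR = { maxIter = value; this }
}

// (b) Singleton type `this.type`: setters chain without an extra type parameter.
abstract class SelfTypedBase {
  protected var rawPredictionCol: String = "rawPrediction"
  def getRawPredictionCol: String = rawPredictionCol

  def setRawPredictionCol(value: String): this.type = { rawPredictionCol = value; this }
}

class SelfTypedLR extends SelfTypedBase {
  var maxIter: Int = 100
  def setMaxIter(value: Int): this.type = { maxIter = value; this }
}
```

In this sketch `this.type` covers setter chaining without the cast; a type parameter like `M` would still be needed wherever a base-class method must return a *different* concrete type, such as a `fit` that returns the model.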
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22691031

--- Diff: examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.ml.classification.{Classifier, ClassifierParams, ClassificationModel}
+import org.apache.spark.ml.param.{Params, IntParam, ParamMap}
+import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.sql.{DataType, SchemaRDD, Row, SQLContext}
+
+/**
+ * A simple example demonstrating how to write your own learning algorithm using Estimator,
+ * Transformer, and other abstractions.
+ * This mimics [[org.apache.spark.ml.classification.LogisticRegression]].
+ * Run with
+ * {{{
+ * bin/run-example ml.DeveloperApiExample
+ * }}}
+ */
+object DeveloperApiExample {
+
+  def main(args: Array[String]) {
+    val conf = new SparkConf().setAppName("DeveloperApiExample")
+    val sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+    import sqlContext._
+
+    // Prepare training data.
+    val training = sparkContext.parallelize(Seq(
+      LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
+      LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
+      LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
+      LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
+
+    // Create a LogisticRegression instance. This instance is an Estimator.
+    val lr = new MyLogisticRegression()
+    // Print out the parameters, documentation, and any default values.
+    println("MyLogisticRegression parameters:\n" + lr.explainParams() + "\n")
+
+    // We may set parameters using setter methods.
+    lr.setMaxIter(10)
+
+    // Learn a LogisticRegression model. This uses the parameters stored in lr.
+    val model = lr.fit(training)
+
+    // Prepare test data.
+    val test = sparkContext.parallelize(Seq(
+      LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
+      LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
+
+    // Make predictions on test data.
+    val sumPredictions: Double = model.transform(test)
+      .select('features, 'label, 'prediction)
+      .collect()
+      .map { case Row(features: Vector, label: Double, prediction: Double) =>
+        prediction
+      }.sum
+    assert(sumPredictions == 0.0,
+      "MyLogisticRegression predicted something other than 0, even though all weights are 0!")
+  }
+}
+
+/**
+ * Example of defining a parameter trait for a user-defined type of [[Classifier]].
+ *
+ * NOTE: This is private since it is an example. In practice, you may not want it to be private.
+ */
+private trait MyLogisticRegressionParams extends ClassifierParams {
+
+  /** param for max number of iterations */
+  val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations")
+  def getMaxIter: Int = get(maxIter)
--- End diff --

I'm a little confused here - do I understand correctly that you need to specify a getter by convention? Or is this just showing that you *can* specify a getter?
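To make the getter/setter question concrete, here is a minimal plain-Scala sketch with no Spark dependency. HasParams, the IntParam case class and its default of 100 are hypothetical stand-ins that only mirror the names in the diff; this is not Spark's Params implementation. In this sketch the getter is only a typed convenience over a param lookup, while a setter returning this.type is what allows chained configuration such as setMaxIter(10).

```scala
import scala.collection.mutable

// Hypothetical stand-ins, NOT Spark's org.apache.spark.ml.param classes.
final case class IntParam(name: String, doc: String, default: Int)

trait HasParams {
  // Values explicitly set by the user; unset params fall back to their defaults.
  private val values = mutable.Map.empty[String, Int]
  protected def get(p: IntParam): Int = values.getOrElse(p.name, p.default)
  protected def set(p: IntParam, v: Int): this.type = { values(p.name) = v; this }
}

trait MyLogisticRegressionParams extends HasParams {
  // The default of 100 is an assumption made for this sketch.
  val maxIter: IntParam = IntParam("maxIter", "max number of iterations", 100)
  // Getter by convention: a typed accessor instead of a raw param lookup.
  def getMaxIter: Int = get(maxIter)
}

class MyLogisticRegression extends MyLogisticRegressionParams {
  // Returning this.type keeps the concrete type, so calls can be chained.
  def setMaxIter(value: Int): this.type = set(maxIter, value)
}
```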
[GitHub] spark pull request: [SPARK-4789] [SPARK-4942] [SPARK-5031] [mllib]...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/3637#discussion_r22679713

--- Diff: examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala ---
@@ -101,10 +102,10 @@ object CrossValidatorExample {
     // Make predictions on test documents. cvModel uses the best model found (lrModel).
     cvModel.transform(test)
-      .select('id, 'text, 'score, 'prediction)
+      .select('id, 'text, 'probability, 'prediction)
       .collect()
-      .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
-      println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
+      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
+      println("(" + id + ", " + text + ") --> prob=" + prob + ", prediction=" + prediction)
--- End diff --

Minor nit - I find string interpolation (compatible with Scala 2.10+) makes these kinds of lines much more readable.
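For the string-interpolation nit, a self-contained sketch comparing the two styles. prob is modeled as a Seq[Double] instead of MLlib's Vector so the snippet runs without Spark; both methods build the same string, and the s"..." form is the one being suggested.

```scala
object InterpolationNit {
  // Concatenation style, as in the diff.
  def withConcat(id: Long, text: String, prob: Seq[Double], prediction: Double): String =
    "(" + id + ", " + text + ") --> prob=" + prob + ", prediction=" + prediction

  // Same output with the s-interpolator (available since Scala 2.10).
  def withInterpolation(id: Long, text: String, prob: Seq[Double], prediction: Double): String =
    s"($id, $text) --> prob=$prob, prediction=$prediction"
}
```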
[GitHub] spark pull request: [SPARK-3530][MLLIB] pipeline and parameters wi...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/3099#issuecomment-62497848

Hi guys - I spent a good deal of time today writing a new pipeline against this PR, and currently find it pretty difficult to use.

Some issues:

1. LOTS of boilerplate - the UnaryTransformer is nice, but in many cases we've got non-unary transformers, and the setters are messy to write and non-intuitive. Even in UnaryTransformers, this ClassTag thing doesn't make a lot of sense and seems unnecessary from a user's perspective. The transform(schema, paramMap) stuff is all basically boilerplate with just some names changed, and could be handled by the type system. So is the code which packs the estimator into a model, which is needed for basically every new model family.
2. Run-time typechecking already bit me pretty hard - for example, I copy/pasted the line "randomSignNode.setOutputCol("fftFeatures")" when I meant to write "fftTransform.setOutputCol("fftFeatures")", and this didn't get caught until runtime, and the stack trace was not very helpful for debugging.
3. Serialization - it's fundamentally pretty tough to figure out what's actually getting serialized into closures. This is not a problem unique to this PR, but with the rather extensive inheritance structure we're imposing, it's harder to track serialization issues down and debug them.
4. Tuning - while I appreciate the focus on supporting parameter tuning in a first release, I think it's making this PR more complicated than it needs to be - e.g. removing it would obviate the need for an Evaluator, which might make things simpler (an evaluator could be just a transformer).
5. Even slightly complicated pipelines are tough to write. For example, if I have an Estimator that takes an Iterator[DataSet] and produces a Model, it is difficult to express in this framework.
6. Various classes/traits that user code effectively needs are marked private - VectorUDT, Has*Col, etc.
7. I still think getting rid of setters and just using constructor arguments would simplify things here. As new versions of the same PipelineNode with more options get added, we'd need to add additional constructors and keep supporting the old calling convention for API compatibility - it's not pretty, but I think it's better than the current proposal.
8. Why can't a model just be a transformer? Because it needs to know what called it?
9. In many cases, we don't want our batch operation to be a .map on every row of the RDD - instead we want to write batch operators on full partitions for efficiency - and the UnaryTransformer interface makes it difficult to chain operators like this together.

Overall - at this point everything outside of a very simple UnaryTransformer is very difficult for humans to write - and even with that there are a few things you have to get *just right* in order for everything to compile and run.

I've shared a branch with the people on this thread that was an attempt to rebuild @shivaram's MNIST pipeline against the current interface (http://github.com/amplab/ml-pipelines). It compiles, but errors out on serialization issues at runtime - and it's difficult to figure out why with the current structure. You'll need to rebuild Spark with some fields no longer marked private to get it to run. You can follow the instructions in the README, but change the entry point to pipelines.MnistRandomFFTPipeline.
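Several of the points above (typed wiring instead of string column names, constructor arguments instead of setters, whole-partition batch operators) can be illustrated with a tiny typed pipeline. This is a hypothetical plain-Scala sketch, not the interface proposed in this PR or any Spark API; Node, Scale and SumFeatures are illustrative names.

```scala
// Hypothetical typed pipeline node; not Spark's Transformer/Estimator API.
trait Node[A, B] extends Serializable { self =>
  def apply(in: A): B

  // Batch hook: operate on a whole partition at once; defaults to a per-row map.
  def applyPartition(in: Iterator[A]): Iterator[B] = in.map(apply)

  // Composition is type-checked: this node's output type must match the next node's input.
  def andThen[C](next: Node[B, C]): Node[A, C] = new Node[A, C] {
    def apply(in: A): C = next.apply(self.apply(in))
    override def applyPartition(in: Iterator[A]): Iterator[C] =
      next.applyPartition(self.applyPartition(in))
  }
}

// Configuration through constructor arguments rather than setters.
final case class Scale(factor: Double) extends Node[Array[Double], Array[Double]] {
  def apply(in: Array[Double]): Array[Double] = in.map(_ * factor)
}

final case class SumFeatures() extends Node[Array[Double], Double] {
  def apply(in: Array[Double]): Double = in.sum
}
```

With this shape, a miswiring such as SumFeatures().andThen(Scale(2.0)) is rejected by the compiler, because a Node[Double, C] is expected but Scale consumes Array[Double]. The trade-off, under this sketch, is that heterogeneous column-oriented records are harder to express than with a schema.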
[GitHub] spark pull request: [WIP][SPARK-3530][MLLIB] pipeline and paramete...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/3099#issuecomment-61978539

Oh, and I'm not saying "let's not support PMML" - I'm saying let's have a sensible way of handing off models to other JVM programs that doesn't involve writing to XML in the middle.
[GitHub] spark pull request: [WIP][SPARK-3530][MLLIB] pipeline and paramete...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/3099#issuecomment-61977531

I've got several other comments on this PR - mostly good - and will leave some more detailed comments in a bit.

TL;DR - What's wrong with Java serialization?

But - PMML support seems a little like overkill here. I admit I'm fairly ignorant of the details of PMML, but my understanding is that it is designed to facilitate transfer of models between languages - e.g. SAS to Java to R, etc. While I'm sure it's general-purpose enough to capture complex pipelines, I'd be surprised if it can do so efficiently.

@jegonzal and @dcrankshaw are talking about a Java-based runtime environment for serving complex pipelines trained via Spark and MLlib. Spark is already pretty good about shipping JVM code around to remote sites and executing it - it does so by serializing and shipping standard Java objects, and I don't see why we should follow a very different pattern here. This design has been part of MLlib since day 1. I don't want to speak for these two, but I don't think they have a problem with a runtime dependency on MLlib or some other JVM-based machine learning library; their main issue is that they don't want to have to call into a batch system (aka Spark) to execute their models.

@jkbradley - I agree that some models or transformers are going to require a lot of state, and potentially distributed computation, but this should be the exception, not the rule. In general, an Estimator should compute a fairly small object (Transformer) which is small enough to be passed around and doesn't need cluster resources to run. In the case of outlier removal, for example, I'd imagine that the Estimator would take several passes over the data and compute some sufficient statistics used to filter new points. For cases like ALS, where the model is really big, this is exactly where @jegonzal and @dcrankshaw's research comes in.

To make an analogy - just as I'd happily use Spark to compute a distributed inverted index, I certainly wouldn't use it to do point queries on that index. So some interface is required for transmitting this large distributed state to a system better prepared to answer point queries based on that state.

At any rate - this is great stuff; I just want to make sure we don't get lost in the weeds of supporting the general case at the expense of streamlined support for the obvious case.
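The outlier-removal case and the Java-serialization point can be sketched together: an estimator makes passes over the data to compute sufficient statistics (here just the mean and standard deviation) and returns a small serializable object that another JVM process could load without Spark. OutlierEstimator, OutlierFilter, JavaSerde and the z-score rule are hypothetical; only the java.io stream classes are real APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical small "model": only the statistics needed to score new points.
// Case classes are Serializable, so plain Java serialization works on it.
final case class OutlierFilter(mean: Double, stdDev: Double, maxZ: Double) {
  def isInlier(x: Double): Boolean = math.abs(x - mean) <= maxZ * stdDev
}

object OutlierEstimator {
  def fit(data: Seq[Double], maxZ: Double): OutlierFilter = {
    require(data.nonEmpty, "need at least one point")
    val n = data.size.toDouble
    val mean = data.sum / n                                       // pass 1: mean
    val variance = data.map(x => (x - mean) * (x - mean)).sum / n // pass 2: variance
    OutlierFilter(mean, math.sqrt(variance), maxZ)
  }
}

object JavaSerde {
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close() // close() flushes the stream
    buffer.toByteArray
  }

  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

One caveat of plain Java serialization is that the reading side needs compatible class definitions on its classpath, roughly the same constraint Spark already has when shipping closures to executors.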
[GitHub] spark pull request: [SPARK-3572] [sql] [mllib] User-Defined Types ...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/2919#issuecomment-60969356

I've looked at the code here, and it basically seems reasonable. One high-level concern I have is around the programming pattern this encourages: complex nesting of otherwise simple structure, which may make it difficult to program against Datasets for sufficiently complicated applications.

A 'dataset' is now a collection of Row, where we have the guarantee that all rows in a Dataset conform to the same schema. A schema is a list of (name, type) pairs which describe the attributes available in the dataset. This seems like a good thing to me, and is pretty much what we described in MLI (and how conventional databases have been structured forever). So far, so good.

The concern I have is that we are now encouraging these attributes to be complex types. For example, where I might have had

val x = Schema(("a", classOf[String]), ("b", classOf[Double]), ..., ("z", classOf[Double]))

this would become

val x = Schema(("a", classOf[String]), ("bGroup", classOf[Vector]), ..., ("zGroup", classOf[Vector]))

So, great, my schema now has these vector things in it, which I can create separately, pass around, etc. This clearly has its merits:

1) Features are grouped together logically based on the process that creates them.
2) Managing one short schema where each record is comprised of a few large objects (say, 4 vectors, each of length 1000) is probably easier than managing a really big schema comprised of lots of small objects (say, 4000 doubles).

But there are some major drawbacks:

1) Why stop at one level of nesting? Why not have Vector[Vector]?
2) How do learning algorithms like SVM or PCA deal with these Datasets? Is there an implicit conversion that flattens these things to RDD[LabeledPoint]? Do we want to guarantee these semantics?
3) Manipulating and subsetting nested schemas like this might be tricky. Where before I might be able to write:

val x: Dataset = input.select(Seq(0,1,2,4,180,181,1000,1001,1002))

now I might have to write

val groupSelections = Seq(Seq(0,1,2,4),Seq(0,1),Seq(0,1,2))
val x: Dataset = groupSelections.zip(input.columns).map { case (gs, col) => col(gs) }

Ignoring the raw syntax and semantics of how you might actually map an operation over the columns of a Dataset and get back a well-structured dataset, I think this makes two conflicting points:

1) In the first example, presumably all the work goes into figuring out which subset of features you want in this really wide feature space.
2) In the second example, there's a lot of gymnastics that goes into subsetting feature groups.

I think it's clear that working with lots of feature groups might get unreasonable pretty quickly.

If we look at R or pandas/scikit-learn as examples of projects that have (arguably quite successfully) dealt with these interface issues, there is one basic pattern: learning algorithms expect big tables of numbers as input. Even here, there are some important differences.

For example, in scikit-learn, categorical features aren't supported directly by most learning algorithms. Instead, users are responsible for getting data from "table with heterogeneously typed columns" to "table of numbers" with something like OneHotEncoder and other feature transformers.

In R, on the other hand, categorical features are (sometimes frustratingly) first-class citizens by virtue of the "factor" data type, which is essentially an enum. Most out-of-the-box learning algorithms (like glm()) accept data frames with categorical inputs and handle them sensibly, implicitly one-hot encoding the categorical features (or creating dummy variables, if you prefer).

While I have a slight preference for representing things as big flat tables, I would be fine coding either way - but I wanted to raise the issue for discussion here before the interfaces are set in stone.
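A plain-Scala sketch of the two layouts being compared: a flat row of numbers versus a sequence of feature-group vectors, plus a one-hot helper of the kind scikit-learn users apply before training. All names are hypothetical, and arrays stand in for Spark's Vector and Dataset; this is not the SchemaRDD API.

```scala
object FeatureLayout {
  // Flat layout: one wide row of numbers; subsetting is a single index lookup.
  def selectFlat(row: Array[Double], indices: Seq[Int]): Array[Double] =
    indices.map(row(_)).toArray

  // Grouped layout: one vector per feature group; subsetting needs per-group indices.
  def selectGrouped(groups: Seq[Array[Double]], selections: Seq[Seq[Int]]): Seq[Array[Double]] =
    groups.zip(selections).map { case (group, idx) => idx.map(group(_)).toArray }

  // Concatenate feature groups into the flat layout most learning algorithms expect.
  def flatten(groups: Seq[Array[Double]]): Array[Double] =
    groups.flatMap(_.toSeq).toArray

  // One-hot encode a categorical value against a fixed list of known categories.
  def oneHot(value: String, categories: IndexedSeq[String]): Array[Double] = {
    val out = Array.fill(categories.size)(0.0)
    val i = categories.indexOf(value)
    require(i >= 0, s"unknown category: $value")
    out(i) = 1.0
    out
  }
}
```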
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/886#issuecomment-48767401

I've gone through this in some depth, and aside from a couple of minor style nits - the logic looks good to me. Manish - have you compared output vs. scikit-learn for multiclass datasets and verified that things look at least reasonably similar? Really awesome work!
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r14836561

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -768,104 +973,157 @@ object DecisionTree extends Serializable with Logging {
     /**
      * Extracts left and right split aggregates.
      * @param binData Array[Double] of size 2*numFeatures*numSplits
-     * @return (leftNodeAgg, rightNodeAgg) tuple of type (Array[Double],
-     *         Array[Double]) where each array is of size(numFeature,2*(numSplits-1))
+     * @return (leftNodeAgg, rightNodeAgg) tuple of type (Array[Array[Array[Double\]\]\],
+     *         Array[Array[Array[Double\]\]\]) where each array is of size(numFeature,
+     *         (numBins - 1), numClasses)
      */
     def extractLeftRightNodeAggregates(
-        binData: Array[Double]): (Array[Array[Double]], Array[Array[Double]]) = {
+        binData: Array[Double]): (Array[Array[Array[Double]]], Array[Array[Array[Double]]]) = {
+
+
+      def findAggForOrderedFeatureClassification(
+          leftNodeAgg: Array[Array[Array[Double]]],
+          rightNodeAgg: Array[Array[Array[Double]]],
+          featureIndex: Int) {
+
+        // shift for this featureIndex
+        val shift = numClasses * featureIndex * numBins
+
+        var classIndex = 0
+        while (classIndex < numClasses) {
+          // left node aggregate for the lowest split
+          leftNodeAgg(featureIndex)(0)(classIndex) = binData(shift + classIndex)
+          // right node aggregate for the highest split
+          rightNodeAgg(featureIndex)(numBins - 2)(classIndex)
+            = binData(shift + (numClasses * (numBins - 1)) + classIndex)
+          classIndex += 1
+        }
+
+        // Iterate over all splits.
+        var splitIndex = 1
+        while (splitIndex < numBins - 1) {
+          // calculating left node aggregate for a split as a sum of left node aggregate of a
+          // lower split and the left bin aggregate of a bin where the split is a high split
+          var innerClassIndex = 0
+          while (innerClassIndex < numClasses) {
+            leftNodeAgg(featureIndex)(splitIndex)(innerClassIndex)
+              = binData(shift + numClasses * splitIndex + innerClassIndex) +
+                leftNodeAgg(featureIndex)(splitIndex - 1)(innerClassIndex)
+            rightNodeAgg(featureIndex)(numBins - 2 - splitIndex)(innerClassIndex) =
+              binData(shift + (numClasses * (numBins - 1 - splitIndex) + innerClassIndex)) +
+                rightNodeAgg(featureIndex)(numBins - 1 - splitIndex)(innerClassIndex)
+            innerClassIndex += 1
+          }
+          splitIndex += 1
+        }
+      }
+
+      def findAggForUnorderedFeatureClassification(
+          leftNodeAgg: Array[Array[Array[Double]]],
+          rightNodeAgg: Array[Array[Array[Double]]],
+          featureIndex: Int) {
+
+        val rightChildShift = numClasses * numBins * numFeatures
+        var splitIndex = 0
+        while (splitIndex < numBins - 1) {
+          var classIndex = 0
+          while (classIndex < numClasses) {
+            // shift for this featureIndex
+            val shift = numClasses * featureIndex * numBins + splitIndex * numClasses
+            val leftBinValue = binData(shift + classIndex)
+            val rightBinValue = binData(rightChildShift + shift + classIndex)
+            leftNodeAgg(featureIndex)(splitIndex)(classIndex) = leftBinValue
+            rightNodeAgg(featureIndex)(splitIndex)(classIndex) = rightBinValue
+            classIndex += 1
+          }
+          splitIndex += 1
+        }
+      }
+
+      def findAggForRegression(
+          leftNodeAgg: Array[Array[Array[Double]]],
+          rightNodeAgg: Array[Array[Array[Double]]],
+          featureIndex: Int) {
+
+        // shift for this featureIndex
+        val shift = 3 * featureIndex * numBins
+        // left node aggregate for the lowest split
+        leftNodeAgg(featureIndex)(0)(0) = binData(shift + 0)
+        leftNodeAgg(featureIndex)(0)(1) = binData(shift + 1)
+        leftNodeAgg(featureIndex)(0)(2) = binData(shift + 2)
+
+        // right node aggregate for the highest split
+        rightNodeAgg(featureIndex)(numBins - 2)(0) =
+          binData(shift + (3 * (numBins - 1)))
+        rightNodeAgg(featureIndex)(numBins - 2)(1) =
+          binData(shift + (3 * (numBins - 1)) + 1)
+        rightNodeAgg(featureIndex)(numBins - 2)(2) =
+          binData(shift + (3 * (numBins - 1)) + 2)
+
+        // Iterate over all splits.
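As a reading aid for the ordered-feature branch above: left aggregates are running prefix sums over bins and right aggregates are running suffix sums, computed per class. The sketch assumes a 2-D binCounts(bin)(class) layout for a single feature instead of the flat binData indexing (shift + numClasses * bin + class) used in the diff; SplitAggregates is a hypothetical name.

```scala
object SplitAggregates {
  /**
   * binCounts(b)(c): weight of class c in bin b for one ordered feature.
   * Split s (0 <= s < numBins - 1) sends bins 0..s left and bins s+1..numBins-1 right.
   * Returns (left, right), where left(s)(c) and right(s)(c) are per-class totals for split s.
   */
  def leftRight(binCounts: Array[Array[Double]]): (Array[Array[Double]], Array[Array[Double]]) = {
    require(binCounts.length >= 2, "need at least two bins to form a split")
    val numBins = binCounts.length
    val numClasses = binCounts.head.length
    val left = Array.ofDim[Double](numBins - 1, numClasses)
    val right = Array.ofDim[Double](numBins - 1, numClasses)

    // Lowest split: the left child holds only bin 0.
    for (c <- 0 until numClasses) left(0)(c) = binCounts(0)(c)
    // Each higher split adds one more bin to the left child (prefix sum).
    for (s <- 1 until numBins - 1; c <- 0 until numClasses)
      left(s)(c) = left(s - 1)(c) + binCounts(s)(c)

    // Highest split: the right child holds only the last bin.
    for (c <- 0 until numClasses) right(numBins - 2)(c) = binCounts(numBins - 1)(c)
    // Each lower split adds one more bin to the right child (suffix sum).
    for (s <- numBins - 3 to 0 by -1; c <- 0 until numClasses)
      right(s)(c) = right(s + 1)(c) + binCounts(s + 1)(c)

    (left, right)
  }
}
```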
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r13982852

--- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala ---
@@ -49,6 +49,7 @@ object DecisionTreeRunner {
   case class Params(
       input: String = null,
       algo: Algo = Classification,
+      numClassesForClassification: Int = 2,
--- End diff --

Yeah, makes sense. If it doesn't complicate things too much, we might consider adding an interface that doesn't require this to be specified and figures it out in one shot.

Worth noting is that in R, an object of type "factor" (the default for categorical/label data) has this information built in. It can be a big pain at load time while the system tries to figure out the cardinality of the factor, but it leads to a nice compact representation of the data and eliminates situations like this one.

I agree on doing the API separation with the ensembles PR.

On Thu, Jun 19, 2014 at 10:46 AM, manishamde wrote:

> In examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala:
>
> > @@ -49,6 +49,7 @@ object DecisionTreeRunner {
> >    case class Params(
> >        input: String = null,
> >        algo: Algo = Classification,
> > +      numClassesForClassification: Int = 2,
>
> Inference from a large dataset could take a lot of time. In general, most
> practitioners know in advance. If not, we can add a pre-processing step.
>
> Currently we have only numClassesForClassification as a classification
> specific parameter. In general, I agree with you. At the same time, didn't
> want to create more configuration classes for the user. Shall we leave it
> as is for now and handle it with the ensembles PR where we have more
> parameters (boosting iterations, num trees, feature subsetting, etc.) ?
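The one-shot inference discussed above amounts to a single pass over the labels. A minimal sketch, assuming labels are encoded as 0.0, 1.0, ..., k - 1 (the function name is hypothetical); on an RDD this would be an extra pass over the data, which is exactly the cost raised in the quoted reply.

```scala
object LabelInfo {
  /** Infers numClasses in one pass, assuming labels are 0.0, 1.0, ..., k - 1. */
  def inferNumClasses(labels: Iterator[Double]): Int = {
    var maxLabel = -1.0
    for (label <- labels) {
      // Reject labels that are negative or not whole numbers.
      require(label >= 0 && label == math.floor(label), s"label $label is not a class index")
      if (label > maxLabel) maxLabel = label
    }
    maxLabel.toInt + 1 // an empty input yields 0 classes
  }
}
```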
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/886#issuecomment-46465872

I've taken a first pass at this, and at a high level it looks good. The main two things I'd say are:

1) I think an implicit that converts LabeledPoint to WeightedLabeledPoint could go a long way toward removing some of the boilerplate introduced by this PR.
2) I'm getting a little concerned that we could modularize a little better - for example, every time we do a "strategy.algo match", it feels like we could just as easily have a separate class for the Regression algo, the Decision algo, etc. Each separate algo could implement its own "binSeqOp" and a few other methods, and the base class could tie these all together. This would be a fairly major refactoring and is maybe better suited for a later PR.

I still need to look closely at the principal logical changes in DecisionTree.scala, and will try to get to this before the end of the week. Thanks for your patience!
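One possible shape for the per-algorithm refactoring in point 2, sketched in plain Scala: each algorithm supplies its own binSeqOp and statistics layout, and shared code calls through the trait instead of matching on strategy.algo. The names and the count/sum/sum-of-squares layout for regression are assumptions for illustration (the later diff in this PR does use three slots per bin for regression), not MLlib's implementation.

```scala
// Hypothetical per-algorithm aggregation hooks; names are illustrative only.
sealed trait SplitStats {
  def statsPerBin: Int
  def binSeqOp(agg: Array[Double], binIndex: Int, label: Double): Unit
}

final class ClassificationStats(numClasses: Int) extends SplitStats {
  val statsPerBin: Int = numClasses
  // One count per class in each bin.
  def binSeqOp(agg: Array[Double], binIndex: Int, label: Double): Unit =
    agg(binIndex * statsPerBin + label.toInt) += 1.0
}

object RegressionStats extends SplitStats {
  val statsPerBin: Int = 3
  // Count, sum and sum of squares per bin (enough to compute variance).
  def binSeqOp(agg: Array[Double], binIndex: Int, label: Double): Unit = {
    val base = binIndex * statsPerBin
    agg(base) += 1.0
    agg(base + 1) += label
    agg(base + 2) += label * label
  }
}

object Aggregation {
  // Shared driver code: no match on the algorithm, just calls through the trait.
  def aggregate(stats: SplitStats, numBins: Int, points: Seq[(Int, Double)]): Array[Double] = {
    val agg = new Array[Double](numBins * stats.statsPerBin)
    points.foreach { case (bin, label) => stats.binSeqOp(agg, bin, label) }
    agg
  }
}
```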
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r13926606

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -233,13 +234,73 @@ object DecisionTree extends Serializable with Logging {
       algo: Algo,
       impurity: Impurity,
       maxDepth: Int): DecisionTreeModel = {
-    val strategy = new Strategy(algo,impurity,maxDepth)
-    new DecisionTree(strategy).train(input: RDD[LabeledPoint])
+    val strategy = new Strategy(algo, impurity, maxDepth)
+    // Converting from standard instance format to weighted input format for tree training
+    val weightedInput = input.map(x => WeightedLabeledPoint(x.label, x.features))
+    new DecisionTree(strategy).train(weightedInput: RDD[WeightedLabeledPoint])
--- End diff --

Not sure why you need to be explicit about the types here?
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r13926555

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -212,7 +211,9 @@ object DecisionTree extends Serializable with Logging {
    * @return a DecisionTreeModel that can be used for prediction
    */
   def train(input: RDD[LabeledPoint], strategy: Strategy): DecisionTreeModel = {
-    new DecisionTree(strategy).train(input: RDD[LabeledPoint])
+    // Converting from standard instance format to weighted input format for tree training
--- End diff --

Maybe this is better served with an implicit, since I think we'll want to reuse LabeledPoint elsewhere and having an automatic conversion might be nice.
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r13926460

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -45,7 +46,7 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
    * @param input RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] used as training data
    * @return a DecisionTreeModel that can be used for prediction
    */
-  def train(input: RDD[LabeledPoint]): DecisionTreeModel = {
+  def train(input: RDD[WeightedLabeledPoint]): DecisionTreeModel = {
--- End diff --

If we're going to change the interface, it might be nice to have an implicit conversion between LabeledPoint and WeightedLabeledPoint (which assigns weight 1 to everything). I think the common case is going to be using unweighted data anyway.
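A sketch of the suggested implicit conversion, using simplified stand-in case classes (the fields, the default weight of 1.0 and TreeSketch.totalWeight are assumptions, not MLlib's definitions). An element-level conversion alone does not convert a collection: an RDD[LabeledPoint] argument would need its own conversion (for example one that calls rdd.map), which the Seq-level conversion below mimics.

```scala
import scala.language.implicitConversions

// Simplified stand-ins for the MLlib types discussed above.
final case class LabeledPoint(label: Double, features: Array[Double])
final case class WeightedLabeledPoint(label: Double, features: Array[Double], weight: Double = 1.0)

object WeightedLabeledPoint {
  // Unweighted points get weight 1.0, the common case.
  implicit def fromLabeledPoint(p: LabeledPoint): WeightedLabeledPoint =
    WeightedLabeledPoint(p.label, p.features)

  // Collection-level conversion; an RDD version would wrap rdd.map in the same way.
  implicit def fromLabeledPoints(ps: Seq[LabeledPoint]): Seq[WeightedLabeledPoint] =
    ps.map(p => fromLabeledPoint(p))
}

object TreeSketch {
  def totalWeight(points: Seq[WeightedLabeledPoint]): Double = points.map(_.weight).sum
}
```

Placing both conversions in the WeightedLabeledPoint companion object puts them in implicit scope without an import at the call site. Because the collection conversion silently maps over the whole input, it may be worth keeping it explicit for large datasets.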
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r13926351

--- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala ---
@@ -49,6 +49,7 @@ object DecisionTreeRunner {
   case class Params(
       input: String = null,
       algo: Algo = Classification,
+      numClassesForClassification: Int = 2,
--- End diff --

Do we want this to be a parameter and not inferred from the data? Also - I'm wondering if it makes sense to subclass Params with DecisionTreeParams vs. RegressionTreeParams so that we keep logically separate options separate.
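A sketch of the suggested split between classification and regression options, with hypothetical case classes (the field names follow the Params diff above; the maxDepth default of 5 is an assumption). Classification-only options such as numClassesForClassification do not exist on the regression variant, so they cannot be set by mistake.

```scala
// Hypothetical params hierarchy; not MLlib's Strategy or DecisionTreeRunner.Params.
sealed trait TreeParams {
  def input: String
  def maxDepth: Int
}

final case class ClassificationTreeParams(
    input: String,
    maxDepth: Int = 5,
    numClassesForClassification: Int = 2) extends TreeParams

final case class RegressionTreeParams(
    input: String,
    maxDepth: Int = 5) extends TreeParams

object TreeParams {
  // The sealed trait lets the compiler check that both variants are handled.
  def describe(p: TreeParams): String = p match {
    case c: ClassificationTreeParams => s"classification, ${c.numClassesForClassification} classes"
    case _: RegressionTreeParams => "regression"
  }
}
```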
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/886#issuecomment-44360446

I am worried that exponential growth in the number of split possibilities kills us when we "check for all splits" once we get to even 20-30 categorical values. That's potentially a billion possible candidates to check. I have a feeling that heuristics will be more practical (but I don't have a reference!).

We might add an option for "checking for all" vs. "using an entropy-based heuristic" and automatically decide which to use at some conservative, user-configurable threshold.

On Tue, May 27, 2014 at 7:41 PM, manishamde wrote:

> @srowen <https://github.com/srowen> It's good to know about the use-case
> for cardinality in the order of tens.
>
> The categorical feature ordering using the average value of the target
> variable works well for both binary classification and regression (section
> 9.2.4 of Elements of Statistical Learning) and it's already implemented in
> MLlib decision tree.
>
> This PR handles the scenario where the 'ordering' assumption does not hold
> true for the multiclass classification. I like the suggestion of using
> entropy to sort the categories -- it will be great if we could also find a
> theoretical reference for it!
>
> Here is what I propose for handling categorical features in multiclass
> classification:
> 1. We check for all splits of the categorical variable if the bin
> constraints are met.
> 2. If the bin constraints are not met, we can use a sorting heuristic
> (like entropy of the target variable)
>
> I think this might be the best tradeoff both from the theoretical and
> practical perspective and it will save the user a lot of data munging
> effort which is one of the main advantages of decision trees.
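The size of the exhaustive search can be made concrete. With k unordered categories there are 2^(k-1) - 1 distinct ways to split them into two non-empty sides (for k = 3: {a|bc}, {b|ac}, {c|ab}), which is about 5.4 * 10^8 for k = 30, consistent with the "potentially a billion" estimate. Ordering the categories by a per-category statistic first reduces this to k - 1 candidate splits. The names below are hypothetical, and the threshold rule is one possible reading of the proposal, not an implemented MLlib option.

```scala
object CategoricalSplits {
  // Distinct two-sided partitions of k unordered categories (both sides non-empty).
  def numUnorderedSplits(k: Int): Long = (1L << (k - 1)) - 1

  // After ordering categories by a statistic, only k - 1 "prefix" splits remain.
  def numOrderedSplits(k: Int): Long = (k - 1).toLong

  // Hypothetical rule: exhaustive search only while the candidate count stays under a cap.
  def useExhaustive(k: Int, maxExhaustiveSplits: Long): Boolean =
    numUnorderedSplits(k) <= maxExhaustiveSplits

  // Order category ids by a per-category statistic (e.g. mean label, or label entropy).
  def orderCategories(stat: Map[Int, Double]): Seq[Int] =
    stat.toSeq.sortBy(_._2).map(_._1)
}
```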
[GitHub] spark pull request: Fixing typo in als.py
GitHub user etrain opened a pull request:

https://github.com/apache/spark/pull/696

Fixing typo in als.py

XtY should be Xty.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/etrain/spark-1 patch-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/696.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #696

commit 634cb8d18e7976ea0788ec3617c0e2d11c1145c7
Author: Evan Sparks
Date: 2014-05-08T17:28:40Z

Fixing typo in als.py

XtY should be Xty.
[GitHub] spark pull request: Use numpy directly for matrix multiply.
GitHub user etrain opened a pull request:

https://github.com/apache/spark/pull/687

Use numpy directly for matrix multiply.

Using matrix multiply to compute XtX and XtY yields a 5-20x speedup depending on problem size. For example, the following takes 19s locally after this change vs. 5m21s before the change (16x speedup):

bin/pyspark examples/src/main/python/als.py local[8] 1000 1000 50 10 10

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/etrain/spark-1 patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/687.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #687

commit d1ab9b6ef020bc0983a72da8a9add9cfc2356a4c
Author: Evan Sparks
Date: 2014-05-08T01:18:36Z

Use numpy directly for matrix multiply.

Using matrix multiply to compute XtX and XtY yields a 5-20x speedup depending on problem size. For example, the following takes 19s locally after this change vs. 5m21s before the change (16x speedup):

bin/pyspark examples/src/main/python/als.py local[8] 1000 1000 50 10 10
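Both terms of the least-squares normal equations are sums over the rows of X, which is why a single matrix product can replace explicit per-row accumulation. Writing x_i for the i-th row of X as a column vector and y_i for the i-th entry of y (notation assumed here, not taken from als.py):

```latex
X^\top X = \sum_{i=1}^{n} x_i \, x_i^\top ,
\qquad
X^\top y = \sum_{i=1}^{n} y_i \, x_i
```

A single X.T.dot(X)-style call lets numpy evaluate the whole sum in compiled, typically BLAS-backed, code rather than in a Python-level loop. The lowercase y marks a vector, which is also the point of the later XtY to Xty rename in als.py.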
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/476#issuecomment-41753636 Also, speaking of @jegonzal: maybe this is a natural first point of integration between MLlib and GraphX. I know GraphX has an implementation of LDA built in, and this may be a chance for us to leverage that work.
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/476#issuecomment-41753574 Before I get too deep into this review, I want to step back and think about whether we expect the model in this case to be on the order of the size of the data. I think it is, and if so, we may want to consider representing the model as RDD[DocumentTopicFeatures] and RDD[TopicWordFeatures], similar to what we do with ALS. This may change the algorithm substantially. Separately, it may make sense to have a concrete use case to work with (the Reuters dataset or something similar) so that we can evaluate how much memory actually gets used for a reasonably sized corpus. Perhaps @mengxr or @jegonzal has a strong opinion on this.
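To make the "model as RDDs" idea concrete, here is a minimal pure-Python sketch, assuming a Spark-style hash partitioner (key hash modulo the number of partitions). The function name and the toy rows are hypothetical. Each document's topic-count row lives in exactly one partition, so no single machine has to hold the whole document-topic matrix. This is the same motivation behind ALS keeping its user and product factors in keyed RDDs.

```python
def hash_partition(rows, num_partitions):
    """Place each (key, row) pair in partition hash(key) mod num_partitions."""
    partitions = [{} for _ in range(num_partitions)]
    for key, row in rows:
        # Python's % with a positive modulus is always non-negative.
        partitions[hash(key) % num_partitions][key] = row
    return partitions

# Hypothetical document-topic counts for 10 documents and 3 topics.
doc_topic_rows = [(doc_id, [doc_id % 3, 1, 0]) for doc_id in range(10)]
parts = hash_partition(doc_topic_rows, num_partitions=3)
```

A sampler would then join each partition of token assignments with the co-located partition of model rows, rather than broadcasting one large parameter object to every task.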
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126271 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala --- @@ -20,15 +20,17 @@ package org.apache.spark.mllib.util import scala.reflect.ClassTag import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance} +import breeze.util.Index +import chalk.text.tokenize.JavaWordTokenizer --- End diff -- I think chalk is probably a fine choice for tokenization to start with, but can you say what it buys us over a regular StringTokenizer (java.util.StringTokenizer, which Scala code can already use)?
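One way to frame the question is to compare outputs on a sentence with punctuation. The sketch below is hypothetical and does not reproduce chalk's JavaWordTokenizer. The first function mimics java.util.StringTokenizer's default behavior, which splits only on space, tab, newline, carriage return and form feed. The second is a simple regular-expression word tokenizer standing in for a "real" tokenizer.

```python
import re

def string_tokenizer_split(text):
    """Mimic java.util.StringTokenizer's default delimiters: space, tab,
    newline, carriage return and form feed. Empty tokens are dropped."""
    return [tok for tok in re.split(r"[ \t\n\r\f]+", text) if tok]

def regex_word_tokenize(text):
    """A simple stand-in word tokenizer (not chalk's algorithm): words with an
    optional apostrophe suffix, and each punctuation mark as its own token."""
    return re.findall(r"\w+(?:'\w+)?|[^\w\s]", text)

sentence = "The model's size, not the data's, dominates."
plain_tokens = string_tokenizer_split(sentence)
word_tokens = regex_word_tokenize(sentence)
```

With plain delimiter splitting, "size," and "size" end up as different vocabulary entries. Weighing that kind of difference against the cost of the extra dependency is the real question here.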
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126247 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.expectation + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV, sum} + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.clustering.{Document, LDAParams} +import org.apache.spark.mllib.linalg.{Vector, Vectors} + +/** + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model. + * @param data Dataset, such as corpus. + * @param numOuterIterations Number of outer iteration. + * @param numInnerIterations Number of inner iteration, used in each partition. + * @param docTopicSmoothing Document-topic smoothing. + * @param topicTermSmoothing Topic-term smoothing. 
+ */ +class GibbsSampling( +data: RDD[Document], +numOuterIterations: Int, +numInnerIterations: Int, +docTopicSmoothing: Double, +topicTermSmoothing: Double) + extends Logging with Serializable { + + import GibbsSampling._ + + /** + * Main function of running a Gibbs sampling method. It contains two phases of total Gibbs + * sampling: first is initialization, second is real sampling. + */ + def runGibbsSampling( + initParams: LDAParams, + data: RDD[Document] = data, + numOuterIterations: Int = numOuterIterations, + numInnerIterations: Int = numInnerIterations, + docTopicSmoothing: Double = docTopicSmoothing, + topicTermSmoothing: Double = topicTermSmoothing): LDAParams = { + +val numTerms = initParams.topicTermCounts.head.size +val numDocs = initParams.docCounts.size +val numTopics = initParams.topicCounts.size + +// Construct topic assignment RDD +logInfo("Start initialization") + +val cpInterval = System.getProperty("spark.gibbsSampling.checkPointInterval", "10").toInt +val sc = data.context +val (initialParams, initialChosenTopics) = sampleTermAssignment(initParams, data) + +// Gibbs sampling +val (params, _, _) = Iterator.iterate((sc.accumulable(initialParams), initialChosenTopics, 0)) { + case (lastParams, lastChosenTopics, i) => +logInfo("Start Gibbs sampling") + +val rand = new Random(42 + i * i) +val params = sc.accumulable(LDAParams(numDocs, numTopics, numTerms)) +val chosenTopics = data.zip(lastChosenTopics).map { + case (Document(docId, content), topics) => +content.zip(topics).map { case (term, topic) => + lastParams += (docId, term, topic, -1) + + val chosenTopic = lastParams.localValue.dropOneDistSampler( +docTopicSmoothing, topicTermSmoothing, term, docId, rand) + + lastParams += (docId, term, chosenTopic, 1) + params += (docId, term, chosenTopic, 1) + + chosenTopic +} +}.cache() + +if (i + 1 % cpInterval == 0) { + chosenTopics.checkpoint() +} + +// Trigger a job to collect accumulable LDA parameters. 
+chosenTopics.count() +lastChosenTopics.unpersist() + +(params, chosenTopics, i + 1) +}.drop(1 + numOuterIterations).next() + +params.value + } + + /** + * Model matrix Phi and Theta are inferred via LDAParams. + */ + def solvePhiAndTheta( + params: LDAParams, + docTopicSmoothing: Double = docTopicSmoothing, + topicTermSmoothing: Double = topicTermSmoothing): (Array[Vector], Array[Vector]) = { --- End diff -- Again
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126191 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.expectation + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV, sum} + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.clustering.{Document, LDAParams} +import org.apache.spark.mllib.linalg.{Vector, Vectors} + +/** + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model. + * @param data Dataset, such as corpus. + * @param numOuterIterations Number of outer iteration. + * @param numInnerIterations Number of inner iteration, used in each partition. + * @param docTopicSmoothing Document-topic smoothing. + * @param topicTermSmoothing Topic-term smoothing. 
+ */ +class GibbsSampling( +data: RDD[Document], +numOuterIterations: Int, +numInnerIterations: Int, +docTopicSmoothing: Double, +topicTermSmoothing: Double) + extends Logging with Serializable { + + import GibbsSampling._ + + /** + * Main function of running a Gibbs sampling method. It contains two phases of total Gibbs + * sampling: first is initialization, second is real sampling. + */ + def runGibbsSampling( + initParams: LDAParams, + data: RDD[Document] = data, + numOuterIterations: Int = numOuterIterations, + numInnerIterations: Int = numInnerIterations, + docTopicSmoothing: Double = docTopicSmoothing, + topicTermSmoothing: Double = topicTermSmoothing): LDAParams = { + +val numTerms = initParams.topicTermCounts.head.size +val numDocs = initParams.docCounts.size +val numTopics = initParams.topicCounts.size + +// Construct topic assignment RDD +logInfo("Start initialization") + +val cpInterval = System.getProperty("spark.gibbsSampling.checkPointInterval", "10").toInt +val sc = data.context +val (initialParams, initialChosenTopics) = sampleTermAssignment(initParams, data) + +// Gibbs sampling +val (params, _, _) = Iterator.iterate((sc.accumulable(initialParams), initialChosenTopics, 0)) { --- End diff -- Why use an accumulator here rather than .aggregate()?
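For comparison, here is a minimal pure-Python simulation of the RDD.aggregate(zeroValue, seqOp, combOp) contract, assuming each partition is an in-memory list. The helper name, the (docId, topic) records and the topic-count example are hypothetical. aggregate returns the merged result of a single action to the driver. By contrast, the Spark programming guide warns that accumulator updates made inside transformations (as in the map in the diff above) may be applied more than once if a task is re-executed.

```python
import copy
from functools import reduce

def local_aggregate(partitions, zero_value, seq_op, comb_op):
    """Mimic RDD.aggregate on a list of in-memory partitions.

    Each partition folds its elements into its own copy of zero_value with
    seq_op; the per-partition results are then merged with comb_op.
    """
    per_partition = []
    for part in partitions:
        acc = copy.deepcopy(zero_value)
        for item in part:
            acc = seq_op(acc, item)
        per_partition.append(acc)
    return reduce(comb_op, per_partition, copy.deepcopy(zero_value))

num_topics = 3
zero = [0] * num_topics

def add_assignment(counts, record):
    _doc_id, topic = record
    counts[topic] += 1  # mutating the per-partition copy is safe
    return counts

def merge_counts(a, b):
    return [x + y for x, y in zip(a, b)]

# Hypothetical (docId, chosenTopic) records split across two partitions.
partitions = [[(0, 1), (0, 2)], [(1, 1), (1, 1)]]
topic_counts = local_aggregate(partitions, zero, add_assignment, merge_counts)
```

Because every partition starts from its own copy of the zero value, the caller's zero value is never mutated, and the result depends only on the data, not on how many times a task ran.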
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126176 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.expectation + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV, sum} + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.clustering.{Document, LDAParams} +import org.apache.spark.mllib.linalg.{Vector, Vectors} + +/** + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model. + * @param data Dataset, such as corpus. + * @param numOuterIterations Number of outer iteration. + * @param numInnerIterations Number of inner iteration, used in each partition. + * @param docTopicSmoothing Document-topic smoothing. + * @param topicTermSmoothing Topic-term smoothing. + */ +class GibbsSampling( --- End diff -- Gibbs sampling is a very useful general-purpose tool to have. Its interface should be more generic than RDD[Document], and its parameters should be amenable to domains other than text.
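A minimal sketch of what a domain-agnostic interface could look like, assuming the caller supplies one full-conditional sampler per coordinate. The function name and signature are hypothetical and not an MLlib API. LDA would plug in per-token topic conditionals. The example target here is a standard bivariate normal with correlation rho, whose full conditionals are x_i | x_j ~ N(rho * x_j, 1 - rho^2).

```python
import math
import random

def gibbs_sample(initial_state, conditionals, num_iterations, seed=0):
    """Generic systematic-scan Gibbs sampler.

    conditionals[i](state, rng) must draw a new value for coordinate i from
    its full conditional given the current values of all other coordinates.
    """
    rng = random.Random(seed)
    state = list(initial_state)
    samples = []
    for _ in range(num_iterations):
        for i, draw in enumerate(conditionals):
            state[i] = draw(state, rng)
        samples.append(tuple(state))
    return samples

# Example domain (not text): standard bivariate normal with correlation rho.
rho = 0.5
cond_sd = math.sqrt(1.0 - rho * rho)
conditionals = [
    lambda s, rng: rng.gauss(rho * s[1], cond_sd),  # draw x0 given x1
    lambda s, rng: rng.gauss(rho * s[0], cond_sd),  # draw x1 given x0
]
samples = gibbs_sample([0.0, 0.0], conditionals, num_iterations=20000, seed=42)
```

The sampler only sees a state and a list of conditionals, so the document-specific parts (tokens, counts, smoothing) stay on the caller's side of the interface.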
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126094 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( +docCounts: Vector, +topicCounts: Vector, +docTopicCounts: Array[Vector], +topicTermCounts: Array[Vector]) + extends Serializable { + + def update(docId: Int, term: Int, topic: Int, inc: Int) = { +docCounts.toBreeze(docId) += inc +topicCounts.toBreeze(topic) += inc +docTopicCounts(docId).toBreeze(topic) += inc +topicTermCounts(topic).toBreeze(term) += inc +this + } + + def merge(other: LDAParams) = { +docCounts.toBreeze += other.docCounts.toBreeze +topicCounts.toBreeze += other.topicCounts.toBreeze + +var i = 0 +while (i < docTopicCounts.length) { + docTopicCounts(i).toBreeze += other.docTopicCounts(i).toBreeze --- End diff -- More Breeze conversion here.
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126106 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( +docCounts: Vector, +topicCounts: Vector, +docTopicCounts: Array[Vector], +topicTermCounts: Array[Vector]) + extends Serializable { + + def update(docId: Int, term: Int, topic: Int, inc: Int) = { +docCounts.toBreeze(docId) += inc +topicCounts.toBreeze(topic) += inc +docTopicCounts(docId).toBreeze(topic) += inc +topicTermCounts(topic).toBreeze(term) += inc +this + } + + def merge(other: LDAParams) = { +docCounts.toBreeze += other.docCounts.toBreeze +topicCounts.toBreeze += other.topicCounts.toBreeze + +var i = 0 +while (i < docTopicCounts.length) { + docTopicCounts(i).toBreeze += other.docTopicCounts(i).toBreeze + i += 1 +} + +i = 0 +while (i < topicTermCounts.length) { + topicTermCounts(i).toBreeze += other.topicTermCounts(i).toBreeze + i += 1 +} +this + } + + /** + * This function used for computing the new distribution after drop one from current document, + * which is a really essential part of Gibbs sampling for LDA, you can refer to the paper: + * Parameter estimation for text analysis --- End diff -- Link to paper, please.
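For reference, the "drop one" step described in the quoted doc comment corresponds to the standard collapsed Gibbs conditional for LDA with symmetric priors. Here p(z_i = k | rest) is proportional to (n_dk + alpha) * (n_kw + beta) / (n_k + V * beta). All counts exclude the token being resampled, and the document-length denominator is omitted because it does not depend on k. The sketch below is hypothetical. Mapping alpha to docTopicSmoothing, beta to topicTermSmoothing, and the counts to docTopicCounts, topicTermCounts and topicCounts is our reading of the diff; the PR does not state it.

```python
import random

def drop_one_distribution(doc_topic, topic_term_for_word, topic_counts,
                          alpha, beta, vocab_size):
    """Normalized collapsed-Gibbs conditional over topics for one token.

    All count arguments must already exclude the token being resampled
    (the "drop one" step): doc_topic[k] = tokens in this document assigned
    to topic k, topic_term_for_word[k] = occurrences of this word assigned
    to topic k, topic_counts[k] = all tokens assigned to topic k.
    """
    weights = [
        (n_dk + alpha) * (n_kw + beta) / (n_k + vocab_size * beta)
        for n_dk, n_kw, n_k in zip(doc_topic, topic_term_for_word, topic_counts)
    ]
    total = sum(weights)
    return [w / total for w in weights]

def sample_index(probs, rng):
    """Draw an index from a discrete distribution by inverting its CDF."""
    r = rng.random()
    cumulative = 0.0
    for k, p in enumerate(probs):
        cumulative += p
        if r < cumulative:
            return k
    return len(probs) - 1  # guard against floating-point round-off
```

In the sampler loop, a token's current assignment would be decremented from the counts, a new topic drawn with these two functions, and the counts incremented again. That matches the -1 / +1 pattern in runGibbsSampling.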
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126093 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( +docCounts: Vector, +topicCounts: Vector, +docTopicCounts: Array[Vector], +topicTermCounts: Array[Vector]) + extends Serializable { + + def update(docId: Int, term: Int, topic: Int, inc: Int) = { +docCounts.toBreeze(docId) += inc --- End diff -- Doing the Breeze conversion on every update seems inefficient. These variables should be private and created as Breeze vectors at initialization; only the user-facing APIs need to expose Vector, Array[Vector], etc.
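A minimal sketch of the suggested layout, written in Python with NumPy arrays standing in for Breeze vectors. The substitution is an assumption for illustration, and the class and method names are hypothetical. The mutable counts are created once, in the internal representation, so update() and merge() do no per-call conversion. Only the user-facing accessors convert, and they return copies so callers cannot mutate the sampler's state.

```python
import numpy as np

class LDACounts:
    """Hypothetical sketch: private count arrays, conversion only at the API edge."""

    def __init__(self, num_docs, num_topics, num_terms):
        # Internal state is allocated in its working representation up front.
        self._doc_counts = np.zeros(num_docs, dtype=np.int64)
        self._topic_counts = np.zeros(num_topics, dtype=np.int64)
        self._doc_topic = np.zeros((num_docs, num_topics), dtype=np.int64)
        self._topic_term = np.zeros((num_topics, num_terms), dtype=np.int64)

    def update(self, doc_id, term, topic, inc):
        # In-place increments; no wrapper objects are created per call.
        self._doc_counts[doc_id] += inc
        self._topic_counts[topic] += inc
        self._doc_topic[doc_id, topic] += inc
        self._topic_term[topic, term] += inc
        return self

    def merge(self, other):
        self._doc_counts += other._doc_counts
        self._topic_counts += other._topic_counts
        self._doc_topic += other._doc_topic
        self._topic_term += other._topic_term
        return self

    # User-facing accessors: return copies so callers cannot mutate internals.
    def doc_counts(self):
        return self._doc_counts.copy()

    def topic_counts(self):
        return self._topic_counts.copy()
```

This sketch still stores the document-topic matrix densely in one object, so it does not address the model-size concern raised elsewhere in this review.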
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126088 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( +docCounts: Vector, +topicCounts: Vector, +docTopicCounts: Array[Vector], +topicTermCounts: Array[Vector]) + extends Serializable { + + def update(docId: Int, term: Int, topic: Int, inc: Int) = { +docCounts.toBreeze(docId) += inc +topicCounts.toBreeze(topic) += inc +docTopicCounts(docId).toBreeze(topic) += inc --- End diff -- Again, I think in this case the *model* might be really big - e.g. a billion documents in hundreds of topics. 
Or, on the term side, millions of words in a vocabulary and hundreds of topics.
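A back-of-the-envelope estimate makes the scale concrete. The following assumptions do not come from the PR: 100 topics as the low end of "hundreds", 10^6 vocabulary terms as the low end of "millions", and dense storage at 8 bytes per count, since mllib Vector values are Doubles. Per-object JVM overhead is ignored.

```python
BYTES_PER_DOUBLE = 8  # mllib Vector stores Double values

def dense_matrix_bytes(rows, cols, bytes_per_entry=BYTES_PER_DOUBLE):
    """Size of a dense rows x cols matrix, ignoring per-object overhead."""
    return rows * cols * bytes_per_entry

num_docs = 10**9    # "a billion documents"
num_terms = 10**6   # "millions of words" (low end, assumed)
num_topics = 100    # "hundreds of topics" (low end, assumed)

doc_topic_bytes = dense_matrix_bytes(num_docs, num_topics)    # 800,000,000,000 bytes (800 GB)
topic_term_bytes = dense_matrix_bytes(num_topics, num_terms)  # 800,000,000 bytes (800 MB)
```

Even at these low-end numbers, the document-topic side alone is far beyond what a single JVM heap can hold, which supports distributing it rather than shipping it around as one object.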
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126007 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( --- End diff -- Also, I don't think it should be a case class.
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12126002 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( --- End diff -- This should be called just "LDA" since it's the class that fits the model.
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12125991 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) --- End diff -- Hmm... if documents are just an ID and a list of token IDs, maybe something like a SparseVector is a better representation?
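A minimal sketch of that representation, assuming token IDs index a fixed vocabulary. The token-ID sequence is collapsed into the (size, indices, values) triple that a sparse vector stores. The function name is hypothetical. In PySpark, pyspark.mllib.linalg.Vectors.sparse(size, indices, values) should accept this layout. One caveat: collapsed Gibbs sampling keeps a topic assignment per token occurrence, so a count-based document would still need per-occurrence assignments stored alongside it.

```python
from collections import Counter

def tokens_to_sparse(token_ids, vocab_size):
    """Collapse a token-ID sequence into (size, sorted indices, counts),
    the same layout a sparse vector stores."""
    counts = Counter(token_ids)
    indices = sorted(counts)
    if indices and (indices[0] < 0 or indices[-1] >= vocab_size):
        raise ValueError("token id out of range for the vocabulary")
    values = [float(counts[i]) for i in indices]
    return vocab_size, indices, values

doc = [5, 2, 5, 9, 2, 5]  # hypothetical token IDs for one document
size, indices, values = tokens_to_sparse(doc, vocab_size=10)
```

Repeated terms are stored once with a count, which is where the savings over a plain list of token IDs come from.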
[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/476#discussion_r12125916 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.{AccumulableParam, Logging, SparkContext} +import org.apache.spark.mllib.expectation.GibbsSampling +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +case class Document(docId: Int, content: Iterable[Int]) + +case class LDAParams ( +docCounts: Vector, +topicCounts: Vector, +docTopicCounts: Array[Vector], +topicTermCounts: Array[Vector]) --- End diff -- I expect these will be *really* big. Maybe the last two variables should be RDDs, similar to what we do with ALS.
[GitHub] spark pull request: SPARK-1544 Add support for deep decision trees...
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/475#issuecomment-41200739 Can one of the admins take a look at this? The Travis CI error seems to be in StreamingContext tests, which have nothing to do with this change.
[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/458#issuecomment-41079198 Right, the pattern is virtually identical except for one update-function call. Can we abstract this away so that the first three ADMM algorithms take only a few lines of code each, and so that it's straightforward to add new ADMM variants?
[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/458#issuecomment-40994263 Hey, this looks awesome! One high-level issue I see is that the ADMM optimizer has knowledge of the loss function it's trying to minimize embedded in it. ADMM is much more general than that and is nicely scalable - can we abstract the general ADMM computation pattern out, in a spirit similar to what we've done with GradientDescent, and have Lasso, SVM, etc. done with ADMM as subclasses that implement a specialized "compute" function (or something)?
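A class-hierarchy version of that idea might look like the following sketch. It is an illustration under stated assumptions, not MLlib's API: `ConsensusADMM`, `localUpdate`, `zUpdate` and `LassoADMM` are hypothetical names, the problem is one-dimensional, and partitions are plain `Seq`s standing in for RDD partitions. The generic driver loop never sees the loss; each subclass supplies its own local "compute" step.

```scala
// Generic consensus ADMM: the driver loop knows nothing about the loss.
abstract class ConsensusADMM[P] {
  // Loss-specific x-update: argmin_x f_i(x) + rho/2 * (x - z + u)^2 on one partition.
  def localUpdate(part: P, z: Double, u: Double, rho: Double): Double
  // Regularizer-specific z-update: prox of g with step 1 / (n * rho).
  def zUpdate(v: Double, n: Int, rho: Double): Double

  def solve(parts: Seq[P], rho: Double, iters: Int): Double = {
    var z = 0.0
    var us = Seq.fill(parts.size)(0.0)
    for (_ <- 1 to iters) {
      // "map" step, one local solve per partition.
      val xs = parts.zip(us).map { case (p, u) => localUpdate(p, z, u, rho) }
      // "reduce" step on the driver.
      z = zUpdate(xs.zip(us).map { case (x, u) => x + u }.sum / parts.size, parts.size, rho)
      // Scaled dual update.
      us = us.zip(xs).map { case (u, x) => u + x - z }
    }
    z
  }
}

// Lasso with one feature: f_i(x) = 0.5 * sum (a * x - b)^2, g(z) = lambda * |z|.
class LassoADMM(lambda: Double) extends ConsensusADMM[Seq[(Double, Double)]] {
  def localUpdate(part: Seq[(Double, Double)], z: Double, u: Double, rho: Double): Double = {
    val ab = part.map { case (a, b) => a * b }.sum
    val aa = part.map { case (a, _) => a * a }.sum
    (ab + rho * (z - u)) / (aa + rho)
  }

  def zUpdate(v: Double, n: Int, rho: Double): Double = {
    val k = lambda / (n * rho)
    math.signum(v) * math.max(math.abs(v) - k, 0.0)
  }
}
```

An SVM subclass would override `localUpdate` with a hinge-loss solver (not shown) and leave `solve` untouched, which is the same separation GradientDescent gets from its pluggable gradient and updater.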
[GitHub] spark pull request: MLI-1 Decision Trees
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/79#issuecomment-39392123 Hi Hirakendu - thanks for all the detailed suggestions and information. I will reply to that separately. One question - you say there are 500,000 examples and this equates to 90GB of raw data. If that's the case, this works out to ~200KB per example - is that right or are you off by an order of magnitude in either the number of features or the number of data points? Or are we throwing a bunch of data out before fitting?
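The per-example arithmetic can be written out as follows. It assumes decimal units (1 GB = 10^9 bytes), and the 8-bytes-per-value figure used to turn bytes into a feature count is only an illustrative assumption (text encodings vary widely); `ExampleSize` is a hypothetical helper.

```scala
object ExampleSize {
  // Average raw bytes per example.
  def bytesPerExample(totalBytes: Double, numExamples: Long): Double =
    totalBytes / numExamples

  // Values per example implied if each stored value takes bytesPerValue bytes
  // (an assumption, not something stated in the thread).
  def impliedFeatures(totalBytes: Double, numExamples: Long, bytesPerValue: Double): Double =
    bytesPerExample(totalBytes, numExamples) / bytesPerValue

  def main(args: Array[String]): Unit = {
    println(f"${bytesPerExample(90e9, 500000L)}%.0f bytes per example")
    println(f"${impliedFeatures(90e9, 500000L, 8.0)}%.0f values per example at 8 bytes each")
  }
}
```

This gives 180,000 bytes (about 180 KB) per example, in line with the ~200KB figure, or roughly 22,500 values per example at 8 bytes each.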
[GitHub] spark pull request: MLI-1 Decision Trees
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/79#discussion_r10934747 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.tree.impurity + +/** + * Trail for calculating information gain + */ +trait Impurity extends Serializable { + + /** + * information calculation for binary classification + * @param c0 count of instances with label 0 + * @param c1 count of instances with label 1 + * @return information value + */ + def calculate(c0 : Double, c1 : Double): Double + + /** + * information calculation for regression + * @param count number of instances + * @param sum sum of labels + * @param sumSquares summation of squares of the labels + * @return information value + */ + def calculate(count: Double, sum: Double, sumSquares: Double): Double --- End diff -- I agree that overflow is an issue here (particularly for sumSquares), but I also agree with Manish/Hirakendu that this algorithm's ability to generate a tree in a reasonable amount of time depends on the property that we compute statistics for splits and then merge them together. I actually do think it makes sense to maintain "(count, average, averageSumSq)" for each partition in an overflow-friendly way and to compute the combination as a count-weighted average of both, as Hirakendu suggests. This will complicate the code, but it should solve the overflow problem and keep things pretty efficient. That said, maybe this could be taken care of in a future PR as a bugfix, rather than in this one?
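Hirakendu's suggestion can be sketched as below: each partition keeps `(count, mean, meanSq)` instead of raw sums, and two summaries are combined by count-weighted averaging, so no raw sum of squares is ever accumulated. `VarianceStats`, `LabelStats`, `merge` and `of` are hypothetical names, not the `Impurity` API from the diff.

```scala
object VarianceStats {
  // Per-partition summary: count plus running means of y and y^2.
  final case class LabelStats(count: Double, mean: Double, meanSq: Double) {
    // Combine two summaries with count-weighted averages of both means.
    def merge(other: LabelStats): LabelStats = {
      val total = count + other.count
      if (total == 0.0) this
      else {
        val w = count / total
        val wo = other.count / total
        LabelStats(total, w * mean + wo * other.mean, w * meanSq + wo * other.meanSq)
      }
    }

    // Variance impurity: E[y^2] - (E[y])^2.
    def variance: Double = meanSq - mean * mean
  }

  val empty: LabelStats = LabelStats(0.0, 0.0, 0.0)

  // Summarize one partition's labels by merging in one label at a time.
  def of(labels: Seq[Double]): LabelStats =
    labels.foldLeft(empty)((s, y) => s.merge(LabelStats(1.0, y, y * y)))
}
```

Because `merge` is associative up to rounding, per-split summaries can still be computed per partition and reduced, which preserves the property the tree algorithm relies on. Computing the variance as `meanSq - mean * mean` can still lose precision through cancellation when the mean is large relative to the spread; the pairwise update of Chan et al., which merges sums of squared deviations, is a more numerically stable alternative.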